View Javadoc

1   /**
2    * Copyright 2008 Atlassian Pty Ltd 
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at 
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0 
9    * 
10   * Unless required by applicable law or agreed to in writing, software 
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License.
15   */
16  
17  package com.atlassian.util.concurrent;
18  
19  import static com.atlassian.util.concurrent.Assertions.notNull;
20  import net.jcip.annotations.ThreadSafe;
21  
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TimeoutException;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.concurrent.atomic.AtomicReference;
27  import java.util.concurrent.locks.Condition;
28  
29  /**
30   * A Reference with queue semantics where the current reference may be retrieved
31   * or taken instead, and if there is no current element then it will be block
32   * until the reference becomes available. This is somewhat analogous to a single
33   * element {@link BlockingQueue}.
34   * <p>
35   * Note: this class does not support null elements being {@link #set(Object)}
36   * and will throw an exception. If the internal reference is null, then calls to
37   * {@link #take()} or {@link #take(long, TimeUnit)} will block.
38   * <p>
39   * This class is most suited to {@link #newSRSW() SRSW} or {@link #newMRSW()
40   * MRSW} usage. Multiple writers will overwrite each other's elements and the
41   * chosen value will be arbitrary in the absence of any external consensus. If
42   * multiple readers are waiting to {@link #take()} a value, one reader will be
43   * arbitrarily chosen (similar to {@link Condition#signal()}). Multiple readers
44   * can however {@link #get()} the current value if it is not null, but they may
45   * see the current value more than once. If multiple readers attempt to
46   * {@link #get()} a value from the SRSW reference and it is not yet present then
47   * only one waiting thread may be notified, please use the MRSW version for this
48   * case.
49   * <p>
50   * This implementation has been optimized for SRSW performance with
51   * {@link #set(Object)}/{@link #take()} pairs.
52   * <p>
53   * Prometheus contains a similar construct called an <a href"http://prometheus.codehaus.org/javadoc/main/org/codehaus/prometheus/references/AwaitableReference.html"
54   * >AwaitableReference</a>. This class is more explicit in that it handles
55   * take/get separately.
56   * 
57   * @param <V> the value type
58   * @author Jed Wesley-Smith
59   * @see BlockingQueue
60   */
61  @ThreadSafe
62  public class BlockingReference<V> {
63  
64      //
65      // static factory methods
66      //
67  
68      /**
69       * Create a BlockingReference best suited to single-reader/single-writer
70       * situations. In a MRSW case this instance may get missed signals if
71       * multiple reader threads are all waiting on the value.
72       */
73      public static <V> BlockingReference<V> newSRSW() {
74          return newSRSW(null);
75      }
76  
77      /**
78       * Create a BlockingReference best suited to single-reader/single-writer
79       * situations. In a MRSW case this instance may get missed signals if
80       * multiple reader threads are all waiting on the value.
81       * 
82       * @param initialValue the initial value
83       */
84      public static <V> BlockingReference<V> newSRSW(final V initialValue) {
85          return new BlockingReference<V>(new BooleanLatch(), initialValue);
86      }
87  
88      /**
89       * Create a BlockingReference best suited to multi-reader/single-writer
90       * situations. In a SRSW case this instance may not perform quite as well.
91       */
92      public static <V> BlockingReference<V> newMRSW() {
93          return newMRSW(null);
94      }
95  
96      /**
97       * Create a BlockingReference best suited to multi-reader/single-writer
98       * situations. In a SRSW case this instance may not perform quite as well.
99       * 
100      * @param initialValue the initial value
101      */
102     public static <V> BlockingReference<V> newMRSW(final V initialValue) {
103         return new BlockingReference<V>(new PhasedLatch() {
104             /*
105              * Workaround for the fact that take() always calls await. Calling
106              * await() on a phased latch by default waits on the <i>next</i>
107              * phase (after the current one). We need to make sure we await on
108              * the previous phase instead so we remember the previous phase.
109              */
110             private final AtomicInteger currentPhase = new AtomicInteger(super.getPhase());
111 
112             @Override
113             public synchronized int getPhase() {
114                 try {
115                     return currentPhase.get();
116                 } finally {
117                     currentPhase.set(super.getPhase());
118                 }
119             }
120         }, initialValue);
121     }
122 
123     //
124     // instance vars
125     //
126 
127     private final AtomicReference<V> ref = new AtomicReference<V>();
128     private final ReusableLatch latch;
129 
130     //
131     // ctors
132     //
133 
134     BlockingReference(final ReusableLatch latch, final V initialValue) {
135         this.latch = latch;
136         internalSet(initialValue);
137     }
138 
139     /**
140      * Creates a new SRSW BlockingReference.
141      * 
142      * @deprecated use {@link #newSRSW()} instead.
143      */
144     @Deprecated
145     public BlockingReference() {
146         this(new BooleanLatch(), null);
147     }
148 
149     /**
150      * Creates a new SRSW BlockingReference.
151      * 
152      * @deprecated use {@link #newSRSW()} instead.
153      */
154     @Deprecated
155     public BlockingReference(@NotNull final V value) {
156         this(new BooleanLatch(), value);
157     }
158 
159     //
160     // methods
161     //
162 
163     /**
164      * Takes the current element if it is not null and replaces it with null. If
165      * the current element is null then wait until it becomes non-null.
166      * <p>
167      * If the current thread:
168      * <ul>
169      * <li>has its interrupted status set on entry to this method; or
170      * <li>is {@link Thread#interrupt() interrupted} while waiting,
171      * </ul>
172      * then {@link InterruptedException} is thrown and the current thread's
173      * interrupted status is cleared.
174      * 
175      * @return the current element
176      * @throws InterruptedException if the current thread is interrupted while
177      * waiting
178      */
179     public @NotNull
180     V take() throws InterruptedException {
181         V result = null;
182         while (result == null) {
183             latch.await();
184             result = ref.getAndSet(null);
185         }
186         return result;
187     }
188 
189     /**
190      * Takes the current element if it is not null and replaces it with null. If
191      * the current element is null then wait until it becomes non-null. The
192      * method will throw a {@link TimeoutException} if the timeout is reached
193      * before an element becomes available.
194      * <p>
195      * If the current thread:
196      * <ul>
197      * <li>has its interrupted status set on entry to this method; or
198      * <li>is {@link Thread#interrupt() interrupted} while waiting,
199      * </ul>
200      * then {@link InterruptedException} is thrown and the current thread's
201      * interrupted status is cleared.
202      * 
203      * @param timeout the maximum time to wait
204      * @param unit the time unit of the {@code timeout} argument
205      * @return the current element
206      * @throws InterruptedException if the current thread is interrupted while
207      * waiting
208      * @throws TimeoutException if the timeout is reached without another thread
209      * having called {@link #set(Object)}.
210      */
211     public @NotNull
212     V take(final long time, final TimeUnit unit) throws TimeoutException, InterruptedException {
213         final Timeout timeout = Timeout.getNanosTimeout(time, unit);
214         V result = null;
215         while (result == null) {
216             timeout.await(latch);
217             result = ref.getAndSet(null);
218         }
219         return result;
220     }
221 
222     /**
223      * Gets the current element if it is not null, if it is null then this
224      * method blocks and waits until it is not null. Unlike {@link #take()} it
225      * does not reset the current element to null.
226      * <p>
227      * If the current thread:
228      * <ul>
229      * <li>has its interrupted status set on entry to this method; or
230      * <li>is {@link Thread#interrupt() interrupted} while waiting,
231      * </ul>
232      * then {@link InterruptedException} is thrown and the current thread's
233      * interrupted status is cleared.
234      * 
235      * @return the current element
236      * @throws InterruptedException if the current thread is interrupted while
237      * waiting
238      */
239     public @NotNull
240     V get() throws InterruptedException {
241         V result = ref.get();
242         while (result == null) {
243             latch.await();
244             result = ref.get();
245         }
246         return result;
247     }
248 
249     /**
250      * Gets the current element if it is not null, if it is null then this
251      * method blocks and waits until it is not null. Unlike {@link #take()} it
252      * does not reset the current element to null.
253      * <p>
254      * If the current thread:
255      * <ul>
256      * <li>has its interrupted status set on entry to this method; or
257      * <li>is {@link Thread#interrupt() interrupted} while waiting,
258      * </ul>
259      * then {@link InterruptedException} is thrown and the current thread's
260      * interrupted status is cleared.
261      * 
262      * @return the current element
263      * @throws TimeoutException if the timeout is reached without another thread
264      * having called {@link #set(Object)}.
265      * @throws InterruptedException if the current thread is interrupted while
266      * waiting
267      */
268     public @NotNull
269     V get(final long time, @NotNull final TimeUnit unit) throws TimeoutException, InterruptedException {
270         final Timeout timeout = Timeout.getNanosTimeout(time, unit);
271         V result = ref.get();
272         while (result == null) {
273             timeout.await(latch);
274             result = ref.get();
275         }
276         return result;
277     }
278 
279     /**
280      * Set the value of this reference. This method is lock-free. A thread
281      * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
282      * released and given this value.
283      * 
284      * @param value the new value.
285      */
286     public void set(@NotNull final V value) {
287         notNull("value", value);
288         internalSet(value);
289     }
290 
291     /**
292      * Whether or not the current value is null or not. If this is true and the
293      * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
294      * block.
295      * 
296      * @return true if the current reference is null.
297      */
298     public boolean isEmpty() {
299         return peek() == null;
300     }
301 
302     /**
303      * Return the current value whether is null or not. If this is true and the
304      * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
305      * block.
306      * 
307      * @return the current reference or null if there is none.
308      */
309     public @Nullable
310     V peek() {
311         return ref.get();
312     }
313 
314     /**
315      * Clear the current reference.
316      */
317     public void clear() {
318         internalSet(null);
319     }
320 
321     //
322     // private
323     //
324 
325     /**
326      * Set the value
327      * 
328      * @param value maybe null
329      */
330     private void internalSet(@Nullable final V value) {
331         ref.set(value);
332         latch.release();
333     }
334 }