View Javadoc

1   /**
2    * Copyright 2008 Atlassian Pty Ltd 
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at 
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0 
9    * 
10   * Unless required by applicable law or agreed to in writing, software 
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License.
15   */
16  
17  package com.atlassian.util.concurrent;
18  
19  import static com.atlassian.util.concurrent.Assertions.notNull;
20  
21  import java.util.concurrent.BlockingQueue;
22  import java.util.concurrent.TimeUnit;
23  import java.util.concurrent.TimeoutException;
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.concurrent.atomic.AtomicReference;
26  import java.util.concurrent.locks.Condition;
27  
28  import net.jcip.annotations.ThreadSafe;
29  
30  /**
31   * A Reference with queue semantics where the current reference may be retrieved
32   * or taken instead, and if there is no current element then it will block
33   * until the reference becomes available. This is somewhat analogous to a single
34   * element {@link BlockingQueue}.
35   * <p>
36   * Note: this class does not support null elements being {@link #set(Object)}
37   * and will throw an exception. If the internal reference is null, then calls to
38   * {@link #take()} or {@link #take(long, TimeUnit)} will block.
39   * <p>
40   * This class is most suited to {@link #newSRSW() SRSW} or {@link #newMRSW()
41   * MRSW} usage. Multiple writers will overwrite each other's elements and the
42   * chosen value will be arbitrary in the absence of any external consensus. If
43   * multiple readers are waiting to {@link #take()} a value, one reader will be
44   * arbitrarily chosen (similar to {@link Condition#signal()}). Multiple readers
45   * can however {@link #get()} the current value if it is not null, but they may
46   * see the current value more than once. If multiple readers attempt to
47   * {@link #get()} a value from the SRSW reference and it is not yet present then
48   * only one waiting thread may be notified, please use the MRSW version for this
49   * case.
50   * <p>
51   * This implementation has been optimized for SRSW performance with
52   * {@link #set(Object)}/{@link #take()} pairs.
53   * <p>
54   * Prometheus contains a similar construct called an <a href="http://prometheus.codehaus.org/javadoc/main/org/codehaus/prometheus/references/AwaitableReference.html"
55   * >AwaitableReference</a>. This class is more explicit in that it handles
56   * take/get separately.
57   * 
58   * @param <V> the value type
59   * @author Jed Wesley-Smith
60   * @see BlockingQueue
61   */
62  @ThreadSafe
63  public class BlockingReference<V> {
64  
65      //
66      // static factory methods
67      //
68  
69      /**
70       * Create a BlockingReference best suited to single-reader/single-writer
71       * situations. In a MRSW case this instance may get missed signals if
72       * multiple reader threads are all waiting on the value.
73       */
74      public static <V> BlockingReference<V> newSRSW() {
75          return newSRSW(null);
76      }
77  
78      /**
79       * Create a BlockingReference best suited to single-reader/single-writer
80       * situations. In a MRSW case this instance may get missed signals if
81       * multiple reader threads are all waiting on the value.
82       * 
83       * @param initialValue the initial value
84       */
85      public static <V> BlockingReference<V> newSRSW(final V initialValue) {
86          return new BlockingReference<V>(new BooleanLatch(), initialValue);
87      }
88  
89      /**
90       * Create a BlockingReference best suited to multi-reader/single-writer
91       * situations. In a SRSW case this instance may not perform quite as well.
92       */
93      public static <V> BlockingReference<V> newMRSW() {
94          return newMRSW(null);
95      }
96  
97      /**
98       * Create a BlockingReference best suited to multi-reader/single-writer
99       * situations. In a SRSW case this instance may not perform quite as well.
100      * 
101      * @param initialValue the initial value
102      */
103     public static <V> BlockingReference<V> newMRSW(final V initialValue) {
104         return new BlockingReference<V>(new PhasedLatch() {
105             /*
106              * Workaround for the fact that take() always calls await. Calling
107              * await() on a phased latch by default waits on the <i>next</i>
108              * phase (after the current one). We need to make sure we await on
109              * the previous phase instead so we remember the previous phase.
110              */
111             private final AtomicInteger currentPhase = new AtomicInteger(super.getPhase());
112 
113             @Override
114             public synchronized int getPhase() {
115                 try {
116                     return currentPhase.get();
117                 } finally {
118                     currentPhase.set(super.getPhase());
119                 }
120             }
121         }, initialValue);
122     }
123 
124     //
125     // instance vars
126     //
127 
128     private final AtomicReference<V> ref = new AtomicReference<V>();
129     private final ReusableLatch latch;
130 
131     //
132     // ctors
133     //
134 
135     BlockingReference(final ReusableLatch latch, final V initialValue) {
136         this.latch = latch;
137         internalSet(initialValue);
138     }
139 
140     // /CLOVER:OFF
141     /**
142      * Creates a new SRSW BlockingReference.
143      * 
144      * @deprecated use {@link #newSRSW()} instead.
145      */
146     @Deprecated
147     public BlockingReference() {
148         this(new BooleanLatch(), null);
149     }
150 
151     /**
152      * Creates a new SRSW BlockingReference.
153      * 
154      * @deprecated use {@link #newSRSW()} instead.
155      */
156     @Deprecated
157     public BlockingReference(@NotNull final V value) {
158         this(new BooleanLatch(), value);
159     }
160 
161     // /CLOVER:ON
162 
163     //
164     // methods
165     //
166 
167     /**
168      * Takes the current element if it is not null and replaces it with null. If
169      * the current element is null then wait until it becomes non-null.
170      * <p>
171      * If the current thread:
172      * <ul>
173      * <li>has its interrupted status set on entry to this method; or
174      * <li>is {@link Thread#interrupt() interrupted} while waiting,
175      * </ul>
176      * then {@link InterruptedException} is thrown and the current thread's
177      * interrupted status is cleared.
178      * 
179      * @return the current element
180      * @throws InterruptedException if the current thread is interrupted while
181      * waiting
182      */
183     public @NotNull
184     V take() throws InterruptedException {
185         V result = null;
186         while (result == null) {
187             latch.await();
188             result = ref.getAndSet(null);
189         }
190         return result;
191     }
192 
193     /**
194      * Takes the current element if it is not null and replaces it with null. If
195      * the current element is null then wait until it becomes non-null. The
196      * method will throw a {@link TimeoutException} if the timeout is reached
197      * before an element becomes available.
198      * <p>
199      * If the current thread:
200      * <ul>
201      * <li>has its interrupted status set on entry to this method; or
202      * <li>is {@link Thread#interrupt() interrupted} while waiting,
203      * </ul>
204      * then {@link InterruptedException} is thrown and the current thread's
205      * interrupted status is cleared.
206      * 
207      * @param time the maximum time to wait
208      * @param unit the time unit of the {@code timeout} argument
209      * @return the current element
210      * @throws InterruptedException if the current thread is interrupted while
211      * waiting
212      * @throws TimeoutException if the timeout is reached without another thread
213      * having called {@link #set(Object)}.
214      */
215     public @NotNull
216     V take(final long time, final TimeUnit unit) throws TimeoutException, InterruptedException {
217         final Timeout timeout = Timeout.getNanosTimeout(time, unit);
218         V result = null;
219         while (result == null) {
220             timeout.await(latch);
221             result = ref.getAndSet(null);
222         }
223         return result;
224     }
225 
226     /**
227      * Gets the current element if it is not null, if it is null then this
228      * method blocks and waits until it is not null. Unlike {@link #take()} it
229      * does not reset the current element to null.
230      * <p>
231      * If the current thread:
232      * <ul>
233      * <li>has its interrupted status set on entry to this method; or
234      * <li>is {@link Thread#interrupt() interrupted} while waiting,
235      * </ul>
236      * then {@link InterruptedException} is thrown and the current thread's
237      * interrupted status is cleared.
238      * 
239      * @return the current element
240      * @throws InterruptedException if the current thread is interrupted while
241      * waiting
242      */
243     public @NotNull
244     V get() throws InterruptedException {
245         V result = ref.get();
246         while (result == null) {
247             latch.await();
248             result = ref.get();
249         }
250         return result;
251     }
252 
253     /**
254      * Gets the current element if it is not null, if it is null then this
255      * method blocks and waits until it is not null. Unlike {@link #take()} it
256      * does not reset the current element to null.
257      * <p>
258      * If the current thread:
259      * <ul>
260      * <li>has its interrupted status set on entry to this method; or
261      * <li>is {@link Thread#interrupt() interrupted} while waiting,
262      * </ul>
263      * then {@link InterruptedException} is thrown and the current thread's
264      * interrupted status is cleared.
265      * 
266      * @return the current element
267      * @throws TimeoutException if the timeout is reached without another thread
268      * having called {@link #set(Object)}.
269      * @throws InterruptedException if the current thread is interrupted while
270      * waiting
271      */
272     public @NotNull
273     V get(final long time, @NotNull final TimeUnit unit) throws TimeoutException, InterruptedException {
274         final Timeout timeout = Timeout.getNanosTimeout(time, unit);
275         V result = ref.get();
276         while (result == null) {
277             timeout.await(latch);
278             result = ref.get();
279         }
280         return result;
281     }
282 
283     /**
284      * Set the value of this reference. This method is lock-free. A thread
285      * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
286      * released and given this value.
287      * 
288      * @param value the new value.
289      */
290     public void set(@NotNull final V value) {
291         notNull("value", value);
292         internalSet(value);
293     }
294 
295     /**
296      * Whether or not the current value is null or not. If this is true and the
297      * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
298      * block.
299      * 
300      * @return true if the current reference is null.
301      */
302     public boolean isEmpty() {
303         return peek() == null;
304     }
305 
306     /**
307      * Return the current value whether is null or not. If this is true and the
308      * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
309      * block.
310      * 
311      * @return the current reference or null if there is none.
312      */
313     public @Nullable
314     V peek() {
315         return ref.get();
316     }
317 
318     /**
319      * Clear the current reference.
320      */
321     public void clear() {
322         internalSet(null);
323     }
324 
325     //
326     // private
327     //
328 
329     /**
330      * Set the value
331      * 
332      * @param value maybe null
333      */
334     private void internalSet(@Nullable final V value) {
335         ref.set(value);
336         latch.release();
337     }
338 }