1 /**
2 * Copyright 2008 Atlassian Pty Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.atlassian.util.concurrent;
18
19 import static com.atlassian.util.concurrent.Assertions.notNull;
20 import net.jcip.annotations.ThreadSafe;
21
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.atomic.AtomicReference;
27 import java.util.concurrent.locks.Condition;
28
29 /**
30 * A Reference with queue semantics where the current reference may be retrieved
31 * or taken instead, and if there is no current element then it will be block
32 * until the reference becomes available. This is somewhat analogous to a single
33 * element {@link BlockingQueue}.
34 * <p>
35 * Note: this class does not support null elements being {@link #set(Object)}
36 * and will throw an exception. If the internal reference is null, then calls to
37 * {@link #take()} or {@link #take(long, TimeUnit)} will block.
38 * <p>
39 * This class is most suited to {@link #newSRSW() SRSW} or {@link #newMRSW()
40 * MRSW} usage. Multiple writers will overwrite each other's elements and the
41 * chosen value will be arbitrary in the absence of any external consensus. If
42 * multiple readers are waiting to {@link #take()} a value, one reader will be
43 * arbitrarily chosen (similar to {@link Condition#signal()}). Multiple readers
44 * can however {@link #get()} the current value if it is not null, but they may
45 * see the current value more than once. If multiple readers attempt to
46 * {@link #get()} a value from the SRSW reference and it is not yet present then
47 * only one waiting thread may be notified, please use the MRSW version for this
48 * case.
49 * <p>
50 * This implementation has been optimized for SRSW performance with
51 * {@link #set(Object)}/{@link #take()} pairs.
52 * <p>
53 * Prometheus contains a similar construct called an <a href"http://prometheus.codehaus.org/javadoc/main/org/codehaus/prometheus/references/AwaitableReference.html"
54 * >AwaitableReference</a>. This class is more explicit in that it handles
55 * take/get separately.
56 *
57 * @param <V> the value type
58 * @author Jed Wesley-Smith
59 * @see BlockingQueue
60 */
61 @ThreadSafe
62 public class BlockingReference<V> {
63
64 //
65 // static factory methods
66 //
67
68 /**
69 * Create a BlockingReference best suited to single-reader/single-writer
70 * situations. In a MRSW case this instance may get missed signals if
71 * multiple reader threads are all waiting on the value.
72 */
73 public static <V> BlockingReference<V> newSRSW() {
74 return newSRSW(null);
75 }
76
77 /**
78 * Create a BlockingReference best suited to single-reader/single-writer
79 * situations. In a MRSW case this instance may get missed signals if
80 * multiple reader threads are all waiting on the value.
81 *
82 * @param initialValue the initial value
83 */
84 public static <V> BlockingReference<V> newSRSW(final V initialValue) {
85 return new BlockingReference<V>(new BooleanLatch(), initialValue);
86 }
87
88 /**
89 * Create a BlockingReference best suited to multi-reader/single-writer
90 * situations. In a SRSW case this instance may not perform quite as well.
91 */
92 public static <V> BlockingReference<V> newMRSW() {
93 return newMRSW(null);
94 }
95
96 /**
97 * Create a BlockingReference best suited to multi-reader/single-writer
98 * situations. In a SRSW case this instance may not perform quite as well.
99 *
100 * @param initialValue the initial value
101 */
102 public static <V> BlockingReference<V> newMRSW(final V initialValue) {
103 return new BlockingReference<V>(new PhasedLatch() {
104 /*
105 * Workaround for the fact that take() always calls await. Calling
106 * await() on a phased latch by default waits on the <i>next</i>
107 * phase (after the current one). We need to make sure we await on
108 * the previous phase instead so we remember the previous phase.
109 */
110 private final AtomicInteger currentPhase = new AtomicInteger(super.getPhase());
111
112 @Override
113 public synchronized int getPhase() {
114 try {
115 return currentPhase.get();
116 } finally {
117 currentPhase.set(super.getPhase());
118 }
119 }
120 }, initialValue);
121 }
122
123 //
124 // instance vars
125 //
126
127 private final AtomicReference<V> ref = new AtomicReference<V>();
128 private final ReusableLatch latch;
129
130 //
131 // ctors
132 //
133
134 BlockingReference(final ReusableLatch latch, final V initialValue) {
135 this.latch = latch;
136 internalSet(initialValue);
137 }
138
139 /**
140 * Creates a new SRSW BlockingReference.
141 *
142 * @deprecated use {@link #newSRSW()} instead.
143 */
144 @Deprecated
145 public BlockingReference() {
146 this(new BooleanLatch(), null);
147 }
148
149 /**
150 * Creates a new SRSW BlockingReference.
151 *
152 * @deprecated use {@link #newSRSW()} instead.
153 */
154 @Deprecated
155 public BlockingReference(@NotNull final V value) {
156 this(new BooleanLatch(), value);
157 }
158
159 //
160 // methods
161 //
162
163 /**
164 * Takes the current element if it is not null and replaces it with null. If
165 * the current element is null then wait until it becomes non-null.
166 * <p>
167 * If the current thread:
168 * <ul>
169 * <li>has its interrupted status set on entry to this method; or
170 * <li>is {@link Thread#interrupt() interrupted} while waiting,
171 * </ul>
172 * then {@link InterruptedException} is thrown and the current thread's
173 * interrupted status is cleared.
174 *
175 * @return the current element
176 * @throws InterruptedException if the current thread is interrupted while
177 * waiting
178 */
179 public @NotNull
180 V take() throws InterruptedException {
181 V result = null;
182 while (result == null) {
183 latch.await();
184 result = ref.getAndSet(null);
185 }
186 return result;
187 }
188
189 /**
190 * Takes the current element if it is not null and replaces it with null. If
191 * the current element is null then wait until it becomes non-null. The
192 * method will throw a {@link TimeoutException} if the timeout is reached
193 * before an element becomes available.
194 * <p>
195 * If the current thread:
196 * <ul>
197 * <li>has its interrupted status set on entry to this method; or
198 * <li>is {@link Thread#interrupt() interrupted} while waiting,
199 * </ul>
200 * then {@link InterruptedException} is thrown and the current thread's
201 * interrupted status is cleared.
202 *
203 * @param timeout the maximum time to wait
204 * @param unit the time unit of the {@code timeout} argument
205 * @return the current element
206 * @throws InterruptedException if the current thread is interrupted while
207 * waiting
208 * @throws TimeoutException if the timeout is reached without another thread
209 * having called {@link #set(Object)}.
210 */
211 public @NotNull
212 V take(final long time, final TimeUnit unit) throws TimeoutException, InterruptedException {
213 final Timeout timeout = Timeout.getNanosTimeout(time, unit);
214 V result = null;
215 while (result == null) {
216 timeout.await(latch);
217 result = ref.getAndSet(null);
218 }
219 return result;
220 }
221
222 /**
223 * Gets the current element if it is not null, if it is null then this
224 * method blocks and waits until it is not null. Unlike {@link #take()} it
225 * does not reset the current element to null.
226 * <p>
227 * If the current thread:
228 * <ul>
229 * <li>has its interrupted status set on entry to this method; or
230 * <li>is {@link Thread#interrupt() interrupted} while waiting,
231 * </ul>
232 * then {@link InterruptedException} is thrown and the current thread's
233 * interrupted status is cleared.
234 *
235 * @return the current element
236 * @throws InterruptedException if the current thread is interrupted while
237 * waiting
238 */
239 public @NotNull
240 V get() throws InterruptedException {
241 V result = ref.get();
242 while (result == null) {
243 latch.await();
244 result = ref.get();
245 }
246 return result;
247 }
248
249 /**
250 * Gets the current element if it is not null, if it is null then this
251 * method blocks and waits until it is not null. Unlike {@link #take()} it
252 * does not reset the current element to null.
253 * <p>
254 * If the current thread:
255 * <ul>
256 * <li>has its interrupted status set on entry to this method; or
257 * <li>is {@link Thread#interrupt() interrupted} while waiting,
258 * </ul>
259 * then {@link InterruptedException} is thrown and the current thread's
260 * interrupted status is cleared.
261 *
262 * @return the current element
263 * @throws TimeoutException if the timeout is reached without another thread
264 * having called {@link #set(Object)}.
265 * @throws InterruptedException if the current thread is interrupted while
266 * waiting
267 */
268 public @NotNull
269 V get(final long time, @NotNull final TimeUnit unit) throws TimeoutException, InterruptedException {
270 final Timeout timeout = Timeout.getNanosTimeout(time, unit);
271 V result = ref.get();
272 while (result == null) {
273 timeout.await(latch);
274 result = ref.get();
275 }
276 return result;
277 }
278
279 /**
280 * Set the value of this reference. This method is lock-free. A thread
281 * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
282 * released and given this value.
283 *
284 * @param value the new value.
285 */
286 public void set(@NotNull final V value) {
287 notNull("value", value);
288 internalSet(value);
289 }
290
291 /**
292 * Whether or not the current value is null or not. If this is true and the
293 * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
294 * block.
295 *
296 * @return true if the current reference is null.
297 */
298 public boolean isEmpty() {
299 return peek() == null;
300 }
301
302 /**
303 * Return the current value whether is null or not. If this is true and the
304 * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
305 * block.
306 *
307 * @return the current reference or null if there is none.
308 */
309 public @Nullable
310 V peek() {
311 return ref.get();
312 }
313
314 /**
315 * Clear the current reference.
316 */
317 public void clear() {
318 internalSet(null);
319 }
320
321 //
322 // private
323 //
324
325 /**
326 * Set the value
327 *
328 * @param value maybe null
329 */
330 private void internalSet(@Nullable final V value) {
331 ref.set(value);
332 latch.release();
333 }
334 }