1 /**
2 * Copyright 2008 Atlassian Pty Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.atlassian.util.concurrent;
18
19 import static com.atlassian.util.concurrent.Assertions.notNull;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27
28 import net.jcip.annotations.ThreadSafe;
29
30 /**
31 * A Reference with queue semantics where the current reference may be retrieved
32 * or taken instead, and if there is no current element then it will be block
33 * until the reference becomes available. This is somewhat analogous to a single
34 * element {@link BlockingQueue}.
35 * <p>
36 * Note: this class does not support null elements being {@link #set(Object)}
37 * and will throw an exception. If the internal reference is null, then calls to
38 * {@link #take()} or {@link #take(long, TimeUnit)} will block.
39 * <p>
40 * This class is most suited to {@link #newSRSW() SRSW} or {@link #newMRSW()
41 * MRSW} usage. Multiple writers will overwrite each other's elements and the
42 * chosen value will be arbitrary in the absence of any external consensus. If
43 * multiple readers are waiting to {@link #take()} a value, one reader will be
44 * arbitrarily chosen (similar to {@link Condition#signal()}). Multiple readers
45 * can however {@link #get()} the current value if it is not null, but they may
46 * see the current value more than once. If multiple readers attempt to
47 * {@link #get()} a value from the SRSW reference and it is not yet present then
48 * only one waiting thread may be notified, please use the MRSW version for this
49 * case.
50 * <p>
51 * This implementation has been optimized for SRSW performance with
52 * {@link #set(Object)}/{@link #take()} pairs.
53 * <p>
54 * Prometheus contains a similar construct called an <a href"http://prometheus.codehaus.org/javadoc/main/org/codehaus/prometheus/references/AwaitableReference.html"
55 * >AwaitableReference</a>. This class is more explicit in that it handles
56 * take/get separately.
57 *
58 * @param <V> the value type
59 * @author Jed Wesley-Smith
60 * @see BlockingQueue
61 */
62 @ThreadSafe
63 public class BlockingReference<V> {
64
65 //
66 // static factory methods
67 //
68
69 /**
70 * Create a BlockingReference best suited to single-reader/single-writer
71 * situations. In a MRSW case this instance may get missed signals if
72 * multiple reader threads are all waiting on the value.
73 */
74 public static <V> BlockingReference<V> newSRSW() {
75 return newSRSW(null);
76 }
77
78 /**
79 * Create a BlockingReference best suited to single-reader/single-writer
80 * situations. In a MRSW case this instance may get missed signals if
81 * multiple reader threads are all waiting on the value.
82 *
83 * @param initialValue the initial value
84 */
85 public static <V> BlockingReference<V> newSRSW(final V initialValue) {
86 return new BlockingReference<V>(new BooleanLatch(), initialValue);
87 }
88
89 /**
90 * Create a BlockingReference best suited to multi-reader/single-writer
91 * situations. In a SRSW case this instance may not perform quite as well.
92 */
93 public static <V> BlockingReference<V> newMRSW() {
94 return newMRSW(null);
95 }
96
97 /**
98 * Create a BlockingReference best suited to multi-reader/single-writer
99 * situations. In a SRSW case this instance may not perform quite as well.
100 *
101 * @param initialValue the initial value
102 */
103 public static <V> BlockingReference<V> newMRSW(final V initialValue) {
104 return new BlockingReference<V>(new PhasedLatch() {
105 /*
106 * Workaround for the fact that take() always calls await. Calling
107 * await() on a phased latch by default waits on the <i>next</i>
108 * phase (after the current one). We need to make sure we await on
109 * the previous phase instead so we remember the previous phase.
110 */
111 private final AtomicInteger currentPhase = new AtomicInteger(super.getPhase());
112
113 @Override
114 public synchronized int getPhase() {
115 try {
116 return currentPhase.get();
117 } finally {
118 currentPhase.set(super.getPhase());
119 }
120 }
121 }, initialValue);
122 }
123
124 //
125 // instance vars
126 //
127
128 private final AtomicReference<V> ref = new AtomicReference<V>();
129 private final ReusableLatch latch;
130
131 //
132 // ctors
133 //
134
135 BlockingReference(final ReusableLatch latch, final V initialValue) {
136 this.latch = latch;
137 internalSet(initialValue);
138 }
139
140 // /CLOVER:OFF
141 /**
142 * Creates a new SRSW BlockingReference.
143 *
144 * @deprecated use {@link #newSRSW()} instead.
145 */
146 @Deprecated
147 public BlockingReference() {
148 this(new BooleanLatch(), null);
149 }
150
151 /**
152 * Creates a new SRSW BlockingReference.
153 *
154 * @deprecated use {@link #newSRSW()} instead.
155 */
156 @Deprecated
157 public BlockingReference(@NotNull final V value) {
158 this(new BooleanLatch(), value);
159 }
160
161 // /CLOVER:ON
162
163 //
164 // methods
165 //
166
167 /**
168 * Takes the current element if it is not null and replaces it with null. If
169 * the current element is null then wait until it becomes non-null.
170 * <p>
171 * If the current thread:
172 * <ul>
173 * <li>has its interrupted status set on entry to this method; or
174 * <li>is {@link Thread#interrupt() interrupted} while waiting,
175 * </ul>
176 * then {@link InterruptedException} is thrown and the current thread's
177 * interrupted status is cleared.
178 *
179 * @return the current element
180 * @throws InterruptedException if the current thread is interrupted while
181 * waiting
182 */
183 public @NotNull
184 V take() throws InterruptedException {
185 V result = null;
186 while (result == null) {
187 latch.await();
188 result = ref.getAndSet(null);
189 }
190 return result;
191 }
192
193 /**
194 * Takes the current element if it is not null and replaces it with null. If
195 * the current element is null then wait until it becomes non-null. The
196 * method will throw a {@link TimeoutException} if the timeout is reached
197 * before an element becomes available.
198 * <p>
199 * If the current thread:
200 * <ul>
201 * <li>has its interrupted status set on entry to this method; or
202 * <li>is {@link Thread#interrupt() interrupted} while waiting,
203 * </ul>
204 * then {@link InterruptedException} is thrown and the current thread's
205 * interrupted status is cleared.
206 *
207 * @param time the maximum time to wait
208 * @param unit the time unit of the {@code timeout} argument
209 * @return the current element
210 * @throws InterruptedException if the current thread is interrupted while
211 * waiting
212 * @throws TimeoutException if the timeout is reached without another thread
213 * having called {@link #set(Object)}.
214 */
215 public @NotNull
216 V take(final long time, final TimeUnit unit) throws TimeoutException, InterruptedException {
217 final Timeout timeout = Timeout.getNanosTimeout(time, unit);
218 V result = null;
219 while (result == null) {
220 timeout.await(latch);
221 result = ref.getAndSet(null);
222 }
223 return result;
224 }
225
226 /**
227 * Gets the current element if it is not null, if it is null then this
228 * method blocks and waits until it is not null. Unlike {@link #take()} it
229 * does not reset the current element to null.
230 * <p>
231 * If the current thread:
232 * <ul>
233 * <li>has its interrupted status set on entry to this method; or
234 * <li>is {@link Thread#interrupt() interrupted} while waiting,
235 * </ul>
236 * then {@link InterruptedException} is thrown and the current thread's
237 * interrupted status is cleared.
238 *
239 * @return the current element
240 * @throws InterruptedException if the current thread is interrupted while
241 * waiting
242 */
243 public @NotNull
244 V get() throws InterruptedException {
245 V result = ref.get();
246 while (result == null) {
247 latch.await();
248 result = ref.get();
249 }
250 return result;
251 }
252
253 /**
254 * Gets the current element if it is not null, if it is null then this
255 * method blocks and waits until it is not null. Unlike {@link #take()} it
256 * does not reset the current element to null.
257 * <p>
258 * If the current thread:
259 * <ul>
260 * <li>has its interrupted status set on entry to this method; or
261 * <li>is {@link Thread#interrupt() interrupted} while waiting,
262 * </ul>
263 * then {@link InterruptedException} is thrown and the current thread's
264 * interrupted status is cleared.
265 *
266 * @return the current element
267 * @throws TimeoutException if the timeout is reached without another thread
268 * having called {@link #set(Object)}.
269 * @throws InterruptedException if the current thread is interrupted while
270 * waiting
271 */
272 public @NotNull
273 V get(final long time, @NotNull final TimeUnit unit) throws TimeoutException, InterruptedException {
274 final Timeout timeout = Timeout.getNanosTimeout(time, unit);
275 V result = ref.get();
276 while (result == null) {
277 timeout.await(latch);
278 result = ref.get();
279 }
280 return result;
281 }
282
283 /**
284 * Set the value of this reference. This method is lock-free. A thread
285 * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
286 * released and given this value.
287 *
288 * @param value the new value.
289 */
290 public void set(@NotNull final V value) {
291 notNull("value", value);
292 internalSet(value);
293 }
294
295 /**
296 * Whether or not the current value is null or not. If this is true and the
297 * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
298 * block.
299 *
300 * @return true if the current reference is null.
301 */
302 public boolean isEmpty() {
303 return peek() == null;
304 }
305
306 /**
307 * Return the current value whether is null or not. If this is true and the
308 * next call to {@link #take()} or {@link #take(long, TimeUnit)} will not
309 * block.
310 *
311 * @return the current reference or null if there is none.
312 */
313 public @Nullable
314 V peek() {
315 return ref.get();
316 }
317
318 /**
319 * Clear the current reference.
320 */
321 public void clear() {
322 internalSet(null);
323 }
324
325 //
326 // private
327 //
328
329 /**
330 * Set the value
331 *
332 * @param value maybe null
333 */
334 private void internalSet(@Nullable final V value) {
335 ref.set(value);
336 latch.release();
337 }
338 }