1 /**
2 * Copyright 2008 Atlassian Pty Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.atlassian.util.concurrent;
18
19 import static com.atlassian.util.concurrent.Assertions.notNull;
20 import net.jcip.annotations.ThreadSafe;
21
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27
28 /**
29 * A Reference with queue semantics where rather than getting the current reference it is taken
30 * instead. Analogous to a single element {@link BlockingQueue}.
31 * <p>
32 * Note: this class does not support null elements being {@link #set(Object)} and will throw an
33 * exception. If the internal reference is null, then calls to {@link #take()} or
34 * {@link #take(long, TimeUnit)} will block.
35 * <p>
36 * This class is most suited to SRSW usage. Multiple writers will overwrite each other's elements,
37 * and if multiple readers are waiting to take a value, one reader will be arbitrarily chosen
38 * (similar to {@link Condition#signal()}).
39 *
40 * @param <V> the value type
41 * @see BlockingQueue
42 */
43 @ThreadSafe public class BlockingReference<V> {
44 private final AtomicReference<V> ref = new AtomicReference<V>();
45 private final BooleanLatch latch = new BooleanLatch();
46
47 /**
48 * Takes the current element if it is not null and replaces it with null. If the current element
49 * is null then wait until it becomes non-null.
50 * <p>
51 * If the current thread:
52 * <ul>
53 * <li>has its interrupted status set on entry to this method; or
54 * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
55 * </ul>
56 * then {@link InterruptedException} is thrown and the current thread's interrupted status is
57 * cleared.
58 *
59 * @return the current element
60 * @throws InterruptedException if the current thread is interrupted while waiting
61 */
62 public V take() throws InterruptedException {
63 while (true) {
64 latch.await();
65 final V result = ref.getAndSet(null);
66 if (result != null) {
67 return result;
68 }
69 }
70 }
71
72 /**
73 * Takes the current element if it is not null and replaces it with null. If the current element
74 * is null then wait until it becomes non-null. The method will throw a {@link TimeoutException}
75 * if the timeout is reached before an element becomes available.
76 * <p>
77 * If the current thread:
78 * <ul>
79 * <li>has its interrupted status set on entry to this method; or
80 * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
81 * </ul>
82 * then {@link InterruptedException} is thrown and the current thread's interrupted status is
83 * cleared.
84 *
85 * @param timeout the maximum time to wait
86 * @param unit the time unit of the {@code timeout} argument
87 * @return the current element
88 * @throws InterruptedException if the current thread is interrupted while waiting
89 * @throws TimeoutException if the timeout is reached without another thread having called
90 * {@link #set(Object)}.
91 */
92 public V take(final long timeout, final TimeUnit unit) throws TimeoutException, InterruptedException {
93 if (!latch.await(timeout, unit)) {
94 throw new TimeoutException();
95 }
96 return ref.getAndSet(null);
97 }
98
99 /**
100 * Set the value of this reference. This method is lock-free. A thread waiting in
101 * {@link #take()} or {@link #take(long, TimeUnit)} will be released and given this value.
102 *
103 * @param value the new value.
104 */
105 public void set(final V value) {
106 notNull("value", value);
107 ref.set(value);
108 latch.release();
109 }
110
111 /**
112 * Whether or not the current value is null or not. If this is true and the next call to
113 * {@link #take()} or {@link #take(long, TimeUnit)} will not block.
114 *
115 * @return true if the current reference is null.
116 */
117 public boolean isEmpty() {
118 return ref.get() == null;
119 }
120 }