View Javadoc

1   /**
2    * Copyright 2008 Atlassian Pty Ltd 
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at 
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0 
9    * 
10   * Unless required by applicable law or agreed to in writing, software 
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License.
15   */
16  
17  package com.atlassian.util.concurrent;
18  
19  import static com.atlassian.util.concurrent.Assertions.notNull;
20  import net.jcip.annotations.ThreadSafe;
21  
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TimeoutException;
25  import java.util.concurrent.atomic.AtomicReference;
26  import java.util.concurrent.locks.Condition;
27  
28  /**
29   * A Reference with queue semantics where rather than getting the current reference it is taken
30   * instead. Analogous to a single element {@link BlockingQueue}.
31   * <p>
32   * Note: this class does not support null elements being {@link #set(Object)} and will throw an
33   * exception. If the internal reference is null, then calls to {@link #take()} or
34   * {@link #take(long, TimeUnit)} will block.
35   * <p>
36   * This class is most suited to SRSW usage. Multiple writers will overwrite each other's elements,
37   * and if multiple readers are waiting to take a value, one reader will be arbitrarily chosen
38   * (similar to {@link Condition#signal()}).
39   * 
40   * @param <V> the value type
41   * @see BlockingQueue
42   */
43  @ThreadSafe public class BlockingReference<V> {
44      private final AtomicReference<V> ref = new AtomicReference<V>();
45      private final BooleanLatch latch = new BooleanLatch();
46  
47      /**
48       * Takes the current element if it is not null and replaces it with null. If the current element
49       * is null then wait until it becomes non-null.
50       * <p>
51       * If the current thread:
52       * <ul>
53       * <li>has its interrupted status set on entry to this method; or
54       * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
55       * </ul>
56       * then {@link InterruptedException} is thrown and the current thread's interrupted status is
57       * cleared.
58       * 
59       * @return the current element
60       * @throws InterruptedException if the current thread is interrupted while waiting
61       */
62      public V take() throws InterruptedException {
63          while (true) {
64              latch.await();
65              final V result = ref.getAndSet(null);
66              if (result != null) {
67                  return result;
68              }
69          }
70      }
71  
72      /**
73       * Takes the current element if it is not null and replaces it with null. If the current element
74       * is null then wait until it becomes non-null. The method will throw a {@link TimeoutException}
75       * if the timeout is reached before an element becomes available.
76       * <p>
77       * If the current thread:
78       * <ul>
79       * <li>has its interrupted status set on entry to this method; or
80       * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
81       * </ul>
82       * then {@link InterruptedException} is thrown and the current thread's interrupted status is
83       * cleared.
84       * 
85       * @param timeout the maximum time to wait
86       * @param unit the time unit of the {@code timeout} argument
87       * @return the current element
88       * @throws InterruptedException if the current thread is interrupted while waiting
89       * @throws TimeoutException if the timeout is reached without another thread having called
90       *             {@link #set(Object)}.
91       */
92      public V take(final long timeout, final TimeUnit unit) throws TimeoutException, InterruptedException {
93          if (!latch.await(timeout, unit)) {
94              throw new TimeoutException();
95          }
96          return ref.getAndSet(null);
97      }
98  
99      /**
100      * Set the value of this reference. This method is lock-free. A thread waiting in
101      * {@link #take()} or {@link #take(long, TimeUnit)} will be released and given this value.
102      * 
103      * @param value the new value.
104      */
105     public void set(final V value) {
106         notNull("value", value);
107         ref.set(value);
108         latch.release();
109     }
110 
111     /**
112      * Whether or not the current value is null or not. If this is true and the next call to
113      * {@link #take()} or {@link #take(long, TimeUnit)} will not block.
114      * 
115      * @return true if the current reference is null.
116      */
117     public boolean isEmpty() {
118         return ref.get() == null;
119     }
120 }