View Javadoc

1   /**
2    * Copyright 2008 Atlassian Pty Ltd 
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at 
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0 
9    * 
10   * Unless required by applicable law or agreed to in writing, software 
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License.
15   */
16  
17  package com.atlassian.util.concurrent;
18  
19  import java.util.Comparator;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
22  
23  import net.jcip.annotations.ThreadSafe;
24  
25  /**
26   * A {@link PhasedLatch} is a shared latch that resets after it is released and
27   * can be reused. Potentially waiting threads can test the current phase before
28   * performing an action. The action is then guarded by that phase and can await
29   * that phase to be advanced via a call to {@link #release() release} the
30   * current phase.
31   */
32  @ThreadSafe
33  public class PhasedLatch implements ReusableLatch {
34      private static final PhaseComparator comparator = new PhaseComparator();
35  
36      private final Sync sync = new Sync();
37  
38      /**
39       * Release the current phase.
40       */
41      public void release() {
42          sync.releaseShared(1);
43      }
44  
45      /**
46       * Await the current phase.
47       * 
48       * @throws InterruptedException if interrupted
49       */
50      public void await() throws InterruptedException {
51          awaitPhase(getPhase());
52      }
53  
54      /**
55       * Await the current phase for the specified period.
56       * 
57       * @param long the period of time
58       * @param unit of time to measure the period in
59       * @return true if the phase was passed, false otherwise
60       * @throws InterruptedException if interrupted
61       */
62      public boolean await(final long period, final TimeUnit unit) throws InterruptedException {
63          return sync.tryAcquireSharedNanos(getPhase(), unit.toNanos(period));
64      }
65  
66      /**
67       * Await the specified phase.
68       * 
69       * @param phase the phase to wait for
70       * @throws InterruptedException if interrupted
71       */
72      public void awaitPhase(final int phase) throws InterruptedException {
73          sync.acquireSharedInterruptibly(phase);
74      }
75  
76      /**
77       * Await the specified phase for the specified period.
78       * 
79       * @param phase the phase to wait for
80       * @param period the period of time to wait for, as specified by:
81       * @param unit of time to measure the period in
82       * @return true if the phase was passed, false otherwise
83       * @throws InterruptedException if interrupted
84       */
85      public boolean awaitPhase(final int phase, final long period, final TimeUnit unit) throws InterruptedException {
86          return sync.tryAcquireSharedNanos(phase, unit.toNanos(period));
87      }
88  
89      public int getPhase() {
90          return sync.getCurrentPhase();
91      }
92  
93      /**
94       * This sync implements Phasing. The state represents the current phase as
95       * an integer that continually increases. The phase can wrap around past
96       * {@link Integer#MAX_VALUE}
97       */
98      private class Sync extends AbstractQueuedSynchronizer {
99          private static final long serialVersionUID = -7753362916930221487L;
100 
101         public int getCurrentPhase() {
102             return getState();
103         }
104 
105         @Override
106         protected int tryAcquireShared(final int phase) {
107             return comparator.isPassed(getState(), phase) ? 1 : -1;
108         }
109 
110         @Override
111         protected boolean tryReleaseShared(final int ignore) {
112             while (true) {
113                 final int state = getState();
114                 // /CLOVER:OFF
115                 // cannot test CAS
116                 if (compareAndSetState(state, state + 1)) {
117                     return true;
118                 }
119                 // /CLOVER:ON
120             }
121         }
122     }
123 
124     static class PhaseComparator implements Comparator<Integer> {
125         public int compare(final Integer current, final Integer waitingFor) {
126             return waitingFor - current;
127         };
128 
129         /**
130          * Has the current phase passed the waiting phase.
131          * 
132          * @param current
133          * @param waitingFor
134          * @return true if current is greater than waiting
135          */
136         boolean isPassed(final int current, final int waitingFor) {
137             return compare(current, waitingFor) < 0;
138         }
139     }
140 }