View Javadoc

1   /**
2    * Copyright 2008 Atlassian Pty Ltd 
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at 
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0 
9    * 
10   * Unless required by applicable law or agreed to in writing, software 
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License.
15   */
16  
17  package com.atlassian.util.concurrent;
18  
19  import net.jcip.annotations.ThreadSafe;
20  
21  import java.util.Comparator;
22  import java.util.concurrent.TimeUnit;
23  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
24  
25  /**
26   * A {@link PhasedLatch} is a shared latch that resets after it is released and can be reused.
27   * Potentially waiting threads can test the current phase before performing an action. The action is
28   * then guarded by that phase and can await that phase to be advanced via a call to
29   * {@link #release() release} the current phase.
30   */
31  @ThreadSafe public class PhasedLatch {
32      private static PhaseComparator comparator = new PhaseComparator();
33  
34      private final Sync sync = new Sync();
35  
36      /**
37       * Release the current phase.
38       */
39      public void release() {
40          sync.releaseShared(1);
41      }
42  
43      /**
44       * Await the current phase.
45       * 
46       * @throws InterruptedException if interrupted
47       */
48      public void await() throws InterruptedException {
49          awaitPhase(getPhase());
50      }
51  
52      /**
53       * Await the current phase for the specified period.
54       * 
55       * @param long the period of time
56       * @param unit of time to measure the period in
57       * @return true if the phase was passed, false otherwise
58       * @throws InterruptedException if interrupted
59       */
60      public boolean await(final long period, final TimeUnit unit) throws InterruptedException {
61          return sync.tryAcquireSharedNanos(getPhase(), unit.toNanos(period));
62      }
63  
64      /**
65       * Await the specified phase.
66       * 
67       * @param phase the phase to wait for
68       * @throws InterruptedException if interrupted
69       */
70      public void awaitPhase(final int phase) throws InterruptedException {
71          sync.acquireSharedInterruptibly(phase);
72      }
73  
74      /**
75       * Await the specified phase for the specified period.
76       * 
77       * @param phase the phase to wait for
78       * @param period the period of time to wait for, as specified by:
79       * @param unit of time to measure the period in
80       * @return true if the phase was passed, false otherwise
81       * @throws InterruptedException if interrupted
82       */
83      public boolean awaitPhase(final int phase, final long period, final TimeUnit unit) throws InterruptedException {
84          return sync.tryAcquireSharedNanos(phase, unit.toNanos(period));
85      }
86  
87      public int getPhase() {
88          return sync.getCurrentPhase();
89      }
90  
91      /**
92       * This sync implements Phasing. The state represents the current phase as an integer that
93       * continually increases. The phase can wrap around past {@link Integer#MAX_VALUE}
94       */
95      private class Sync extends AbstractQueuedSynchronizer {
96          private static final long serialVersionUID = -7753362916930221487L;
97  
98          public int getCurrentPhase() {
99              return getState();
100         }
101 
102         @Override protected int tryAcquireShared(final int phase) {
103             return comparator.isPassed(getState(), phase) ? 1 : -1;
104         }
105 
106         @Override protected boolean tryReleaseShared(final int ignore) {
107             while (true) {
108                 final int state = getState();
109                 if (compareAndSetState(state, state + 1)) {
110                     return true;
111                 }
112             }
113         }
114     }
115 
116     static class PhaseComparator implements Comparator<Integer> {
117         public int compare(final Integer current, final Integer waitingFor) {
118             return waitingFor - current;
119         };
120 
121         /**
122          * Has the current phase passed the waiting phase.
123          * 
124          * @param current
125          * @param waitingFor
126          * @return true if current is greater than waiting
127          */
128         boolean isPassed(final int current, final int waitingFor) {
129             return compare(current, waitingFor) < 0;
130         }
131     }
132 }