View Javadoc

1   /**
2    * Copyright 2008 Atlassian Pty Ltd 
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at 
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0 
9    * 
10   * Unless required by applicable law or agreed to in writing, software 
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License.
15   */
16  
17  package com.atlassian.util.concurrent;
18  
19  import java.io.Serializable;
20  import java.util.Comparator;
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
23  
24  import net.jcip.annotations.ThreadSafe;
25  
26  /**
27   * A {@link PhasedLatch} is a shared latch that resets after it is released and
28   * can be reused. Potentially waiting threads can test the current phase before
29   * performing an action. The action is then guarded by that phase and can await
30   * that phase to be advanced via a call to {@link #release() release} the
31   * current phase.
32   */
33  @ThreadSafe
34  public class PhasedLatch implements ReusableLatch {
35      private static final PhaseComparator comparator = new PhaseComparator();
36  
37      private final Sync sync = new Sync();
38  
39      /**
40       * Release the current phase.
41       */
42      public void release() {
43          sync.releaseShared(1);
44      }
45  
46      /**
47       * Await the current phase.
48       * 
49       * @throws InterruptedException if interrupted
50       */
51      public void await() throws InterruptedException {
52          awaitPhase(getPhase());
53      }
54  
55      /**
56       * Await the current phase for the specified period.
57       * 
58       * @param time the period of time
59       * @param unit of time to measure the period in
60       * @return true if the phase was passed, false otherwise
61       * @throws InterruptedException if interrupted
62       */
63      public boolean await(final long time, final TimeUnit unit) throws InterruptedException {
64          return sync.tryAcquireSharedNanos(getPhase(), unit.toNanos(time));
65      }
66  
67      /**
68       * Await the specified phase.
69       * 
70       * @param phase the phase to wait for
71       * @throws InterruptedException if interrupted
72       */
73      public void awaitPhase(final int phase) throws InterruptedException {
74          sync.acquireSharedInterruptibly(phase);
75      }
76  
77      /**
78       * Await the specified phase for the specified period.
79       * 
80       * @param phase the phase to wait for
81       * @param period the period of time to wait for, as specified by:
82       * @param unit of time to measure the period in
83       * @return true if the phase was passed, false otherwise
84       * @throws InterruptedException if interrupted
85       */
86      public boolean awaitPhase(final int phase, final long period, final TimeUnit unit) throws InterruptedException {
87          return sync.tryAcquireSharedNanos(phase, unit.toNanos(period));
88      }
89  
90      public int getPhase() {
91          return sync.getCurrentPhase();
92      }
93  
94      /**
95       * This sync implements Phasing. The state represents the current phase as
96       * an integer that continually increases. The phase can wrap around past
97       * {@link Integer#MAX_VALUE}
98       */
99      private static class Sync extends AbstractQueuedSynchronizer {
100         private static final long serialVersionUID = -7753362916930221487L;
101 
102         public int getCurrentPhase() {
103             return getState();
104         }
105 
106         @Override
107         protected int tryAcquireShared(final int phase) {
108             return comparator.isPassed(getState(), phase) ? 1 : -1;
109         }
110 
111         @Override
112         protected boolean tryReleaseShared(final int ignore) {
113             while (true) {
114                 final int state = getState();
115                 // /CLOVER:OFF
116                 // cannot test CAS
117                 if (compareAndSetState(state, state + 1)) {
118                     return true;
119                 }
120                 // /CLOVER:ON
121             }
122         }
123     }
124 
125     static class PhaseComparator implements Comparator<Integer>, Serializable {
126         private static final long serialVersionUID = -614957178717195674L;
127 
128         public int compare(final Integer current, final Integer waitingFor) {
129             return waitingFor - current;
130         };
131 
132         /**
133          * Has the current phase passed the waiting phase.
134          * 
135          * @param current
136          * @param waitingFor
137          * @return true if current is greater than waiting
138          */
139         boolean isPassed(final int current, final int waitingFor) {
140             return compare(current, waitingFor) < 0;
141         }
142     }
143 }