1 /**
2 * Copyright 2008 Atlassian Pty Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.atlassian.util.concurrent;
18
19 import java.io.Serializable;
20 import java.util.Comparator;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
23
24 import net.jcip.annotations.ThreadSafe;
25
26 /**
27 * A {@link PhasedLatch} is a shared latch that resets after it is released and
28 * can be reused. Potentially waiting threads can test the current phase before
29 * performing an action. The action is then guarded by that phase and can await
30 * that phase to be advanced via a call to {@link #release() release} the
31 * current phase.
32 */
33 @ThreadSafe
34 public class PhasedLatch implements ReusableLatch {
35 private static final PhaseComparator comparator = new PhaseComparator();
36
37 private final Sync sync = new Sync();
38
39 /**
40 * Release the current phase.
41 */
42 public void release() {
43 sync.releaseShared(1);
44 }
45
46 /**
47 * Await the current phase.
48 *
49 * @throws InterruptedException if interrupted
50 */
51 public void await() throws InterruptedException {
52 awaitPhase(getPhase());
53 }
54
55 /**
56 * Await the current phase for the specified period.
57 *
58 * @param time the period of time
59 * @param unit of time to measure the period in
60 * @return true if the phase was passed, false otherwise
61 * @throws InterruptedException if interrupted
62 */
63 public boolean await(final long time, final TimeUnit unit) throws InterruptedException {
64 return sync.tryAcquireSharedNanos(getPhase(), unit.toNanos(time));
65 }
66
67 /**
68 * Await the specified phase.
69 *
70 * @param phase the phase to wait for
71 * @throws InterruptedException if interrupted
72 */
73 public void awaitPhase(final int phase) throws InterruptedException {
74 sync.acquireSharedInterruptibly(phase);
75 }
76
77 /**
78 * Await the specified phase for the specified period.
79 *
80 * @param phase the phase to wait for
81 * @param period the period of time to wait for, as specified by:
82 * @param unit of time to measure the period in
83 * @return true if the phase was passed, false otherwise
84 * @throws InterruptedException if interrupted
85 */
86 public boolean awaitPhase(final int phase, final long period, final TimeUnit unit) throws InterruptedException {
87 return sync.tryAcquireSharedNanos(phase, unit.toNanos(period));
88 }
89
90 public int getPhase() {
91 return sync.getCurrentPhase();
92 }
93
94 /**
95 * This sync implements Phasing. The state represents the current phase as
96 * an integer that continually increases. The phase can wrap around past
97 * {@link Integer#MAX_VALUE}
98 */
99 private static class Sync extends AbstractQueuedSynchronizer {
100 private static final long serialVersionUID = -7753362916930221487L;
101
102 public int getCurrentPhase() {
103 return getState();
104 }
105
106 @Override
107 protected int tryAcquireShared(final int phase) {
108 return comparator.isPassed(getState(), phase) ? 1 : -1;
109 }
110
111 @Override
112 protected boolean tryReleaseShared(final int ignore) {
113 while (true) {
114 final int state = getState();
115 // /CLOVER:OFF
116 // cannot test CAS
117 if (compareAndSetState(state, state + 1)) {
118 return true;
119 }
120 // /CLOVER:ON
121 }
122 }
123 }
124
125 static class PhaseComparator implements Comparator<Integer>, Serializable {
126 private static final long serialVersionUID = -614957178717195674L;
127
128 public int compare(final Integer current, final Integer waitingFor) {
129 return waitingFor - current;
130 };
131
132 /**
133 * Has the current phase passed the waiting phase.
134 *
135 * @param current
136 * @param waitingFor
137 * @return true if current is greater than waiting
138 */
139 boolean isPassed(final int current, final int waitingFor) {
140 return compare(current, waitingFor) < 0;
141 }
142 }
143 }