1 package com.atlassian.util.concurrent;
2
3 import static java.util.concurrent.Executors.newFixedThreadPool;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.ExecutorCompletionService;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17
18 import org.junit.Test;
19
20 public class BlockingReferenceTest {
21 final int threads = 20;
22
23 @Test
24 public void simpleSRSWReference() throws Exception {
25 assertSimple(BlockingReference.<String> newSRSW());
26 }
27
28 @Test
29 public void simpleMRSWReference() throws Exception {
30 assertSimple(BlockingReference.<String> newMRSW());
31 }
32
33 @Test
34 public void simpleSRSWReferenceNull() throws Exception {
35 assertSimple(BlockingReference.<String> newSRSW(null));
36 }
37
38 @Test
39 public void simpleMRSWReferenceNull() throws Exception {
40 assertSimple(BlockingReference.<String> newMRSW(null));
41 }
42
43 @Test
44 public void setSRSWReferenceGet() throws Exception {
45 final BlockingReference<String> ref = BlockingReference.newSRSW();
46 final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
47 public String call() throws Exception {
48 return ref.get();
49 }
50 }));
51 try {
52 ref.set("testSRSWReferenceSetGet");
53 final Future<String> take = executor.completion.take();
54 assertNotNull(take.get());
55 assertSame("testSRSWReferenceSetGet", take.get());
56 Thread.sleep(10);
57
58
59
60
61 final Future<String> poll = executor.completion.poll();
62 Thread.sleep(1);
63 assertNull(poll);
64 Thread.sleep(1);
65 assertNull(executor.completion.poll());
66 Thread.sleep(1);
67 assertNull(executor.completion.poll());
68 Thread.sleep(1);
69 assertNull(executor.completion.poll());
70 Thread.sleep(1);
71 assertNull(executor.completion.poll());
72 Thread.sleep(1);
73 assertNull(executor.completion.poll());
74 } finally {
75 executor.pool.shutdown();
76 }
77 }
78
79 @Test
80 public void initialValueSRSWReferenceGet() throws Exception {
81 final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueSRSWReferenceGet");
82 final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
83 public String call() throws Exception {
84 return ref.get();
85 }
86 }));
87 try {
88 for (int i = 0; i < threads; i++) {
89 assertSame("initialValueSRSWReferenceGet", executor.completion.take().get());
90 }
91 } finally {
92 executor.pool.shutdown();
93 }
94 }
95
96 @Test
97 public void initialValueMRSWReferenceGet() throws Exception {
98 final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueMRSWReferenceGet");
99 final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
100 public String call() throws Exception {
101 return ref.get();
102 }
103 }));
104 try {
105 for (int i = 0; i < threads; i++) {
106 assertSame("initialValueMRSWReferenceGet", executor.completion.take().get());
107 }
108 } finally {
109 executor.pool.shutdown();
110 }
111 }
112
113 @Test
114 public void setMRSWReferenceGet() throws Exception {
115 final BlockingReference<String> ref = BlockingReference.newMRSW();
116 final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
117 public String call() throws Exception {
118 return ref.get();
119 }
120 }));
121 try {
122 ref.set("testSRSWReferenceSetGet");
123 for (int i = 0; i < threads; i++) {
124 assertSame("testSRSWReferenceSetGet", executor.completion.take().get());
125 }
126 assertNull(executor.completion.poll());
127 assertNotNull(ref.peek());
128 } finally {
129 executor.pool.shutdown();
130 }
131 }
132
133 @Test
134 public void setSRSWReferenceTake() throws Exception {
135 final CountDownLatch running = new CountDownLatch(1);
136 final BlockingReference<String> ref = BlockingReference.newSRSW();
137 final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
138 public String call() throws Exception {
139 running.await();
140 return ref.take();
141 }
142 }));
143 try {
144 ref.set("testSRSWReferenceSetGet");
145 running.countDown();
146 assertSame("testSRSWReferenceSetGet", executor.completion.take().get());
147 Thread.sleep(10);
148 assertNull(executor.completion.poll());
149 assertNull(ref.peek());
150 ref.set("setSRSWReferenceTake2");
151 assertSame("setSRSWReferenceTake2", executor.completion.take().get());
152 assertNull(executor.completion.poll());
153 assertNull(ref.peek());
154 ref.set("setSRSWReferenceTake3");
155 assertSame("setSRSWReferenceTake3", executor.completion.take().get());
156 assertNull(executor.completion.poll());
157 assertNull(ref.peek());
158 } finally {
159 executor.pool.shutdown();
160 }
161 }
162
163 @Test
164 public void setMRSWReferenceTake() throws Exception {
165 final BlockingReference<String> ref = BlockingReference.newMRSW();
166 final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
167 public String call() throws Exception {
168 return ref.take();
169 }
170 }));
171 try {
172 ref.set("setMRSWReferenceTake");
173 assertSame("setMRSWReferenceTake", executor.completion.take().get());
174 Thread.sleep(10);
175 assertNull(executor.completion.poll());
176 assertNull(ref.peek());
177 ref.set("setMRSWReferenceTake2");
178 assertSame("setMRSWReferenceTake2", executor.completion.take().get());
179 ref.set("setMRSWReferenceTake3");
180 assertSame("setMRSWReferenceTake3", executor.completion.take().get());
181 ref.set("setMRSWReferenceTake4");
182 assertSame("setMRSWReferenceTake4", executor.completion.take().get());
183 assertNull(executor.completion.poll());
184 assertNull(ref.peek());
185 } finally {
186 executor.pool.shutdown();
187 }
188 }
189
190 @Test(expected = TimeoutException.class)
191 public void timeoutSRSWReferenceGet() throws Exception {
192 final BlockingReference<String> ref = BlockingReference.newSRSW();
193 ref.get(1, TimeUnit.NANOSECONDS);
194 }
195
196 @Test(expected = TimeoutException.class)
197 public void timeoutSRSWReferenceTake() throws Exception {
198 final BlockingReference<String> ref = BlockingReference.newSRSW();
199 ref.take(1, TimeUnit.NANOSECONDS);
200 }
201
202 @Test(expected = TimeoutException.class)
203 public void timeoutMRSWReferenceGet() throws Exception {
204 final BlockingReference<String> ref = BlockingReference.newMRSW();
205 ref.get(1, TimeUnit.NANOSECONDS);
206 }
207
208 @Test(expected = TimeoutException.class)
209 public void timeoutMRSWReferenceTake() throws Exception {
210 final BlockingReference<String> ref = BlockingReference.newMRSW();
211 ref.take(1, TimeUnit.NANOSECONDS);
212 }
213
214 @Test(expected = TimeoutException.class)
215 public void timeoutSRSWReferenceTakeIfTaken() throws Exception {
216 final BlockingReference<String> ref = BlockingReference.newSRSW();
217 ref.set("blah");
218 assertSame("blah", ref.take(1, TimeUnit.NANOSECONDS));
219 ref.take(1, TimeUnit.NANOSECONDS);
220 }
221
222 @Test(expected = TimeoutException.class)
223 public void timeoutMRSWReferenceTakeIfTaken() throws Exception {
224 final BlockingReference<String> ref = BlockingReference.newMRSW();
225 ref.set("blah");
226 assertSame("blah", ref.take(1, TimeUnit.NANOSECONDS));
227 ref.take(1, TimeUnit.NANOSECONDS);
228 }
229
230 private Executor<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
231 final int threads = factory.threads();
232 final ExecutorService pool = newFixedThreadPool(threads);
233 final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(pool);
234 for (int i = 0; i < threads; i++) {
235 completionService.submit(factory.get());
236 }
237 factory.await();
238 return new Executor<String>(pool, completionService);
239 }
240
241 interface CallableFactory extends Supplier<Callable<String>> {
242 int threads();
243
244 void await();
245 }
246
247 private CallableFactory factory(final int threads, final Callable<String> delegate) {
248 final CountDownLatch start = new CountDownLatch(threads);
249
250 final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
251 public Callable<String> get() {
252 return new Callable<String>() {
253 public String call() throws Exception {
254 start.countDown();
255 start.await();
256 return delegate.call();
257 }
258 };
259 }
260 };
261
262 return new CallableFactory() {
263 public void await() {
264 try {
265 start.await();
266 } catch (final InterruptedException e) {
267 throw new RuntimeInterruptedException(e);
268 }
269 }
270
271 public Callable<String> get() {
272 return supplier.get();
273 }
274
275 public int threads() {
276 return threads;
277 }
278 };
279 }
280
281 private void assertSimple(final BlockingReference<String> ref) throws InterruptedException, TimeoutException {
282 assertTrue(ref.isEmpty());
283 assertNull(ref.peek());
284 ref.set("test");
285 assertFalse(ref.isEmpty());
286 assertNotNull(ref.peek());
287 assertSame("test", ref.peek());
288 ref.clear();
289 assertTrue(ref.isEmpty());
290 assertNull(ref.peek());
291 ref.set("test2");
292 assertFalse(ref.isEmpty());
293 assertSame("test2", ref.get());
294 assertFalse(ref.isEmpty());
295 ref.set("test3");
296 assertFalse(ref.isEmpty());
297 assertSame("test3", ref.peek());
298 assertSame("test3", ref.take());
299 assertTrue(ref.isEmpty());
300 assertNull(ref.peek());
301 ref.set("test4");
302 assertFalse(ref.isEmpty());
303 assertSame("test4", ref.get(1, TimeUnit.SECONDS));
304 assertFalse(ref.isEmpty());
305 assertSame("test4", ref.take(1, TimeUnit.SECONDS));
306 assertTrue(ref.isEmpty());
307 }
308
309 class Executor<T> {
310 final ExecutorService pool;
311 final ExecutorCompletionService<T> completion;
312
313 Executor(final ExecutorService executor, final ExecutorCompletionService<T> completion) {
314 this.pool = executor;
315 this.completion = completion;
316 }
317 }
318 }