1   package com.atlassian.util.concurrent;
2   
3   import static java.util.concurrent.Executors.newFixedThreadPool;
4   import static org.junit.Assert.assertFalse;
5   import static org.junit.Assert.assertNotNull;
6   import static org.junit.Assert.assertNull;
7   import static org.junit.Assert.assertSame;
8   import static org.junit.Assert.assertTrue;
9   
10  import java.util.concurrent.Callable;
11  import java.util.concurrent.CountDownLatch;
12  import java.util.concurrent.ExecutorCompletionService;
13  import java.util.concurrent.ExecutorService;
14  import java.util.concurrent.Future;
15  import java.util.concurrent.TimeUnit;
16  import java.util.concurrent.TimeoutException;
17  
18  import org.junit.Test;
19  
20  public class BlockingReferenceTest {
21      final int threads = 20;
22  
23      @Test
24      public void simpleSRSWReference() throws Exception {
25          assertSimple(BlockingReference.<String> newSRSW());
26      }
27  
28      @Test
29      public void simpleMRSWReference() throws Exception {
30          assertSimple(BlockingReference.<String> newMRSW());
31      }
32  
33      @Test
34      public void simpleSRSWReferenceNull() throws Exception {
35          assertSimple(BlockingReference.<String> newSRSW(null));
36      }
37  
38      @Test
39      public void simpleMRSWReferenceNull() throws Exception {
40          assertSimple(BlockingReference.<String> newMRSW(null));
41      }
42  
43      @Test
44      public void setSRSWReferenceGet() throws Exception {
45          final BlockingReference<String> ref = BlockingReference.newSRSW();
46          final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
47              public String call() throws Exception {
48                  return ref.get();
49              }
50          }));
51          try {
52              ref.set("testSRSWReferenceSetGet");
53              final Future<String> take = executor.completion.take();
54              assertNotNull(take.get());
55              assertSame("testSRSWReferenceSetGet", take.get());
56              Thread.sleep(10);
57              // these threads were already waiting, SRSW will only notify ONE
58              // thread
59              // in this state - we are testing that the client who is using this
60              // incorrectly will see dodgy behaviour
61              final Future<String> poll = executor.completion.poll();
62              Thread.sleep(1);
63              assertNull(poll);
64              Thread.sleep(1);
65              assertNull(executor.completion.poll());
66              Thread.sleep(1);
67              assertNull(executor.completion.poll());
68              Thread.sleep(1);
69              assertNull(executor.completion.poll());
70              Thread.sleep(1);
71              assertNull(executor.completion.poll());
72              Thread.sleep(1);
73              assertNull(executor.completion.poll());
74          } finally {
75              executor.pool.shutdown();
76          }
77      }
78  
79      @Test
80      public void initialValueSRSWReferenceGet() throws Exception {
81          final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueSRSWReferenceGet");
82          final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
83              public String call() throws Exception {
84                  return ref.get();
85              }
86          }));
87          try {
88              for (int i = 0; i < threads; i++) {
89                  assertSame("initialValueSRSWReferenceGet", executor.completion.take().get());
90              }
91          } finally {
92              executor.pool.shutdown();
93          }
94      }
95  
96      @Test
97      public void initialValueMRSWReferenceGet() throws Exception {
98          final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueMRSWReferenceGet");
99          final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
100             public String call() throws Exception {
101                 return ref.get();
102             }
103         }));
104         try {
105             for (int i = 0; i < threads; i++) {
106                 assertSame("initialValueMRSWReferenceGet", executor.completion.take().get());
107             }
108         } finally {
109             executor.pool.shutdown();
110         }
111     }
112 
113     @Test
114     public void setMRSWReferenceGet() throws Exception {
115         final BlockingReference<String> ref = BlockingReference.newMRSW();
116         final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
117             public String call() throws Exception {
118                 return ref.get();
119             }
120         }));
121         try {
122             ref.set("testSRSWReferenceSetGet");
123             for (int i = 0; i < threads; i++) {
124                 assertSame("testSRSWReferenceSetGet", executor.completion.take().get());
125             }
126             assertNull(executor.completion.poll());
127             assertNotNull(ref.peek());
128         } finally {
129             executor.pool.shutdown();
130         }
131     }
132 
133     @Test
134     public void setSRSWReferenceTake() throws Exception {
135         final CountDownLatch running = new CountDownLatch(1);
136         final BlockingReference<String> ref = BlockingReference.newSRSW();
137         final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
138             public String call() throws Exception {
139                 running.await();
140                 return ref.take();
141             }
142         }));
143         try {
144             ref.set("testSRSWReferenceSetGet");
145             running.countDown();
146             assertSame("testSRSWReferenceSetGet", executor.completion.take().get());
147             Thread.sleep(10);
148             assertNull(executor.completion.poll());
149             assertNull(ref.peek());
150             ref.set("setSRSWReferenceTake2");
151             assertSame("setSRSWReferenceTake2", executor.completion.take().get());
152             assertNull(executor.completion.poll());
153             assertNull(ref.peek());
154             ref.set("setSRSWReferenceTake3");
155             assertSame("setSRSWReferenceTake3", executor.completion.take().get());
156             assertNull(executor.completion.poll());
157             assertNull(ref.peek());
158         } finally {
159             executor.pool.shutdown();
160         }
161     }
162 
163     @Test
164     public void setMRSWReferenceTake() throws Exception {
165         final BlockingReference<String> ref = BlockingReference.newMRSW();
166         final Executor<String> executor = getCompletionService(factory(threads, new Callable<String>() {
167             public String call() throws Exception {
168                 return ref.take();
169             }
170         }));
171         try {
172             ref.set("setMRSWReferenceTake");
173             assertSame("setMRSWReferenceTake", executor.completion.take().get());
174             Thread.sleep(10);
175             assertNull(executor.completion.poll());
176             assertNull(ref.peek());
177             ref.set("setMRSWReferenceTake2");
178             assertSame("setMRSWReferenceTake2", executor.completion.take().get());
179             ref.set("setMRSWReferenceTake3");
180             assertSame("setMRSWReferenceTake3", executor.completion.take().get());
181             ref.set("setMRSWReferenceTake4");
182             assertSame("setMRSWReferenceTake4", executor.completion.take().get());
183             assertNull(executor.completion.poll());
184             assertNull(ref.peek());
185         } finally {
186             executor.pool.shutdown();
187         }
188     }
189 
190     @Test(expected = TimeoutException.class)
191     public void timeoutSRSWReferenceGet() throws Exception {
192         final BlockingReference<String> ref = BlockingReference.newSRSW();
193         ref.get(1, TimeUnit.NANOSECONDS);
194     }
195 
196     @Test(expected = TimeoutException.class)
197     public void timeoutSRSWReferenceTake() throws Exception {
198         final BlockingReference<String> ref = BlockingReference.newSRSW();
199         ref.take(1, TimeUnit.NANOSECONDS);
200     }
201 
202     @Test(expected = TimeoutException.class)
203     public void timeoutMRSWReferenceGet() throws Exception {
204         final BlockingReference<String> ref = BlockingReference.newMRSW();
205         ref.get(1, TimeUnit.NANOSECONDS);
206     }
207 
208     @Test(expected = TimeoutException.class)
209     public void timeoutMRSWReferenceTake() throws Exception {
210         final BlockingReference<String> ref = BlockingReference.newMRSW();
211         ref.take(1, TimeUnit.NANOSECONDS);
212     }
213 
214     @Test(expected = TimeoutException.class)
215     public void timeoutSRSWReferenceTakeIfTaken() throws Exception {
216         final BlockingReference<String> ref = BlockingReference.newSRSW();
217         ref.set("blah");
218         assertSame("blah", ref.take(1, TimeUnit.NANOSECONDS));
219         ref.take(1, TimeUnit.NANOSECONDS);
220     }
221 
222     @Test(expected = TimeoutException.class)
223     public void timeoutMRSWReferenceTakeIfTaken() throws Exception {
224         final BlockingReference<String> ref = BlockingReference.newMRSW();
225         ref.set("blah");
226         assertSame("blah", ref.take(1, TimeUnit.NANOSECONDS));
227         ref.take(1, TimeUnit.NANOSECONDS);
228     }
229 
230     private Executor<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
231         final int threads = factory.threads();
232         final ExecutorService pool = newFixedThreadPool(threads);
233         final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(pool);
234         for (int i = 0; i < threads; i++) {
235             completionService.submit(factory.get());
236         }
237         factory.await();
238         return new Executor<String>(pool, completionService);
239     }
240 
241     interface CallableFactory extends Supplier<Callable<String>> {
242         int threads();
243 
244         void await();
245     }
246 
247     private CallableFactory factory(final int threads, final Callable<String> delegate) {
248         final CountDownLatch start = new CountDownLatch(threads);
249 
250         final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
251             public Callable<String> get() {
252                 return new Callable<String>() {
253                     public String call() throws Exception {
254                         start.countDown();
255                         start.await();
256                         return delegate.call();
257                     }
258                 };
259             }
260         };
261 
262         return new CallableFactory() {
263             public void await() {
264                 try {
265                     start.await();
266                 } catch (final InterruptedException e) {
267                     throw new RuntimeInterruptedException(e);
268                 }
269             }
270 
271             public Callable<String> get() {
272                 return supplier.get();
273             }
274 
275             public int threads() {
276                 return threads;
277             }
278         };
279     }
280 
281     private void assertSimple(final BlockingReference<String> ref) throws InterruptedException, TimeoutException {
282         assertTrue(ref.isEmpty());
283         assertNull(ref.peek());
284         ref.set("test");
285         assertFalse(ref.isEmpty());
286         assertNotNull(ref.peek());
287         assertSame("test", ref.peek());
288         ref.clear();
289         assertTrue(ref.isEmpty());
290         assertNull(ref.peek());
291         ref.set("test2");
292         assertFalse(ref.isEmpty());
293         assertSame("test2", ref.get());
294         assertFalse(ref.isEmpty());
295         ref.set("test3");
296         assertFalse(ref.isEmpty());
297         assertSame("test3", ref.peek());
298         assertSame("test3", ref.take());
299         assertTrue(ref.isEmpty());
300         assertNull(ref.peek());
301         ref.set("test4");
302         assertFalse(ref.isEmpty());
303         assertSame("test4", ref.get(1, TimeUnit.SECONDS));
304         assertFalse(ref.isEmpty());
305         assertSame("test4", ref.take(1, TimeUnit.SECONDS));
306         assertTrue(ref.isEmpty());
307     }
308 
309     class Executor<T> {
310         final ExecutorService pool;
311         final ExecutorCompletionService<T> completion;
312 
313         Executor(final ExecutorService executor, final ExecutorCompletionService<T> completion) {
314             this.pool = executor;
315             this.completion = completion;
316         }
317     }
318 }