1   package com.atlassian.util.concurrent;
2   
3   import static java.util.concurrent.Executors.newFixedThreadPool;
4   import static org.junit.Assert.assertFalse;
5   import static org.junit.Assert.assertNotNull;
6   import static org.junit.Assert.assertNull;
7   import static org.junit.Assert.assertSame;
8   import static org.junit.Assert.assertTrue;
9   import static org.junit.Assert.fail;
10  
11  import java.util.concurrent.Callable;
12  import java.util.concurrent.CountDownLatch;
13  import java.util.concurrent.ExecutorCompletionService;
14  import java.util.concurrent.Future;
15  import java.util.concurrent.TimeUnit;
16  import java.util.concurrent.TimeoutException;
17  
18  import org.junit.Test;
19  
20  public class BlockingReferenceTest {
21      final int threads = 20;
22  
23      @Test
24      public void simpleSRSWReference() throws Exception {
25          assertSimple(BlockingReference.<String> newSRSW());
26      }
27  
28      @Test
29      public void simpleMRSWReference() throws Exception {
30          assertSimple(BlockingReference.<String> newMRSW());
31      }
32  
33      @Test
34      public void simpleSRSWReferenceNull() throws Exception {
35          assertSimple(BlockingReference.<String> newSRSW(null));
36      }
37  
38      @Test
39      public void simpleMRSWReferenceNull() throws Exception {
40          assertSimple(BlockingReference.<String> newMRSW(null));
41      }
42  
43      @Test
44      public void setSRSWReferenceGet() throws Exception {
45          final BlockingReference<String> ref = BlockingReference.newSRSW();
46          final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
47              public String call() throws Exception {
48                  return ref.get();
49              }
50          }));
51          ref.set("testSRSWReferenceSetGet");
52          final Future<String> take = completionService.take();
53          assertNotNull(take.get());
54          assertSame("testSRSWReferenceSetGet", take.get());
55          Thread.sleep(10);
56          // these threads were already waiting, SRSW will only notify ONE thread
57          // in this state - we are testing that the client who is using this
58          // incorrectly will see dodgy behaviour
59          assertNull(completionService.poll());
60          Thread.sleep(1);
61          assertNull(completionService.poll());
62          Thread.sleep(1);
63          assertNull(completionService.poll());
64          Thread.sleep(1);
65          assertNull(completionService.poll());
66          Thread.sleep(1);
67          assertNull(completionService.poll());
68          Thread.sleep(1);
69          assertNull(completionService.poll());
70      }
71  
72      @Test
73      public void initialValueSRSWReferenceGet() throws Exception {
74          final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueSRSWReferenceGet");
75          final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
76              public String call() throws Exception {
77                  return ref.get();
78              }
79          }));
80          for (int i = 0; i < threads; i++) {
81              assertSame("initialValueSRSWReferenceGet", completionService.take().get());
82          }
83      }
84  
85      @Test
86      public void initialValueMRSWReferenceGet() throws Exception {
87          final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueMRSWReferenceGet");
88          final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
89              public String call() throws Exception {
90                  return ref.get();
91              }
92          }));
93          for (int i = 0; i < threads; i++) {
94              assertSame("initialValueMRSWReferenceGet", completionService.take().get());
95          }
96      }
97  
98      @Test
99      public void setMRSWReferenceGet() throws Exception {
100         final BlockingReference<String> ref = BlockingReference.newMRSW();
101         final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
102             public String call() throws Exception {
103                 return ref.get();
104             }
105         }));
106         ref.set("testSRSWReferenceSetGet");
107         for (int i = 0; i < threads; i++) {
108             assertSame("testSRSWReferenceSetGet", completionService.take().get());
109         }
110         assertNull(completionService.poll());
111         assertNotNull(ref.peek());
112     }
113 
114     @Test
115     public void setSRSWReferenceTake() throws Exception {
116         final BlockingReference<String> ref = BlockingReference.newSRSW();
117         final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
118             public String call() throws Exception {
119                 return ref.take();
120             }
121         }));
122         ref.set("testSRSWReferenceSetGet");
123         assertSame("testSRSWReferenceSetGet", completionService.take().get());
124         Thread.sleep(10);
125         assertNull(completionService.poll());
126         assertNull(ref.peek());
127         ref.set("setSRSWReferenceTake2");
128         assertSame("setSRSWReferenceTake2", completionService.take().get());
129         assertNull(completionService.poll());
130         assertNull(ref.peek());
131         ref.set("setSRSWReferenceTake3");
132         assertSame("setSRSWReferenceTake3", completionService.take().get());
133         assertNull(completionService.poll());
134         assertNull(ref.peek());
135     }
136 
137     @Test
138     public void setMRSWReferenceTake() throws Exception {
139         final BlockingReference<String> ref = BlockingReference.newMRSW();
140         final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
141             public String call() throws Exception {
142                 return ref.take();
143             }
144         }));
145         ref.set("setMRSWReferenceTake");
146         assertSame("setMRSWReferenceTake", completionService.take().get());
147         Thread.sleep(10);
148         assertNull(completionService.poll());
149         assertNull(ref.peek());
150         ref.set("setMRSWReferenceTake2");
151         assertSame("setMRSWReferenceTake2", completionService.take().get());
152         ref.set("setMRSWReferenceTake3");
153         assertSame("setMRSWReferenceTake3", completionService.take().get());
154         ref.set("setMRSWReferenceTake4");
155         assertSame("setMRSWReferenceTake4", completionService.take().get());
156         assertNull(completionService.poll());
157         assertNull(ref.peek());
158     }
159 
160     @Test
161     public void timeoutSRSWReferenceGet() throws InterruptedException {
162         final BlockingReference<String> ref = BlockingReference.newSRSW();
163         try {
164             ref.get(1, TimeUnit.NANOSECONDS);
165             fail("TimeoutException expected");
166         } catch (final TimeoutException expected) {}
167     }
168 
169     @Test
170     public void timeoutSRSWReferenceTake() throws InterruptedException {
171         final BlockingReference<String> ref = BlockingReference.newSRSW();
172         try {
173             ref.take(1, TimeUnit.NANOSECONDS);
174             fail("TimeoutException expected");
175         } catch (final TimeoutException expected) {}
176     }
177 
178     @Test
179     public void timeoutMRSWReferenceGet() throws InterruptedException {
180         final BlockingReference<String> ref = BlockingReference.newMRSW();
181         try {
182             ref.get(1, TimeUnit.NANOSECONDS);
183             fail("TimeoutException expected");
184         } catch (final TimeoutException expected) {}
185     }
186 
187     @Test
188     public void timeoutMRSWReferenceTake() throws InterruptedException {
189         final BlockingReference<String> ref = BlockingReference.newMRSW();
190         try {
191             ref.take(1, TimeUnit.NANOSECONDS);
192             fail("TimeoutException expected");
193         } catch (final TimeoutException expected) {}
194     }
195 
196     private ExecutorCompletionService<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
197         final int threads = factory.threads();
198         final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool(threads));
199         for (int i = 0; i < threads; i++) {
200             completionService.submit(factory.get());
201         }
202         factory.await();
203         return completionService;
204     }
205 
206     interface CallableFactory extends Supplier<Callable<String>> {
207         int threads();
208 
209         void await();
210     }
211 
212     private CallableFactory factory(final int threads, final BlockingReference<String> ref, final Callable<String> delegate) {
213         final CountDownLatch start = new CountDownLatch(threads);
214 
215         final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
216             public Callable<String> get() {
217                 return new Callable<String>() {
218                     public String call() throws Exception {
219                         start.countDown();
220                         start.await();
221                         return delegate.call();
222                     }
223                 };
224             }
225         };
226 
227         return new CallableFactory() {
228             public void await() {
229                 try {
230                     start.await();
231                 } catch (final InterruptedException e) {
232                     throw new RuntimeInterruptedException(e);
233                 }
234             }
235 
236             public Callable<String> get() {
237                 return supplier.get();
238             }
239 
240             public int threads() {
241                 return threads;
242             }
243         };
244     }
245 
246     private void assertSimple(final BlockingReference<String> ref) throws InterruptedException {
247         assertTrue(ref.isEmpty());
248         assertNull(ref.peek());
249         ref.set("test");
250         assertFalse(ref.isEmpty());
251         assertNotNull(ref.peek());
252         assertSame("test", ref.peek());
253         ref.clear();
254         assertTrue(ref.isEmpty());
255         assertNull(ref.peek());
256         ref.set("test2");
257         assertFalse(ref.isEmpty());
258         assertSame("test2", ref.get());
259         assertFalse(ref.isEmpty());
260         ref.set("test3");
261         assertFalse(ref.isEmpty());
262         assertSame("test3", ref.peek());
263         assertSame("test3", ref.take());
264         assertTrue(ref.isEmpty());
265         assertNull(ref.peek());
266     }
267 }