1   package com.atlassian.util.concurrent;
2   
3   import static java.util.concurrent.Executors.newFixedThreadPool;
4   import static org.junit.Assert.assertEquals;
5   import static org.junit.Assert.assertNotNull;
6   import static org.junit.Assert.assertNull;
7   
8   import java.util.concurrent.Callable;
9   import java.util.concurrent.CountDownLatch;
10  import java.util.concurrent.ExecutorCompletionService;
11  import java.util.concurrent.Future;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.atomic.AtomicInteger;
14  
15  import org.junit.Test;
16  
17  public class BooleanLatchTest {
18      @Test
19      public void singleThreadIsReleased() throws Exception {
20          final AtomicInteger call = new AtomicInteger();
21          final BooleanLatch latch = new BooleanLatch();
22          final ExecutorCompletionService<String> completionService = getCompletionService(factory(5, new Callable<String>() {
23              public String call() throws Exception {
24                  latch.await();
25                  return String.valueOf(call.incrementAndGet());
26              }
27          }));
28          latch.release();
29          final Future<String> take = completionService.take();
30          assertNotNull(take.get());
31          assertEquals("1", take.get());
32          Thread.sleep(10);
33          // these threads were already waiting, SRSW will only notify ONE thread
34          // in this state - we are testing that the client who is using this
35          // incorrectly will see dodgy behaviour
36          final Future<String> poll = completionService.poll();
37          assertNull(poll);
38          Thread.sleep(1);
39          assertNull(completionService.poll());
40          Thread.sleep(1);
41          assertNull(completionService.poll());
42          Thread.sleep(1);
43          assertNull(completionService.poll());
44          Thread.sleep(1);
45          assertNull(completionService.poll());
46          Thread.sleep(1);
47          assertNull(completionService.poll());
48      }
49  
50      @Test
51      public void singleThreadIsReleasedWithTimeout() throws Exception {
52          final AtomicInteger call = new AtomicInteger();
53          final BooleanLatch latch = new BooleanLatch();
54          final ExecutorCompletionService<String> completionService = getCompletionService(factory(5, new Callable<String>() {
55              public String call() throws Exception {
56                  latch.await(100, TimeUnit.SECONDS);
57                  return String.valueOf(call.incrementAndGet());
58              }
59          }));
60          latch.release();
61          final Future<String> take = completionService.take();
62          assertNotNull(take.get());
63          assertEquals("1", take.get());
64          Thread.sleep(10);
65          // these threads were already waiting, SRSW will only notify ONE thread
66          // in this state - we are testing that the client who is using this
67          // incorrectly will see dodgy behaviour
68          final Future<String> poll = completionService.poll();
69          assertNull(poll);
70          Thread.sleep(1);
71          assertNull(completionService.poll());
72          Thread.sleep(1);
73          assertNull(completionService.poll());
74          Thread.sleep(1);
75          assertNull(completionService.poll());
76          Thread.sleep(1);
77          assertNull(completionService.poll());
78          Thread.sleep(1);
79          assertNull(completionService.poll());
80  
81      }
82  
83      private CallableFactory factory(final int threads, final Callable<String> delegate) {
84          final CountDownLatch start = new CountDownLatch(threads);
85  
86          final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
87              public Callable<String> get() {
88                  return new Callable<String>() {
89                      public String call() throws Exception {
90                          start.countDown();
91                          start.await();
92                          return delegate.call();
93                      }
94                  };
95              }
96          };
97  
98          return new CallableFactory() {
99              public void await() {
100                 try {
101                     start.await();
102                 } catch (final InterruptedException e) {
103                     // /CLOVER:OFF
104                     throw new RuntimeInterruptedException(e);
105                     // /CLOVER:ON
106                 }
107             }
108 
109             public Callable<String> get() {
110                 return supplier.get();
111             }
112 
113             public int threads() {
114                 return threads;
115             }
116         };
117     }
118 
119     interface CallableFactory extends Supplier<Callable<String>> {
120         int threads();
121 
122         void await();
123     }
124 
125     private ExecutorCompletionService<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
126         final int threads = factory.threads();
127         final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool(threads));
128         for (int i = 0; i < threads; i++) {
129             completionService.submit(factory.get());
130         }
131         factory.await();
132         return completionService;
133     }
134 }