1   package com.atlassian.util.concurrent;
2   
3   import static java.util.concurrent.Executors.newFixedThreadPool;
4   import static org.junit.Assert.assertEquals;
5   import static org.junit.Assert.assertNotNull;
6   import static org.junit.Assert.assertNull;
7   
8   import java.util.concurrent.Callable;
9   import java.util.concurrent.CountDownLatch;
10  import java.util.concurrent.ExecutorCompletionService;
11  import java.util.concurrent.Future;
12  import java.util.concurrent.atomic.AtomicInteger;
13  
14  import org.junit.Test;
15  
16  public class BooleanLatchTest {
17      @Test
18      public void singleThreadIsReleased() throws Exception {
19          final AtomicInteger call = new AtomicInteger();
20          final BooleanLatch latch = new BooleanLatch();
21          final ExecutorCompletionService<String> completionService = getCompletionService(factory(5, new Callable<String>() {
22              public String call() throws Exception {
23                  latch.await();
24                  return String.valueOf(call.incrementAndGet());
25              }
26          }));
27          latch.release();
28          final Future<String> take = completionService.take();
29          assertNotNull(take.get());
30          assertEquals("1", take.get());
31          Thread.sleep(10);
32          // these threads were already waiting, SRSW will only notify ONE thread
33          // in this state - we are testing that the client who is using this
34          // incorrectly will see dodgy behaviour
35          final Future<String> poll = completionService.poll();
36          assertNull(poll);
37          Thread.sleep(1);
38          assertNull(completionService.poll());
39          Thread.sleep(1);
40          assertNull(completionService.poll());
41          Thread.sleep(1);
42          assertNull(completionService.poll());
43          Thread.sleep(1);
44          assertNull(completionService.poll());
45          Thread.sleep(1);
46          assertNull(completionService.poll());
47  
48      }
49  
50      private CallableFactory factory(final int threads, final Callable<String> delegate) {
51          final CountDownLatch start = new CountDownLatch(threads);
52  
53          final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
54              public Callable<String> get() {
55                  return new Callable<String>() {
56                      public String call() throws Exception {
57                          start.countDown();
58                          start.await();
59                          return delegate.call();
60                      }
61                  };
62              }
63          };
64  
65          return new CallableFactory() {
66              public void await() {
67                  try {
68                      start.await();
69                  } catch (final InterruptedException e) {
70                      throw new RuntimeInterruptedException(e);
71                  }
72              }
73  
74              public Callable<String> get() {
75                  return supplier.get();
76              }
77  
78              public int threads() {
79                  return threads;
80              }
81          };
82      }
83  
84      interface CallableFactory extends Supplier<Callable<String>> {
85          int threads();
86  
87          void await();
88      }
89  
90      private ExecutorCompletionService<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
91          final int threads = factory.threads();
92          final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool(threads));
93          for (int i = 0; i < threads; i++) {
94              completionService.submit(factory.get());
95          }
96          factory.await();
97          return completionService;
98      }
99  }