1 package com.atlassian.util.concurrent;
2
3 import static java.util.concurrent.Executors.newFixedThreadPool;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.CountDownLatch;
10 import java.util.concurrent.ExecutorCompletionService;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.atomic.AtomicInteger;
13
14 import org.junit.Test;
15
16 public class BooleanLatchTest {
17 @Test
18 public void singleThreadIsReleased() throws Exception {
19 final AtomicInteger call = new AtomicInteger();
20 final BooleanLatch latch = new BooleanLatch();
21 final ExecutorCompletionService<String> completionService = getCompletionService(factory(5, new Callable<String>() {
22 public String call() throws Exception {
23 latch.await();
24 return String.valueOf(call.incrementAndGet());
25 }
26 }));
27 latch.release();
28 final Future<String> take = completionService.take();
29 assertNotNull(take.get());
30 assertEquals("1", take.get());
31 Thread.sleep(10);
32
33
34
35 final Future<String> poll = completionService.poll();
36 assertNull(poll);
37 Thread.sleep(1);
38 assertNull(completionService.poll());
39 Thread.sleep(1);
40 assertNull(completionService.poll());
41 Thread.sleep(1);
42 assertNull(completionService.poll());
43 Thread.sleep(1);
44 assertNull(completionService.poll());
45 Thread.sleep(1);
46 assertNull(completionService.poll());
47
48 }
49
50 private CallableFactory factory(final int threads, final Callable<String> delegate) {
51 final CountDownLatch start = new CountDownLatch(threads);
52
53 final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
54 public Callable<String> get() {
55 return new Callable<String>() {
56 public String call() throws Exception {
57 start.countDown();
58 start.await();
59 return delegate.call();
60 }
61 };
62 }
63 };
64
65 return new CallableFactory() {
66 public void await() {
67 try {
68 start.await();
69 } catch (final InterruptedException e) {
70 throw new RuntimeInterruptedException(e);
71 }
72 }
73
74 public Callable<String> get() {
75 return supplier.get();
76 }
77
78 public int threads() {
79 return threads;
80 }
81 };
82 }
83
84 interface CallableFactory extends Supplier<Callable<String>> {
85 int threads();
86
87 void await();
88 }
89
90 private ExecutorCompletionService<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
91 final int threads = factory.threads();
92 final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool(threads));
93 for (int i = 0; i < threads; i++) {
94 completionService.submit(factory.get());
95 }
96 factory.await();
97 return completionService;
98 }
99 }