1 package com.atlassian.util.concurrent;
2
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
16
17 public class BooleanLatchTest {
18 @Test
19 public void singleThreadIsReleased() throws Exception {
20 final AtomicInteger call = new AtomicInteger();
21 final BooleanLatch latch = new BooleanLatch();
22 final ExecutorCompletionService<String> completionService = getCompletionService(factory(5, new Callable<String>() {
23 public String call() throws Exception {
24 latch.await();
25 return String.valueOf(call.incrementAndGet());
26 }
27 }));
28 latch.release();
29 final Future<String> take = completionService.take();
30 assertNotNull(take.get());
31 assertEquals("1", take.get());
32 Thread.sleep(10);
33
34
35
36 final Future<String> poll = completionService.poll();
37 assertNull(poll);
38 Thread.sleep(1);
39 assertNull(completionService.poll());
40 Thread.sleep(1);
41 assertNull(completionService.poll());
42 Thread.sleep(1);
43 assertNull(completionService.poll());
44 Thread.sleep(1);
45 assertNull(completionService.poll());
46 Thread.sleep(1);
47 assertNull(completionService.poll());
48 }
49
50 @Test
51 public void singleThreadIsReleasedWithTimeout() throws Exception {
52 final AtomicInteger call = new AtomicInteger();
53 final BooleanLatch latch = new BooleanLatch();
54 final ExecutorCompletionService<String> completionService = getCompletionService(factory(5, new Callable<String>() {
55 public String call() throws Exception {
56 latch.await(100, TimeUnit.SECONDS);
57 return String.valueOf(call.incrementAndGet());
58 }
59 }));
60 latch.release();
61 final Future<String> take = completionService.take();
62 assertNotNull(take.get());
63 assertEquals("1", take.get());
64 Thread.sleep(10);
65
66
67
68 final Future<String> poll = completionService.poll();
69 assertNull(poll);
70 Thread.sleep(1);
71 assertNull(completionService.poll());
72 Thread.sleep(1);
73 assertNull(completionService.poll());
74 Thread.sleep(1);
75 assertNull(completionService.poll());
76 Thread.sleep(1);
77 assertNull(completionService.poll());
78 Thread.sleep(1);
79 assertNull(completionService.poll());
80
81 }
82
83 private CallableFactory factory(final int threads, final Callable<String> delegate) {
84 final CountDownLatch start = new CountDownLatch(threads);
85
86 final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
87 public Callable<String> get() {
88 return new Callable<String>() {
89 public String call() throws Exception {
90 start.countDown();
91 start.await();
92 return delegate.call();
93 }
94 };
95 }
96 };
97
98 return new CallableFactory() {
99 public void await() {
100 try {
101 start.await();
102 } catch (final InterruptedException e) {
103
104 throw new RuntimeInterruptedException(e);
105
106 }
107 }
108
109 public Callable<String> get() {
110 return supplier.get();
111 }
112
113 public int threads() {
114 return threads;
115 }
116 };
117 }
118
119 interface CallableFactory extends Supplier<Callable<String>> {
120 int threads();
121
122 void await();
123 }
124
125 private ExecutorCompletionService<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
126 final int threads = factory.threads();
127 final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool(threads));
128 for (int i = 0; i < threads; i++) {
129 completionService.submit(factory.get());
130 }
131 factory.await();
132 return completionService;
133 }
134 }