1 package com.atlassian.util.concurrent;
2
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.After;
import org.junit.Test;
19
20 public class BlockingReferenceTest {
21 final int threads = 20;
22
23 @Test
24 public void simpleSRSWReference() throws Exception {
25 assertSimple(BlockingReference.<String> newSRSW());
26 }
27
28 @Test
29 public void simpleMRSWReference() throws Exception {
30 assertSimple(BlockingReference.<String> newMRSW());
31 }
32
33 @Test
34 public void simpleSRSWReferenceNull() throws Exception {
35 assertSimple(BlockingReference.<String> newSRSW(null));
36 }
37
38 @Test
39 public void simpleMRSWReferenceNull() throws Exception {
40 assertSimple(BlockingReference.<String> newMRSW(null));
41 }
42
43 @Test
44 public void setSRSWReferenceGet() throws Exception {
45 final BlockingReference<String> ref = BlockingReference.newSRSW();
46 final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
47 public String call() throws Exception {
48 return ref.get();
49 }
50 }));
51 ref.set("testSRSWReferenceSetGet");
52 final Future<String> take = completionService.take();
53 assertNotNull(take.get());
54 assertSame("testSRSWReferenceSetGet", take.get());
55 Thread.sleep(10);
56
57
58
59 assertNull(completionService.poll());
60 Thread.sleep(1);
61 assertNull(completionService.poll());
62 Thread.sleep(1);
63 assertNull(completionService.poll());
64 Thread.sleep(1);
65 assertNull(completionService.poll());
66 Thread.sleep(1);
67 assertNull(completionService.poll());
68 Thread.sleep(1);
69 assertNull(completionService.poll());
70 }
71
72 @Test
73 public void initialValueSRSWReferenceGet() throws Exception {
74 final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueSRSWReferenceGet");
75 final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
76 public String call() throws Exception {
77 return ref.get();
78 }
79 }));
80 for (int i = 0; i < threads; i++) {
81 assertSame("initialValueSRSWReferenceGet", completionService.take().get());
82 }
83 }
84
85 @Test
86 public void initialValueMRSWReferenceGet() throws Exception {
87 final BlockingReference<String> ref = BlockingReference.newSRSW("initialValueMRSWReferenceGet");
88 final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
89 public String call() throws Exception {
90 return ref.get();
91 }
92 }));
93 for (int i = 0; i < threads; i++) {
94 assertSame("initialValueMRSWReferenceGet", completionService.take().get());
95 }
96 }
97
98 @Test
99 public void setMRSWReferenceGet() throws Exception {
100 final BlockingReference<String> ref = BlockingReference.newMRSW();
101 final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
102 public String call() throws Exception {
103 return ref.get();
104 }
105 }));
106 ref.set("testSRSWReferenceSetGet");
107 for (int i = 0; i < threads; i++) {
108 assertSame("testSRSWReferenceSetGet", completionService.take().get());
109 }
110 assertNull(completionService.poll());
111 assertNotNull(ref.peek());
112 }
113
114 @Test
115 public void setSRSWReferenceTake() throws Exception {
116 final BlockingReference<String> ref = BlockingReference.newSRSW();
117 final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
118 public String call() throws Exception {
119 return ref.take();
120 }
121 }));
122 ref.set("testSRSWReferenceSetGet");
123 assertSame("testSRSWReferenceSetGet", completionService.take().get());
124 Thread.sleep(10);
125 assertNull(completionService.poll());
126 assertNull(ref.peek());
127 ref.set("setSRSWReferenceTake2");
128 assertSame("setSRSWReferenceTake2", completionService.take().get());
129 assertNull(completionService.poll());
130 assertNull(ref.peek());
131 ref.set("setSRSWReferenceTake3");
132 assertSame("setSRSWReferenceTake3", completionService.take().get());
133 assertNull(completionService.poll());
134 assertNull(ref.peek());
135 }
136
137 @Test
138 public void setMRSWReferenceTake() throws Exception {
139 final BlockingReference<String> ref = BlockingReference.newMRSW();
140 final ExecutorCompletionService<String> completionService = getCompletionService(factory(threads, ref, new Callable<String>() {
141 public String call() throws Exception {
142 return ref.take();
143 }
144 }));
145 ref.set("setMRSWReferenceTake");
146 assertSame("setMRSWReferenceTake", completionService.take().get());
147 Thread.sleep(10);
148 assertNull(completionService.poll());
149 assertNull(ref.peek());
150 ref.set("setMRSWReferenceTake2");
151 assertSame("setMRSWReferenceTake2", completionService.take().get());
152 ref.set("setMRSWReferenceTake3");
153 assertSame("setMRSWReferenceTake3", completionService.take().get());
154 ref.set("setMRSWReferenceTake4");
155 assertSame("setMRSWReferenceTake4", completionService.take().get());
156 assertNull(completionService.poll());
157 assertNull(ref.peek());
158 }
159
160 @Test
161 public void timeoutSRSWReferenceGet() throws InterruptedException {
162 final BlockingReference<String> ref = BlockingReference.newSRSW();
163 try {
164 ref.get(1, TimeUnit.NANOSECONDS);
165 fail("TimeoutException expected");
166 } catch (final TimeoutException expected) {}
167 }
168
169 @Test
170 public void timeoutSRSWReferenceTake() throws InterruptedException {
171 final BlockingReference<String> ref = BlockingReference.newSRSW();
172 try {
173 ref.take(1, TimeUnit.NANOSECONDS);
174 fail("TimeoutException expected");
175 } catch (final TimeoutException expected) {}
176 }
177
178 @Test
179 public void timeoutMRSWReferenceGet() throws InterruptedException {
180 final BlockingReference<String> ref = BlockingReference.newMRSW();
181 try {
182 ref.get(1, TimeUnit.NANOSECONDS);
183 fail("TimeoutException expected");
184 } catch (final TimeoutException expected) {}
185 }
186
187 @Test
188 public void timeoutMRSWReferenceTake() throws InterruptedException {
189 final BlockingReference<String> ref = BlockingReference.newMRSW();
190 try {
191 ref.take(1, TimeUnit.NANOSECONDS);
192 fail("TimeoutException expected");
193 } catch (final TimeoutException expected) {}
194 }
195
196 private ExecutorCompletionService<String> getCompletionService(final CallableFactory factory) throws InterruptedException {
197 final int threads = factory.threads();
198 final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool(threads));
199 for (int i = 0; i < threads; i++) {
200 completionService.submit(factory.get());
201 }
202 factory.await();
203 return completionService;
204 }
205
206 interface CallableFactory extends Supplier<Callable<String>> {
207 int threads();
208
209 void await();
210 }
211
212 private CallableFactory factory(final int threads, final BlockingReference<String> ref, final Callable<String> delegate) {
213 final CountDownLatch start = new CountDownLatch(threads);
214
215 final Supplier<Callable<String>> supplier = new Supplier<Callable<String>>() {
216 public Callable<String> get() {
217 return new Callable<String>() {
218 public String call() throws Exception {
219 start.countDown();
220 start.await();
221 return delegate.call();
222 }
223 };
224 }
225 };
226
227 return new CallableFactory() {
228 public void await() {
229 try {
230 start.await();
231 } catch (final InterruptedException e) {
232 throw new RuntimeInterruptedException(e);
233 }
234 }
235
236 public Callable<String> get() {
237 return supplier.get();
238 }
239
240 public int threads() {
241 return threads;
242 }
243 };
244 }
245
246 private void assertSimple(final BlockingReference<String> ref) throws InterruptedException {
247 assertTrue(ref.isEmpty());
248 assertNull(ref.peek());
249 ref.set("test");
250 assertFalse(ref.isEmpty());
251 assertNotNull(ref.peek());
252 assertSame("test", ref.peek());
253 ref.clear();
254 assertTrue(ref.isEmpty());
255 assertNull(ref.peek());
256 ref.set("test2");
257 assertFalse(ref.isEmpty());
258 assertSame("test2", ref.get());
259 assertFalse(ref.isEmpty());
260 ref.set("test3");
261 assertFalse(ref.isEmpty());
262 assertSame("test3", ref.peek());
263 assertSame("test3", ref.take());
264 assertTrue(ref.isEmpty());
265 assertNull(ref.peek());
266 }
267 }