1 package com.atlassian.util.concurrent;
2
3 import static com.atlassian.util.concurrent.Assertions.notNull;
4
5 import java.util.ArrayList;
6 import java.util.LinkedList;
7 import java.util.List;
8 import java.util.concurrent.BlockingQueue;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.TimeUnit;
11 import java.util.concurrent.TimeoutException;
12 import java.util.concurrent.atomic.AtomicReference;
13 import java.util.concurrent.locks.Condition;
14 import java.util.concurrent.locks.Lock;
15 import java.util.concurrent.locks.ReentrantLock;
16
17 public class PerformanceAnalysis {
18
19 public static void main(final String[] args) {
20 final List<Q> queues = new ArrayList<Q>();
21 queues.add(new SRSWBlockingQueue());
22 queues.add(new SyncQueue());
23 queues.add(new RefQueue());
24 queues.add(new BlockingReferenceQueue());
25 queues.add(new LockedQueue());
26 queues.add(new PhasedQueue());
27
28 System.out.println(new PerformanceAnalysis().runTest(queues));
29 }
30
31 public String runTest(final List<Q> queues) {
32 final int warm = 100000, run = 100000000;
33
34 for (final Q q : queues) {
35 runTest(q, warm);
36 }
37 final StringBuilder builder = new StringBuilder();
38 for (final Q q : queues) {
39 builder.append(q).append('\t');
40 builder.append(runTest(q, run)).append('\r');
41 }
42 return builder.toString();
43 }
44
45 public long runTest(final Q q, final int iterations) {
46 final Thread reader = new Thread(new Runnable() {
47 public void run() {
48 try {
49 while (true) {
50 int j = 0;
51 j += q.take();
52 }
53 } catch (final InterruptedException e) {}
54 }
55 });
56 reader.start();
57 long start;
58 try {
59 start = System.currentTimeMillis();
60 for (int i = 0; i < 100000000; i++) {
61 q.put(i);
62 }
63 return System.currentTimeMillis() - start;
64 } finally {
65 reader.interrupt();
66 q.clear();
67 }
68 }
69
/**
 * The minimal hand-off contract that every implementation under test exposes
 * to the benchmark: a writer side, a reader side and a reset.
 */
interface Q {
    /**
     * Hands a value to the queue.
     *
     * @param i the value to publish
     */
    void put(int i);

    /**
     * Retrieves a value from the queue.
     *
     * @return the value obtained
     * @throws InterruptedException if the calling thread is interrupted while
     * waiting for a value
     */
    Integer take() throws InterruptedException;

    /**
     * Resets the queue; called by the benchmark at the end of each run.
     */
    void clear();
}
77
78 static class UnboundQueue implements Q {
79 private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
80
81 public void put(final int i) {
82 try {
83 queue.put(i);
84 } catch (final InterruptedException e) {
85 throw new RuntimeException(e);
86 }
87 }
88
89 public Integer take() throws InterruptedException {
90 return queue.take();
91 }
92
93 public void clear() {
94 queue.clear();
95 }
96 }
97
98 static class LruQueue implements Q {
99 private final LRUBlockingQueue<Integer> queue = new LRUBlockingQueue<Integer>(1);
100
101 public void put(final int i) {
102 try {
103 queue.put(i);
104 } catch (final InterruptedException e) {
105 throw new RuntimeException(e);
106 }
107 }
108
109 public Integer take() throws InterruptedException {
110 return queue.take();
111 }
112
113 public void clear() {
114 queue.clear();
115 }
116 }
117
118 static class LinkedQueue implements Q {
119 private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(1);
120
121 public void put(final int i) {
122 try {
123 queue.clear();
124 queue.put(i);
125 } catch (final InterruptedException e) {
126 throw new RuntimeException(e);
127 }
128 }
129
130 public Integer take() throws InterruptedException {
131 return queue.take();
132 }
133
134 public void clear() {
135 queue.clear();
136 }
137 }
138
139 static class PhasedQueue implements Q {
140 private final PhasedBlockingReference<Integer> queue = new PhasedBlockingReference<Integer>();
141
142 public void put(final int i) {
143 queue.set(i);
144 }
145
146 public Integer take() throws InterruptedException {
147 return queue.take();
148 }
149
150 public void clear() {
151
152 }
153 }
154
155 static class RefQueue implements Q {
156 private final BlockingReference<Integer> ref = new BlockingReference<Integer>();
157
158 public void put(final int i) {
159 ref.set(i);
160 }
161
162 public Integer take() throws InterruptedException {
163 return ref.take();
164 }
165
166 public void clear() {
167 ref.clear();
168 }
169 }
170
171 static class LockedQueue implements Q {
172 private final LockingReference<Integer> queue = new LockingReference<Integer>();
173
174 public void put(final int i) {
175 queue.set(i);
176 }
177
178 public Integer take() throws InterruptedException {
179 return queue.take();
180 }
181
182 public void clear() {
183 queue.clear();
184 }
185 }
186
187 static class SyncQueue implements Q {
188 private final LinkedList<Integer> queue = new LinkedList<Integer>();
189
190 public void put(final int i) {
191 synchronized (queue) {
192 queue.clear();
193 queue.add(i);
194 queue.notifyAll();
195 }
196 }
197
198 public Integer take() throws InterruptedException {
199 synchronized (queue) {
200 while (queue.isEmpty()) {
201 queue.wait();
202 }
203 return queue.getFirst();
204 }
205 }
206
207 public void clear() {
208 synchronized (queue) {
209 queue.clear();
210 queue.notifyAll();
211 }
212 }
213 }
214
215 static class SRSWBlockingQueue implements Q {
216 final BooleanLatch latch = new BooleanLatch();
217 final AtomicReference<LinkedList<Integer>> queue = new AtomicReference<LinkedList<Integer>>();
218
219 public void put(final int i) {
220 LinkedList<Integer> list;
221 do {
222 list = queue.getAndSet(null);
223 if (list == null) {
224 list = new LinkedList<Integer>();
225 }
226 list.add(i);
227 } while (!queue.compareAndSet(null, list));
228 latch.release();
229 }
230
231 public Integer take() throws InterruptedException {
232 LinkedList<Integer> list;
233 do {
234 latch.await();
235 list = queue.getAndSet(null);
236 } while (list == null);
237 return list.getFirst();
238 }
239
240 public void clear() {
241 queue.set(null);
242 }
243 }
244
245 static class BlockingReferenceQueue implements Q {
246 private final BlockingReference<Integer> ref = new BlockingReference<Integer>();
247
248 public void put(final int i) {
249 ref.set(i);
250 }
251
252 public Integer take() throws InterruptedException {
253 return ref.take();
254 }
255
256 public void clear() {
257 ref.clear();
258 }
259 }
260 }
261
262 class LockingReference<V> {
263 private V ref;
264 private final Lock lock = new ReentrantLock();
265 private final Condition notEmpty = lock.newCondition();
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283 public V take() throws InterruptedException {
284 lock.lock();
285 try {
286 while (ref == null) {
287 notEmpty.await();
288 }
289 final V result = ref;
290 ref = null;
291 return result;
292 } finally {
293 lock.unlock();
294 }
295 }
296
297
298
299
300
301
302
303
304 public void set(final V value) {
305 notNull("value", value);
306 lock.lock();
307 try {
308 ref = value;
309 notEmpty.signalAll();
310 } finally {
311 lock.unlock();
312 }
313 }
314
315 public boolean isNull() {
316 return ref == null;
317 }
318
319 void clear() {
320 lock.lock();
321 try {
322 ref = null;
323 } finally {
324 lock.unlock();
325 }
326 }
327 }
328
/**
 * A bounded {@link LinkedBlockingQueue} whose {@code offer} and {@code put}
 * make room by discarding elements from the head (the oldest entries) whenever
 * the queue is observed to be full.
 *
 * @param <E> the element type
 */
class LRUBlockingQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = -6070900096160951474L;

    /**
     * Creates a queue that retains at most {@code capacity} elements.
     *
     * @param capacity the maximum number of retained elements
     */
    public LRUBlockingQueue(final int capacity) {
        super(capacity);
    }

    /**
     * Evicts head elements while the queue is full, then offers the element.
     *
     * @param o the element to add
     * @return the result of the underlying {@code offer}
     */
    @Override
    public boolean offer(final E o) {
        evictWhileFull();
        return super.offer(o);
    }

    /**
     * Evicts head elements while the queue is full, then puts the element.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted in the underlying {@code put}
     */
    @Override
    public void put(final E o) throws InterruptedException {
        evictWhileFull();
        super.put(o);
    }

    /**
     * Drops elements from the head until there is spare capacity. Uses
     * {@code poll()} so that each eviction is a single atomic removal of the
     * actual head, rather than a separate look-up followed by a search.
     */
    private void evictWhileFull() {
        while (remainingCapacity() == 0) {
            poll();
        }
    }
}
352
353 class PhasedBlockingReference<V> {
354 private final AtomicReference<V> ref = new AtomicReference<V>();
355 private final PhasedLatch latch = new PhasedLatch();
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373 public V take() throws InterruptedException {
374 while (true) {
375 latch.await();
376 final V result = ref.getAndSet(null);
377 if (result != null) {
378 return result;
379 }
380 }
381 }
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405 public V take(final long timeout, final TimeUnit unit) throws TimeoutException, InterruptedException {
406 if (!latch.await(timeout, unit)) {
407 throw new TimeoutException();
408 }
409 return ref.getAndSet(null);
410 }
411
412
413
414
415
416
417
418
419 public void set(final V value) {
420 notNull("value", value);
421 internalSet(value);
422 }
423
424 void clear() {
425 internalSet(null);
426 }
427
428 private void internalSet(final V value) {
429 ref.set(value);
430 latch.release();
431 }
432
433 public boolean isNull() {
434 return ref.get() == null;
435 }
436 }