1   package com.atlassian.util.concurrent;
2   
3   import static com.atlassian.util.concurrent.Assertions.notNull;
4   
5   import java.util.ArrayList;
6   import java.util.LinkedList;
7   import java.util.List;
8   import java.util.concurrent.BlockingQueue;
9   import java.util.concurrent.LinkedBlockingQueue;
10  import java.util.concurrent.TimeUnit;
11  import java.util.concurrent.TimeoutException;
12  import java.util.concurrent.atomic.AtomicReference;
13  import java.util.concurrent.locks.Condition;
14  import java.util.concurrent.locks.Lock;
15  import java.util.concurrent.locks.ReentrantLock;
16  
17  public class PerformanceAnalysis {
18  
19      public static void main(final String[] args) {
20          final List<Q> queues = new ArrayList<Q>();
21          queues.add(new SRSWBlockingQueue());
22          queues.add(new SyncQueue());
23          queues.add(new RefQueue());
24          queues.add(new BlockingReferenceQueue());
25          queues.add(new LockedQueue());
26          queues.add(new PhasedQueue());
27          // queues.add(new LinkedQueue());
28          System.out.println(new PerformanceAnalysis().runTest(queues));
29      }
30  
31      public String runTest(final List<Q> queues) {
32          final int warm = 100000, run = 100000000;
33          // warm up
34          for (final Q q : queues) {
35              runTest(q, warm);
36          }
37          final StringBuilder builder = new StringBuilder();
38          for (final Q q : queues) {
39              builder.append(q).append('\t');
40              builder.append(runTest(q, run)).append('\r');
41          }
42          return builder.toString();
43      }
44  
45      public long runTest(final Q q, final int iterations) {
46          final Thread reader = new Thread(new Runnable() {
47              public void run() {
48                  try {
49                      while (true) {
50                          int j = 0;
51                          j += q.take();
52                      }
53                  } catch (final InterruptedException e) {}
54              }
55          });
56          reader.start();
57          long start;
58          try {
59              start = System.currentTimeMillis();
60              for (int i = 0; i < 100000000; i++) {
61                  q.put(i);
62              }
63              return System.currentTimeMillis() - start;
64          } finally {
65              reader.interrupt();
66              q.clear();
67          }
68      }
69  
70      interface Q {
71          void put(final int i);
72  
73          Integer take() throws InterruptedException;
74  
75          void clear();
76      }
77  
78      static class UnboundQueue implements Q {
79          private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
80  
81          public void put(final int i) {
82              try {
83                  queue.put(i);
84              } catch (final InterruptedException e) {
85                  throw new RuntimeException(e);
86              }
87          }
88  
89          public Integer take() throws InterruptedException {
90              return queue.take();
91          }
92  
93          public void clear() {
94              queue.clear();
95          }
96      }
97  
98      static class LruQueue implements Q {
99          private final LRUBlockingQueue<Integer> queue = new LRUBlockingQueue<Integer>(1);
100 
101         public void put(final int i) {
102             try {
103                 queue.put(i);
104             } catch (final InterruptedException e) {
105                 throw new RuntimeException(e);
106             }
107         }
108 
109         public Integer take() throws InterruptedException {
110             return queue.take();
111         }
112 
113         public void clear() {
114             queue.clear();
115         }
116     }
117 
118     static class LinkedQueue implements Q {
119         private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(1);
120 
121         public void put(final int i) {
122             try {
123                 queue.clear();
124                 queue.put(i);
125             } catch (final InterruptedException e) {
126                 throw new RuntimeException(e);
127             }
128         }
129 
130         public Integer take() throws InterruptedException {
131             return queue.take();
132         }
133 
134         public void clear() {
135             queue.clear();
136         }
137     }
138 
139     static class PhasedQueue implements Q {
140         private final PhasedBlockingReference<Integer> queue = new PhasedBlockingReference<Integer>();
141 
142         public void put(final int i) {
143             queue.set(i);
144         }
145 
146         public Integer take() throws InterruptedException {
147             return queue.take();
148         }
149 
150         public void clear() {
151         // queue.set(null);
152         }
153     }
154 
155     static class RefQueue implements Q {
156         private final BlockingReference<Integer> ref = new BlockingReference<Integer>();
157 
158         public void put(final int i) {
159             ref.set(i);
160         }
161 
162         public Integer take() throws InterruptedException {
163             return ref.take();
164         }
165 
166         public void clear() {
167             ref.clear();
168         }
169     }
170 
171     static class LockedQueue implements Q {
172         private final LockingReference<Integer> queue = new LockingReference<Integer>();
173 
174         public void put(final int i) {
175             queue.set(i);
176         }
177 
178         public Integer take() throws InterruptedException {
179             return queue.take();
180         }
181 
182         public void clear() {
183             queue.clear();
184         }
185     }
186 
187     static class SyncQueue implements Q {
188         private final LinkedList<Integer> queue = new LinkedList<Integer>();
189 
190         public void put(final int i) {
191             synchronized (queue) {
192                 queue.clear();
193                 queue.add(i);
194                 queue.notifyAll();
195             }
196         }
197 
198         public Integer take() throws InterruptedException {
199             synchronized (queue) {
200                 while (queue.isEmpty()) {
201                     queue.wait();
202                 }
203                 return queue.getFirst();
204             }
205         }
206 
207         public void clear() {
208             synchronized (queue) {
209                 queue.clear();
210                 queue.notifyAll();
211             }
212         }
213     }
214 
215     static class SRSWBlockingQueue implements Q {
216         final BooleanLatch latch = new BooleanLatch();
217         final AtomicReference<LinkedList<Integer>> queue = new AtomicReference<LinkedList<Integer>>();
218 
219         public void put(final int i) {
220             LinkedList<Integer> list;
221             do {
222                 list = queue.getAndSet(null);
223                 if (list == null) {
224                     list = new LinkedList<Integer>();
225                 }
226                 list.add(i);
227             } while (!queue.compareAndSet(null, list));
228             latch.release();
229         }
230 
231         public Integer take() throws InterruptedException {
232             LinkedList<Integer> list;
233             do {
234                 latch.await();
235                 list = queue.getAndSet(null);
236             } while (list == null);
237             return list.getFirst();
238         }
239 
240         public void clear() {
241             queue.set(null);
242         }
243     }
244 
245     static class BlockingReferenceQueue implements Q {
246         private final BlockingReference<Integer> ref = new BlockingReference<Integer>();
247 
248         public void put(final int i) {
249             ref.set(i);
250         }
251 
252         public Integer take() throws InterruptedException {
253             return ref.take();
254         }
255 
256         public void clear() {
257             ref.clear();
258         }
259     }
260 }
261 
262 class LockingReference<V> {
263     private V ref;
264     private final Lock lock = new ReentrantLock();
265     private final Condition notEmpty = lock.newCondition();
266 
267     /**
268      * Takes the current element if it is not null and replaces it with null. If
269      * the current element is null then wait until it becomes non-null.
270      * <p>
271      * If the current thread:
272      * <ul>
273      * <li>has its interrupted status set on entry to this method; or
274      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
275      * </ul>
276      * then {@link InterruptedException} is thrown and the current thread's
277      * interrupted status is cleared.
278      * 
279      * @return the current element
280      * @throws InterruptedException if the current thread is interrupted while
281      * waiting
282      */
283     public V take() throws InterruptedException {
284         lock.lock();
285         try {
286             while (ref == null) {
287                 notEmpty.await();
288             }
289             final V result = ref;
290             ref = null;
291             return result;
292         } finally {
293             lock.unlock();
294         }
295     }
296 
297     /**
298      * Set the value of this reference. This method is lock-free. A thread
299      * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
300      * released and given this value.
301      * 
302      * @param value the new value.
303      */
304     public void set(final V value) {
305         notNull("value", value);
306         lock.lock();
307         try {
308             ref = value;
309             notEmpty.signalAll();
310         } finally {
311             lock.unlock();
312         }
313     }
314 
315     public boolean isNull() {
316         return ref == null;
317     }
318 
319     void clear() {
320         lock.lock();
321         try {
322             ref = null;
323         } finally {
324             lock.unlock();
325         }
326     }
327 }
328 
/**
 * A bounded {@link LinkedBlockingQueue} whose {@link #offer(Object)} and
 * {@link #put(Object)} make room by discarding elements from the head when the
 * queue is full, instead of failing or blocking.
 * <p>
 * The capacity check and the insertion are separate steps, so with several
 * concurrent writers {@code offer} may still return false and {@code put} may
 * still block.
 *
 * @param <E> the element type
 */
class LRUBlockingQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = -6070900096160951474L;

    /**
     * @param capacity the maximum number of elements retained
     */
    public LRUBlockingQueue(final int capacity) {
        super(capacity);
    }

    /**
     * Inserts {@code o} at the tail, first discarding head elements while the
     * queue is full.
     *
     * @param o the element to add
     * @return the result of the superclass {@code offer}
     * @throws NullPointerException if {@code o} is null; nothing is evicted
     * in that case
     */
    @Override
    public boolean offer(final E o) {
        rejectNull(o);
        evictWhileFull();
        return super.offer(o);
    }

    /**
     * Inserts {@code o} at the tail, first discarding head elements while the
     * queue is full.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while inserting
     * @throws NullPointerException if {@code o} is null; nothing is evicted
     * in that case
     */
    @Override
    public void put(final E o) throws InterruptedException {
        rejectNull(o);
        evictWhileFull();
        super.put(o);
    }

    /** Fails before any eviction so that an invalid insert cannot drop data. */
    private void rejectNull(final E o) {
        if (o == null) {
            throw new NullPointerException();
        }
    }

    /**
     * Removes head elements until there is spare capacity. Uses poll() so the
     * head is removed atomically; peek-then-remove could delete a later equal
     * element if a consumer took the head in between.
     */
    private void evictWhileFull() {
        while (remainingCapacity() == 0) {
            poll();
        }
    }
}
352 
353 class PhasedBlockingReference<V> {
354     private final AtomicReference<V> ref = new AtomicReference<V>();
355     private final PhasedLatch latch = new PhasedLatch();
356 
357     /**
358      * Takes the current element if it is not null and replaces it with null. If
359      * the current element is null then wait until it becomes non-null.
360      * <p>
361      * If the current thread:
362      * <ul>
363      * <li>has its interrupted status set on entry to this method; or
364      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
365      * </ul>
366      * then {@link InterruptedException} is thrown and the current thread's
367      * interrupted status is cleared.
368      * 
369      * @return the current element
370      * @throws InterruptedException if the current thread is interrupted while
371      * waiting
372      */
373     public V take() throws InterruptedException {
374         while (true) {
375             latch.await();
376             final V result = ref.getAndSet(null);
377             if (result != null) {
378                 return result;
379             }
380         }
381     }
382 
383     /**
384      * Takes the current element if it is not null and replaces it with null. If
385      * the current element is null then wait until it becomes non-null. The
386      * method will throw a {@link TimeoutException} if the timeout is reached
387      * before an element becomes available.
388      * <p>
389      * If the current thread:
390      * <ul>
391      * <li>has its interrupted status set on entry to this method; or
392      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
393      * </ul>
394      * then {@link InterruptedException} is thrown and the current thread's
395      * interrupted status is cleared.
396      * 
397      * @param timeout the maximum time to wait
398      * @param unit the time unit of the {@code timeout} argument
399      * @return the current element
400      * @throws InterruptedException if the current thread is interrupted while
401      * waiting
402      * @throws TimeoutException if the timeout is reached without another thread
403      * having called {@link #set(Object)}.
404      */
405     public V take(final long timeout, final TimeUnit unit) throws TimeoutException, InterruptedException {
406         if (!latch.await(timeout, unit)) {
407             throw new TimeoutException();
408         }
409         return ref.getAndSet(null);
410     }
411 
412     /**
413      * Set the value of this reference. This method is lock-free. A thread
414      * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
415      * released and given this value.
416      * 
417      * @param value the new value.
418      */
419     public void set(final V value) {
420         notNull("value", value);
421         internalSet(value);
422     }
423 
424     void clear() {
425         internalSet(null);
426     }
427 
428     private void internalSet(final V value) {
429         ref.set(value);
430         latch.release();
431     }
432 
433     public boolean isNull() {
434         return ref.get() == null;
435     }
436 }