1   package com.atlassian.util.concurrent;
2   
3   import static com.atlassian.util.concurrent.Assertions.notNull;
4   
5   import java.util.ArrayList;
6   import java.util.LinkedList;
7   import java.util.List;
8   import java.util.concurrent.BlockingQueue;
9   import java.util.concurrent.LinkedBlockingQueue;
10  import java.util.concurrent.TimeUnit;
11  import java.util.concurrent.TimeoutException;
12  import java.util.concurrent.atomic.AtomicReference;
13  import java.util.concurrent.locks.Condition;
14  import java.util.concurrent.locks.Lock;
15  import java.util.concurrent.locks.ReentrantLock;
16  
17  import org.junit.Test;
18  
19  public class PerformanceTest {
20  
21      @Test
22      public void test() {}
23  
24      public static void main(final String[] args) {
25          final List<Q> queues = new ArrayList<Q>();
26          queues.add(new SRSWBlockingQueue());
27          queues.add(new SyncQueue());
28          queues.add(new RefQueue());
29          queues.add(new BlockingReferenceQueue());
30          queues.add(new LockedQueue());
31          queues.add(new PhasedQueue());
32          // queues.add(new LinkedQueue());
33          System.out.println(new PerformanceTest().runTest(queues));
34      }
35  
36      public String runTest(final List<Q> queues) {
37          final int warm = 100000, run = 100000000;
38          // warm up
39          for (final Q q : queues) {
40              runTest(q, warm);
41          }
42          final StringBuilder builder = new StringBuilder();
43          for (final Q q : queues) {
44              builder.append(q).append('\t');
45              builder.append(runTest(q, run)).append('\r');
46          }
47          return builder.toString();
48      }
49  
50      public long runTest(final Q q, final int iterations) {
51          final Thread reader = new Thread(new Runnable() {
52              public void run() {
53                  try {
54                      while (true) {
55                          int j = 0;
56                          j += q.take();
57                      }
58                  } catch (final InterruptedException e) {}
59              }
60          });
61          reader.start();
62          long start;
63          try {
64              start = System.currentTimeMillis();
65              for (int i = 0; i < 100000000; i++) {
66                  q.put(i);
67              }
68              return System.currentTimeMillis() - start;
69          } finally {
70              reader.interrupt();
71              q.clear();
72          }
73      }
74  
75      interface Q {
76          void put(final int i);
77  
78          Integer take() throws InterruptedException;
79  
80          void clear();
81      }
82  
83      static class UnboundQueue implements Q {
84          private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
85  
86          public void put(final int i) {
87              try {
88                  queue.put(i);
89              } catch (final InterruptedException e) {
90                  throw new RuntimeException(e);
91              }
92          }
93  
94          public Integer take() throws InterruptedException {
95              return queue.take();
96          }
97  
98          public void clear() {
99              queue.clear();
100         }
101     }
102 
103     static class LruQueue implements Q {
104         private final LRUBlockingQueue<Integer> queue = new LRUBlockingQueue<Integer>(1);
105 
106         public void put(final int i) {
107             try {
108                 queue.put(i);
109             } catch (final InterruptedException e) {
110                 throw new RuntimeException(e);
111             }
112         }
113 
114         public Integer take() throws InterruptedException {
115             return queue.take();
116         }
117 
118         public void clear() {
119             queue.clear();
120         }
121     }
122 
123     static class LinkedQueue implements Q {
124         private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(1);
125 
126         public void put(final int i) {
127             try {
128                 queue.clear();
129                 queue.put(i);
130             } catch (final InterruptedException e) {
131                 throw new RuntimeException(e);
132             }
133         }
134 
135         public Integer take() throws InterruptedException {
136             return queue.take();
137         }
138 
139         public void clear() {
140             queue.clear();
141         }
142     }
143 
144     static class PhasedQueue implements Q {
145         private final PhasedBlockingReference<Integer> queue = new PhasedBlockingReference<Integer>();
146 
147         public void put(final int i) {
148             queue.set(i);
149         }
150 
151         public Integer take() throws InterruptedException {
152             return queue.take();
153         }
154 
155         public void clear() {
156         // queue.set(null);
157         }
158     }
159 
160     static class RefQueue implements Q {
161         private final BlockingReference<Integer> ref = BlockingReference.newSRSW();
162 
163         public void put(final int i) {
164             ref.set(i);
165         }
166 
167         public Integer take() throws InterruptedException {
168             return ref.take();
169         }
170 
171         public void clear() {
172             ref.clear();
173         }
174     }
175 
176     static class LockedQueue implements Q {
177         private final LockingReference<Integer> queue = new LockingReference<Integer>();
178 
179         public void put(final int i) {
180             queue.set(i);
181         }
182 
183         public Integer take() throws InterruptedException {
184             return queue.take();
185         }
186 
187         public void clear() {
188             queue.clear();
189         }
190     }
191 
192     static class SyncQueue implements Q {
193         private final LinkedList<Integer> queue = new LinkedList<Integer>();
194 
195         public void put(final int i) {
196             synchronized (queue) {
197                 queue.clear();
198                 queue.add(i);
199                 queue.notifyAll();
200             }
201         }
202 
203         public Integer take() throws InterruptedException {
204             synchronized (queue) {
205                 while (queue.isEmpty()) {
206                     queue.wait();
207                 }
208                 return queue.getFirst();
209             }
210         }
211 
212         public void clear() {
213             synchronized (queue) {
214                 queue.clear();
215                 queue.notifyAll();
216             }
217         }
218     }
219 
220     static class SRSWBlockingQueue implements Q {
221         final BooleanLatch latch = new BooleanLatch();
222         final AtomicReference<LinkedList<Integer>> queue = new AtomicReference<LinkedList<Integer>>();
223 
224         public void put(final int i) {
225             LinkedList<Integer> list;
226             do {
227                 list = queue.getAndSet(null);
228                 if (list == null) {
229                     list = new LinkedList<Integer>();
230                 }
231                 list.add(i);
232             } while (!queue.compareAndSet(null, list));
233             latch.release();
234         }
235 
236         public Integer take() throws InterruptedException {
237             LinkedList<Integer> list;
238             do {
239                 latch.await();
240                 list = queue.getAndSet(null);
241             } while (list == null);
242             return list.getFirst();
243         }
244 
245         public void clear() {
246             queue.set(null);
247         }
248     }
249 
250     static class BlockingReferenceQueue implements Q {
251         private final BlockingReference<Integer> ref = BlockingReference.newSRSW();
252 
253         public void put(final int i) {
254             ref.set(i);
255         }
256 
257         public Integer take() throws InterruptedException {
258             return ref.take();
259         }
260 
261         public void clear() {
262             ref.clear();
263         }
264     }
265 }
266 
267 class LockingReference<V> {
268     private V ref;
269     private final Lock lock = new ReentrantLock();
270     private final Condition notEmpty = lock.newCondition();
271 
272     /**
273      * Takes the current element if it is not null and replaces it with null. If
274      * the current element is null then wait until it becomes non-null.
275      * <p>
276      * If the current thread:
277      * <ul>
278      * <li>has its interrupted status set on entry to this method; or
279      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
280      * </ul>
281      * then {@link InterruptedException} is thrown and the current thread's
282      * interrupted status is cleared.
283      * 
284      * @return the current element
285      * @throws InterruptedException if the current thread is interrupted while
286      * waiting
287      */
288     public V take() throws InterruptedException {
289         lock.lock();
290         try {
291             while (ref == null) {
292                 notEmpty.await();
293             }
294             final V result = ref;
295             ref = null;
296             return result;
297         } finally {
298             lock.unlock();
299         }
300     }
301 
302     /**
303      * Set the value of this reference. This method is lock-free. A thread
304      * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
305      * released and given this value.
306      * 
307      * @param value the new value.
308      */
309     public void set(final V value) {
310         notNull("value", value);
311         lock.lock();
312         try {
313             ref = value;
314             notEmpty.signalAll();
315         } finally {
316             lock.unlock();
317         }
318     }
319 
320     public boolean isNull() {
321         return ref == null;
322     }
323 
324     void clear() {
325         lock.lock();
326         try {
327             ref = null;
328         } finally {
329             lock.unlock();
330         }
331     }
332 }
333 
/**
 * A bounded {@link LinkedBlockingQueue} whose {@link #offer(Object)} and
 * {@link #put(Object)} make room by discarding elements from the head of the
 * queue (the oldest ones) when the queue is full, instead of failing or
 * blocking.
 * 
 * @param <E> the element type
 */
class LRUBlockingQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = -6070900096160951474L;

    /**
     * @param capacity the maximum number of elements retained
     */
    public LRUBlockingQueue(final int capacity) {
        super(capacity);
    }

    /**
     * Inserts the element, first evicting head elements while the queue is
     * full.
     * 
     * @param o the element to add
     * @return the result of the underlying {@code offer}; it can still be
     * false if another producer refills the queue after the eviction
     * @throws NullPointerException if the element is null; nothing is evicted
     * in that case
     */
    @Override
    public boolean offer(final E o) {
        evictWhileFull(o);
        return super.offer(o);
    }

    /**
     * Inserts the element, first evicting head elements while the queue is
     * full. May still wait in the underlying {@code put} if another producer
     * refills the queue after the eviction.
     * 
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting in the
     * underlying {@code put}
     * @throws NullPointerException if the element is null; nothing is evicted
     * in that case
     */
    @Override
    public void put(final E o) throws InterruptedException {
        evictWhileFull(o);
        super.put(o);
    }

    /** Rejects a null candidate, then drops head elements until there is spare capacity. */
    private void evictWhileFull(final E candidate) {
        // validate first: the insert would throw anyway, and evicting before
        // that would lose an element for a call that adds nothing
        if (candidate == null) {
            throw new NullPointerException();
        }
        while (remainingCapacity() == 0) {
            // poll() removes the head atomically; a separate peek() + remove(Object)
            // can race with a consumer and remove a later, equal element
            poll();
        }
    }
}
357 
358 class PhasedBlockingReference<V> {
359     private final AtomicReference<V> ref = new AtomicReference<V>();
360     private final PhasedLatch latch = new PhasedLatch();
361 
362     /**
363      * Takes the current element if it is not null and replaces it with null. If
364      * the current element is null then wait until it becomes non-null.
365      * <p>
366      * If the current thread:
367      * <ul>
368      * <li>has its interrupted status set on entry to this method; or
369      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
370      * </ul>
371      * then {@link InterruptedException} is thrown and the current thread's
372      * interrupted status is cleared.
373      * 
374      * @return the current element
375      * @throws InterruptedException if the current thread is interrupted while
376      * waiting
377      */
378     public V take() throws InterruptedException {
379         while (true) {
380             latch.await();
381             final V result = ref.getAndSet(null);
382             if (result != null) {
383                 return result;
384             }
385         }
386     }
387 
388     /**
389      * Takes the current element if it is not null and replaces it with null. If
390      * the current element is null then wait until it becomes non-null. The
391      * method will throw a {@link TimeoutException} if the timeout is reached
392      * before an element becomes available.
393      * <p>
394      * If the current thread:
395      * <ul>
396      * <li>has its interrupted status set on entry to this method; or
397      * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
398      * </ul>
399      * then {@link InterruptedException} is thrown and the current thread's
400      * interrupted status is cleared.
401      * 
402      * @param timeout the maximum time to wait
403      * @param unit the time unit of the {@code timeout} argument
404      * @return the current element
405      * @throws InterruptedException if the current thread is interrupted while
406      * waiting
407      * @throws TimeoutException if the timeout is reached without another thread
408      * having called {@link #set(Object)}.
409      */
410     public V take(final long timeout, final TimeUnit unit) throws TimeoutException, InterruptedException {
411         if (!latch.await(timeout, unit)) {
412             throw new TimedOutException(timeout, unit);
413         }
414         return ref.getAndSet(null);
415     }
416 
417     /**
418      * Set the value of this reference. This method is lock-free. A thread
419      * waiting in {@link #take()} or {@link #take(long, TimeUnit)} will be
420      * released and given this value.
421      * 
422      * @param value the new value.
423      */
424     public void set(final V value) {
425         notNull("value", value);
426         internalSet(value);
427     }
428 
429     void clear() {
430         internalSet(null);
431     }
432 
433     private void internalSet(final V value) {
434         ref.set(value);
435         latch.release();
436     }
437 
438     public boolean isNull() {
439         return ref.get() == null;
440     }
441 }