1   package com.atlassian.util.concurrent;
2   
3   import static org.junit.Assert.assertEquals;
4   import static org.junit.Assert.assertFalse;
5   import static org.junit.Assert.assertTrue;
6   
7   import com.atlassian.util.concurrent.ExceptionPolicy.Policies;
8   
9   import org.junit.Test;
10  
11  import com.google.common.collect.ImmutableList;
12  import com.google.common.collect.Lists;
13  
14  import java.util.Iterator;
15  import java.util.List;
16  import java.util.concurrent.Callable;
17  import java.util.concurrent.CompletionService;
18  import java.util.concurrent.Executor;
19  import java.util.concurrent.Future;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  public class AsyncCompleterTest {
25      @Test
26      public void reverseOrder() {
27          final AsyncCompleter queue = new AsyncCompleter.Builder(new Executor() {
28              private final AtomicReference<Runnable> first = new AtomicReference<Runnable>();
29  
30              public void execute(final Runnable command) {
31                  if (first.get() == null) {
32                      first.set(command);
33                      return;
34                  }
35                  command.run();
36                  first.get().run();
37              }
38          }).build();
39          final Iterator<Integer> queued = queue.invokeAll(ImmutableList.of(callable(1), callable(2))).iterator();
40          assertEquals(2, queued.next().intValue());
41          assertEquals(1, queued.next().intValue());
42          assertFalse(queued.hasNext());
43      }
44  
45      @Test
46      public void order() {
47          final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
48          final Iterator<Integer> queued = completion.invokeAll(ImmutableList.of(callable(1), callable(2))).iterator();
49          assertEquals(1, queued.next().intValue());
50          assertEquals(2, queued.next().intValue());
51          assertFalse(queued.hasNext());
52      }
53  
54      @Test
55      public void singleExecute() {
56          final AtomicInteger count = new AtomicInteger();
57          final AsyncCompleter completion = new AsyncCompleter.Builder(new Executor() {
58              public void execute(final Runnable command) {
59                  count.getAndIncrement();
60                  command.run();
61              }
62          }).build();
63          final Iterable<Integer> queued = completion.invokeAll(ImmutableList.of(callable(1)));
64          assertEquals(1, queued.iterator().next().intValue());
65          assertEquals(1, queued.iterator().next().intValue());
66          assertEquals(1, queued.iterator().next().intValue());
67          assertEquals(1, count.get());
68      }
69  
70      @Test
71      public void nullLastFiltered() {
72          final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
73          final ImmutableList<Callable<Integer>> input = ImmutableList.of(callable(1), callable((Integer) null));
74          final Iterator<Integer> queued = completion.invokeAll(input).iterator();
75          assertEquals(1, queued.next().intValue());
76          assertFalse(queued.hasNext());
77      }
78  
79      @Test
80      public void nullFirstFiltered() {
81          final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
82          final ImmutableList<Callable<Integer>> input = ImmutableList.of(callable((Integer) null), callable(2));
83          final Iterator<Integer> queued = completion.invokeAll(input).iterator();
84          assertEquals(2, queued.next().intValue());
85          assertFalse(queued.hasNext());
86      }
87  
88      @Test
89      public void limitedExecute() {
90          final List<Runnable> jobs = Lists.newArrayList();
91          final AsyncCompleter completion = new AsyncCompleter.Builder(new Executor() {
92              public void execute(final Runnable command) {
93                  jobs.add(command);
94              }
95          }).handleExceptions(Policies.THROW).limitParallelExecutionTo(1);
96          final Iterable<Integer> queued = completion.invokeAll(ImmutableList.of(callable(1), callable(2), callable(3)));
97  
98          final Iterator<Integer> iterator = queued.iterator();
99          assertEquals(1, jobs.size());
100         // can't test that hasNext() will block, but it should
101         jobs.get(0).run();
102         assertEquals(2, jobs.size());
103         // can test that next() will not block anymore
104         assertEquals(1, iterator.next().intValue());
105 
106         jobs.get(1).run();
107         assertEquals(3, jobs.size());
108         assertEquals(2, iterator.next().intValue());
109         jobs.get(2).run();
110         assertEquals(3, jobs.size());
111         assertEquals(3, iterator.next().intValue());
112         assertFalse(iterator.hasNext());
113     }
114 
115     @Test
116     public void callableCompletedBeforeTimeout() {
117         final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
118         final ImmutableList<Callable<Integer>> input = ImmutableList.of(sleeper(1, 2));
119         final Integer value = completion.invokeAll(input, 1, TimeUnit.NANOSECONDS).iterator().next();
120         assertEquals(1, value.intValue());
121     }
122 
123     @Test(expected = RuntimeTimeoutException.class)
124     public void callableTimedOutBeforeCompleting() {
125         final AsyncCompleter completion = new AsyncCompleter.Builder(new NaiveExecutor()).build();
126         // should reach timeout before completing
127         completion.invokeAll(ImmutableList.of(sleeper(1, 10)), 1, TimeUnit.NANOSECONDS).iterator().next();
128     }
129 
130     @Test
131     public void invocationRegistersWithAccessor() throws Exception {
132         final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
133         final AtomicReference<Future<String>> ref = new AtomicReference<Future<String>>();
134         completion.invokeAllTasks(ImmutableList.of(callable("blah!")), new AsyncCompleter.Accessor<String>() {
135             @Override
136             public String apply(final CompletionService<String> input) {
137                 try {
138                     return input.poll().get();
139                 } catch (final Exception e) {
140                     throw new AssertionError(e);
141                 }
142             }
143 
144             @Override
145             public void register(final Future<String> f) {
146                 assertTrue(ref.compareAndSet(null, f));
147             }
148         });
149         assertEquals("blah!", ref.get().get());
150     }
151 
152     <T> Callable<T> callable(final T input) {
153         return new Callable<T>() {
154             public T call() throws Exception {
155                 return input;
156             }
157         };
158     }
159 
160     <T> Callable<T> sleeper(final T input, final int sleep) {
161         return new Callable<T>() {
162             public T call() throws Exception {
163                 Thread.sleep(sleep);
164                 return input;
165             }
166         };
167     }
168 
169     static class NaiveExecutor implements Executor {
170         public void execute(final Runnable command) {
171             new Thread(command).start();
172         }
173     }
174 
175     static class CallerExecutor implements Executor {
176         public void execute(final Runnable command) {
177             command.run();
178         }
179     }
180 }