1 package com.atlassian.util.concurrent;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6
7 import com.atlassian.util.concurrent.ExceptionPolicy.Policies;
8
9 import org.junit.Test;
10
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.Lists;
13
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 public class AsyncCompleterTest {
25 @Test
26 public void reverseOrder() {
27 final AsyncCompleter queue = new AsyncCompleter.Builder(new Executor() {
28 private final AtomicReference<Runnable> first = new AtomicReference<Runnable>();
29
30 public void execute(final Runnable command) {
31 if (first.get() == null) {
32 first.set(command);
33 return;
34 }
35 command.run();
36 first.get().run();
37 }
38 }).build();
39 final Iterator<Integer> queued = queue.invokeAll(ImmutableList.of(callable(1), callable(2))).iterator();
40 assertEquals(2, queued.next().intValue());
41 assertEquals(1, queued.next().intValue());
42 assertFalse(queued.hasNext());
43 }
44
45 @Test
46 public void order() {
47 final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
48 final Iterator<Integer> queued = completion.invokeAll(ImmutableList.of(callable(1), callable(2))).iterator();
49 assertEquals(1, queued.next().intValue());
50 assertEquals(2, queued.next().intValue());
51 assertFalse(queued.hasNext());
52 }
53
54 @Test
55 public void singleExecute() {
56 final AtomicInteger count = new AtomicInteger();
57 final AsyncCompleter completion = new AsyncCompleter.Builder(new Executor() {
58 public void execute(final Runnable command) {
59 count.getAndIncrement();
60 command.run();
61 }
62 }).build();
63 final Iterable<Integer> queued = completion.invokeAll(ImmutableList.of(callable(1)));
64 assertEquals(1, queued.iterator().next().intValue());
65 assertEquals(1, queued.iterator().next().intValue());
66 assertEquals(1, queued.iterator().next().intValue());
67 assertEquals(1, count.get());
68 }
69
70 @Test
71 public void nullLastFiltered() {
72 final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
73 final ImmutableList<Callable<Integer>> input = ImmutableList.of(callable(1), callable((Integer) null));
74 final Iterator<Integer> queued = completion.invokeAll(input).iterator();
75 assertEquals(1, queued.next().intValue());
76 assertFalse(queued.hasNext());
77 }
78
79 @Test
80 public void nullFirstFiltered() {
81 final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
82 final ImmutableList<Callable<Integer>> input = ImmutableList.of(callable((Integer) null), callable(2));
83 final Iterator<Integer> queued = completion.invokeAll(input).iterator();
84 assertEquals(2, queued.next().intValue());
85 assertFalse(queued.hasNext());
86 }
87
88 @Test
89 public void limitedExecute() {
90 final List<Runnable> jobs = Lists.newArrayList();
91 final AsyncCompleter completion = new AsyncCompleter.Builder(new Executor() {
92 public void execute(final Runnable command) {
93 jobs.add(command);
94 }
95 }).handleExceptions(Policies.THROW).limitParallelExecutionTo(1);
96 final Iterable<Integer> queued = completion.invokeAll(ImmutableList.of(callable(1), callable(2), callable(3)));
97
98 final Iterator<Integer> iterator = queued.iterator();
99 assertEquals(1, jobs.size());
100
101 jobs.get(0).run();
102 assertEquals(2, jobs.size());
103
104 assertEquals(1, iterator.next().intValue());
105
106 jobs.get(1).run();
107 assertEquals(3, jobs.size());
108 assertEquals(2, iterator.next().intValue());
109 jobs.get(2).run();
110 assertEquals(3, jobs.size());
111 assertEquals(3, iterator.next().intValue());
112 assertFalse(iterator.hasNext());
113 }
114
115 @Test
116 public void callableCompletedBeforeTimeout() {
117 final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
118 final ImmutableList<Callable<Integer>> input = ImmutableList.of(sleeper(1, 2));
119 final Integer value = completion.invokeAll(input, 1, TimeUnit.NANOSECONDS).iterator().next();
120 assertEquals(1, value.intValue());
121 }
122
123 @Test(expected = RuntimeTimeoutException.class)
124 public void callableTimedOutBeforeCompleting() {
125 final AsyncCompleter completion = new AsyncCompleter.Builder(new NaiveExecutor()).build();
126
127 completion.invokeAll(ImmutableList.of(sleeper(1, 10)), 1, TimeUnit.NANOSECONDS).iterator().next();
128 }
129
130 @Test
131 public void invocationRegistersWithAccessor() throws Exception {
132 final AsyncCompleter completion = new AsyncCompleter.Builder(new CallerExecutor()).build();
133 final AtomicReference<Future<String>> ref = new AtomicReference<Future<String>>();
134 completion.invokeAllTasks(ImmutableList.of(callable("blah!")), new AsyncCompleter.Accessor<String>() {
135 @Override
136 public String apply(final CompletionService<String> input) {
137 try {
138 return input.poll().get();
139 } catch (final Exception e) {
140 throw new AssertionError(e);
141 }
142 }
143
144 @Override
145 public void register(final Future<String> f) {
146 assertTrue(ref.compareAndSet(null, f));
147 }
148 });
149 assertEquals("blah!", ref.get().get());
150 }
151
152 <T> Callable<T> callable(final T input) {
153 return new Callable<T>() {
154 public T call() throws Exception {
155 return input;
156 }
157 };
158 }
159
160 <T> Callable<T> sleeper(final T input, final int sleep) {
161 return new Callable<T>() {
162 public T call() throws Exception {
163 Thread.sleep(sleep);
164 return input;
165 }
166 };
167 }
168
169 static class NaiveExecutor implements Executor {
170 public void execute(final Runnable command) {
171 new Thread(command).start();
172 }
173 }
174
175 static class CallerExecutor implements Executor {
176 public void execute(final Runnable command) {
177 command.run();
178 }
179 }
180 }