1 package com.atlassian.sal.core.executor;
2
3 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4 import com.atlassian.sal.api.executor.ThreadLocalDelegateExecutorFactory;
5 import com.google.common.collect.ImmutableSet;
6 import org.junit.Test;
7
8 import java.util.Collection;
9 import java.util.concurrent.Callable;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException;
15 import java.util.concurrent.atomic.AtomicReference;
16
17 import static org.junit.Assert.assertEquals;
18
19
20
21
22
23
24 public class TestThreadLocalDelegateExecutorService {
25 private static final Object OUTER_THREAD_LOCAL_CONTEXT = new Object();
26
27 private final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
28 private final ThreadLocalDelegateExecutorFactory executorFactory = new DefaultThreadLocalDelegateExecutorFactory<>(manager);
29
30 @Test
31 public void testSubmitCallable() throws ExecutionException, InterruptedException {
32 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
33 service.submit(assertingCallable()).get();
34 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
35 }
36
37 @Test
38 public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException {
39 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
40 service.submit(assertingRunnable(), null).get();
41 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
42 }
43
44 @Test
45 public void testSubmitRunnable() throws ExecutionException, InterruptedException {
46 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
47 service.submit(assertingRunnable()).get();
48 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
49 }
50
51 @Test
52 public void testInvokeAll() throws ExecutionException, InterruptedException {
53 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
54 getAllFutures(service.invokeAll(assertingCallables()));
55 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
56 }
57
58 @Test
59 public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException {
60 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
61 getAllFutures(service.invokeAll(assertingCallables(), 60, TimeUnit.SECONDS));
62 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
63 }
64
65 @Test
66 public void testInvokeAny() throws ExecutionException, InterruptedException {
67 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
68 service.invokeAny(assertingCallables());
69 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
70 }
71
72 @Test
73 public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException {
74 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
75 service.invokeAny(assertingCallables(), 60, TimeUnit.SECONDS);
76 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
77 }
78
79 @Test
80 public void testExecute() throws Exception {
81
82
83
84 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
85 final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<>();
86
87 service.execute(assertingRunnableWithErrorReference(assertionErrorReference));
88 service.shutdown();
89 service.awaitTermination(60, TimeUnit.SECONDS);
90
91 final AssertionError assertionError = assertionErrorReference.get();
92 if (assertionError != null)
93 throw assertionError;
94
95 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
96 }
97
98 private ThreadLocalContextManager<Object> threadLocalContextManager() {
99 ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
100 manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
101 return manager;
102 }
103
104 private ThreadLocalDelegateExecutorService threadLocalDelegateExecutorService() {
105 return new ThreadLocalDelegateExecutorService(Executors.newSingleThreadExecutor(), executorFactory);
106 }
107
108 private Runnable assertingRunnable() {
109 return new Runnable() {
110 @Override
111 public void run() {
112 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
113 }
114 };
115 }
116
117 private Runnable assertingRunnableWithErrorReference(final AtomicReference<AssertionError> assertionErrorReference) {
118 return new Runnable() {
119 @Override
120 public void run() {
121 try {
122 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
123 } catch (AssertionError e) {
124 assertionErrorReference.set(e);
125 }
126 }
127 };
128 }
129
130 private Callable<Object> assertingCallable() {
131 return new Callable<Object>() {
132 @Override
133 public Object call() throws Exception {
134 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
135 return null;
136 }
137 };
138 }
139
140 private Collection<Callable<Object>> assertingCallables() {
141 return ImmutableSet.of(assertingCallable(), assertingCallable(), assertingCallable());
142 }
143
144 private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException {
145 for (Future<Object> future : futures) {
146 future.get();
147 }
148 }
149 }