1 package com.atlassian.sal.core.executor;
2
3 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4 import com.google.common.collect.ImmutableSet;
5 import junit.framework.TestCase;
6
7 import java.util.Collection;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.atomic.AtomicReference;
15
16
17
18
19
20
21 public class TestThreadLocalDelegateExecutorService extends TestCase {
22 private static final Object OUTER_THREAD_LOCAL_CONTEXT = new Object();
23
24 public void testSubmitCallable() throws ExecutionException, InterruptedException {
25 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
26 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
27 service.submit(assertingCallable(manager)).get();
28 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
29 }
30
31 public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException {
32 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
33 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
34 service.submit(assertingRunnable(manager), null).get();
35 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
36 }
37
38 public void testSubmitRunnable() throws ExecutionException, InterruptedException {
39 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
40 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
41 service.submit(assertingRunnable(manager)).get();
42 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
43 }
44
45 public void testInvokeAll() throws ExecutionException, InterruptedException {
46 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
47 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
48 getAllFutures(service.invokeAll(assertingCallables(manager)));
49 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
50 }
51
52 public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException {
53 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
54 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
55 getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
56 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
57 }
58
59 public void testInvokeAny() throws ExecutionException, InterruptedException {
60 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
61 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
62 service.invokeAny(assertingCallables(manager));
63 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
64 }
65
66 public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException {
67 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
68 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
69 service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
70 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
71 }
72
73 public void testExecute() throws Exception {
74
75
76
77 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
78 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
79 final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
80
81 service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
82 service.shutdown();
83 service.awaitTermination(60, TimeUnit.SECONDS);
84
85 final AssertionError assertionError = assertionErrorReference.get();
86 if (assertionError != null)
87 throw assertionError;
88
89 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
90 }
91
92 private ThreadLocalContextManager<Object> threadLocalContextManager() {
93 ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
94 manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
95 return manager;
96 }
97
98 private ThreadLocalDelegateExecutorService<Object> threadLocalDelegateExecutorService(ThreadLocalContextManager<Object> manager) {
99 return new ThreadLocalDelegateExecutorService<Object>(manager, Executors.newSingleThreadExecutor());
100 }
101
102 private Runnable assertingRunnable(final ThreadLocalContextManager manager) {
103 return new Runnable() {
104 @Override
105 public void run() {
106 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
107 }
108 };
109 }
110
111 private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference) {
112 return new Runnable() {
113 @Override
114 public void run() {
115 try {
116 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
117 } catch (AssertionError e) {
118 assertionErrorReference.set(e);
119 }
120 }
121 };
122 }
123
124 private Callable<Object> assertingCallable(final ThreadLocalContextManager manager) {
125 return new Callable<Object>() {
126 @Override
127 public Object call() throws Exception {
128 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
129 return null;
130 }
131 };
132 }
133
134 private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager) {
135 return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
136 }
137
138 private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException {
139 for (Future<Object> future : futures) {
140 future.get();
141 }
142 }
143 }