1 package com.atlassian.sal.core.executor;
2
3 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4 import com.google.common.collect.ImmutableSet;
5 import junit.framework.TestCase;
6
7 import java.util.Collection;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.atomic.AtomicReference;
15
16
17
18
19
20
21 public class TestThreadLocalDelegateExecutorService extends TestCase
22 {
23 private static final Object OUTER_THREAD_LOCAL_CONTEXT = new Object();
24
25 public void testSubmitCallable() throws ExecutionException, InterruptedException
26 {
27 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
28 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
29 service.submit(assertingCallable(manager)).get();
30 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
31 }
32
33 public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
34 {
35 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
36 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
37 service.submit(assertingRunnable(manager), null).get();
38 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
39 }
40
41 public void testSubmitRunnable() throws ExecutionException, InterruptedException
42 {
43 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
44 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
45 service.submit(assertingRunnable(manager)).get();
46 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
47 }
48
49 public void testInvokeAll() throws ExecutionException, InterruptedException
50 {
51 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
52 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
53 getAllFutures(service.invokeAll(assertingCallables(manager)));
54 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
55 }
56
57 public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
58 {
59 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
60 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
61 getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
62 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
63 }
64
65 public void testInvokeAny() throws ExecutionException, InterruptedException
66 {
67 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
68 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
69 service.invokeAny(assertingCallables(manager));
70 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
71 }
72
73 public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
74 {
75 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
76 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
77 service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
78 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
79 }
80
81 public void testExecute() throws Exception
82 {
83
84
85
86 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
87 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
88 final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
89
90 service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
91 service.shutdown();
92 service.awaitTermination(60, TimeUnit.SECONDS);
93
94 final AssertionError assertionError = assertionErrorReference.get();
95 if (assertionError != null)
96 throw assertionError;
97
98 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
99 }
100
101 private ThreadLocalContextManager<Object> threadLocalContextManager()
102 {
103 ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
104 manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
105 return manager;
106 }
107
108 private ThreadLocalDelegateExecutorService<Object> threadLocalDelegateExecutorService(ThreadLocalContextManager<Object> manager)
109 {
110 return new ThreadLocalDelegateExecutorService<Object>(manager, Executors.newSingleThreadExecutor());
111 }
112
113 private Runnable assertingRunnable(final ThreadLocalContextManager manager)
114 {
115 return new Runnable()
116 {
117 @Override
118 public void run()
119 {
120 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
121 }
122 };
123 }
124
125 private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
126 {
127 return new Runnable()
128 {
129 @Override
130 public void run()
131 {
132 try
133 {
134 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
135 }
136 catch (AssertionError e)
137 {
138 assertionErrorReference.set(e);
139 }
140 }
141 };
142 }
143
144 private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
145 {
146 return new Callable<Object>()
147 {
148 @Override
149 public Object call() throws Exception
150 {
151 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
152 return null;
153 }
154 };
155 }
156
157 private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
158 {
159 return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
160 }
161
162 private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
163 {
164 for (Future<Object> future : futures)
165 {
166 future.get();
167 }
168 }
169 }