1 package com.atlassian.sal.core.executor;
2
3 import com.atlassian.sal.api.executor.ThreadLocalContext;
4 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
5 import com.google.common.collect.ImmutableSet;
6 import junit.framework.TestCase;
7
8 import java.util.Collection;
9 import java.util.concurrent.Callable;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException;
15 import java.util.concurrent.atomic.AtomicReference;
16
17
18
19
20
21
22 public class TestThreadLocalDelegateExecutorService extends TestCase
23 {
24 private static final StubThreadLocalContextManager.Context OUTER_THREAD_LOCAL_CONTEXT = new StubThreadLocalContextManager.Context();
25
26 public void testSubmitCallable() throws ExecutionException, InterruptedException
27 {
28 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
29 final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
30 service.submit(assertingCallable(manager)).get();
31 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
32 }
33
34 public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
35 {
36 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
37 final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
38 service.submit(assertingRunnable(manager), null).get();
39 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
40 }
41
42 public void testSubmitRunnable() throws ExecutionException, InterruptedException
43 {
44 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
45 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
46 service.submit(assertingRunnable(manager)).get();
47 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
48 }
49
50 public void testInvokeAll() throws ExecutionException, InterruptedException
51 {
52 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
53 final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
54 getAllFutures(service.invokeAll(assertingCallables(manager)));
55 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
56 }
57
58 public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
59 {
60 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
61 final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
62 getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
63 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
64 }
65
66 public void testInvokeAny() throws ExecutionException, InterruptedException
67 {
68 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
69 final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
70 service.invokeAny(assertingCallables(manager));
71 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
72 }
73
74 public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
75 {
76 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
77 final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
78 service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
79 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
80 }
81
82 public void testExecute() throws Exception
83 {
84
85
86
87 final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
88 final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
89 final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
90
91 service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
92 service.shutdown();
93 service.awaitTermination(60, TimeUnit.SECONDS);
94
95 final AssertionError assertionError = assertionErrorReference.get();
96 if (assertionError != null)
97 throw assertionError;
98
99 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
100 }
101
102 private ThreadLocalContextManager<ThreadLocalContext> threadLocalContextManager()
103 {
104 ThreadLocalContextManager<ThreadLocalContext> manager = new StubThreadLocalContextManager();
105 manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
106 return manager;
107 }
108
109 private ThreadLocalDelegateExecutorService<ThreadLocalContext> threadLocalDelegateExecutorService(ThreadLocalContextManager<ThreadLocalContext> manager)
110 {
111 return new ThreadLocalDelegateExecutorService<ThreadLocalContext>(manager, Executors.newSingleThreadExecutor());
112 }
113
114 private Runnable assertingRunnable(final ThreadLocalContextManager manager)
115 {
116 return new Runnable()
117 {
118 @Override
119 public void run()
120 {
121 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
122 }
123 };
124 }
125
126 private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
127 {
128 return new Runnable()
129 {
130 @Override
131 public void run()
132 {
133 try
134 {
135 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
136 }
137 catch (AssertionError e)
138 {
139 assertionErrorReference.set(e);
140 }
141 }
142 };
143 }
144
145 private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
146 {
147 return new Callable<Object>()
148 {
149 @Override
150 public Object call() throws Exception
151 {
152 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
153 return null;
154 }
155 };
156 }
157
158 private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
159 {
160 return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
161 }
162
163 private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
164 {
165 for (Future<Object> future : futures)
166 {
167 future.get();
168 }
169 }
170 }