1 package com.atlassian.sal.core.executor;
2
3 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4 import com.google.common.collect.ImmutableSet;
5 import junit.framework.TestCase;
6
7 import java.util.Collection;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.atomic.AtomicReference;
15
16
17
18
19
20
21 public class TestThreadLocalDelegateExecutorService extends TestCase
22 {
23 private static final String OUTER_THREAD_LOCAL_CONTEXT = "OUTER_THREAD_LOCAL_CONTEXT";
24
25 public void testSubmitCallable() throws ExecutionException, InterruptedException
26 {
27 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
28 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
29 service.submit(assertingCallable(manager)).get();
30 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
31 }
32
33 public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
34 {
35 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
36 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
37 service.submit(assertingRunnable(manager), null).get();
38 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
39 }
40
41 public void testSubmitRunnable() throws ExecutionException, InterruptedException
42 {
43 final ThreadLocalContextManager manager = threadLocalContextManager();
44 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
45 service.submit(assertingRunnable(manager)).get();
46 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
47 }
48
49 public void testInvokeAll() throws ExecutionException, InterruptedException
50 {
51 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
52 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
53 getAllFutures(service.invokeAll(assertingCallables(manager)));
54 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
55 }
56
57 public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
58 {
59 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
60 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
61 getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
62 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
63 }
64
65 public void testInvokeAny() throws ExecutionException, InterruptedException
66 {
67 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
68 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
69 service.invokeAny(assertingCallables(manager));
70 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
71 }
72
73 public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
74 {
75 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
76 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
77 service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
78 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
79 }
80
81 public void testExecute() throws Exception
82 {
83
84
85
86 final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
87 final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
88 final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
89
90 service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
91 service.shutdown();
92 service.awaitTermination(60, TimeUnit.SECONDS);
93
94 if (assertionErrorReference.get() != null)
95 throw assertionErrorReference.get();
96
97 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
98 }
99
100 private ThreadLocalContextManager<Object> threadLocalContextManager()
101 {
102 ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
103 manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
104 return manager;
105 }
106
107 private ThreadLocalDelegateExecutorService<Object> threadLocalDelegateExecutorService(ThreadLocalContextManager<Object> manager)
108 {
109 return new ThreadLocalDelegateExecutorService<Object>(manager, Executors.newSingleThreadExecutor());
110 }
111
112 private Runnable assertingRunnable(final ThreadLocalContextManager manager)
113 {
114 return new Runnable()
115 {
116 @Override
117 public void run()
118 {
119 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
120 }
121 };
122 }
123
124 private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
125 {
126 return new Runnable()
127 {
128 @Override
129 public void run()
130 {
131 try
132 {
133 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
134 }
135 catch (AssertionError e)
136 {
137 assertionErrorReference.set(e);
138 }
139 }
140 };
141 }
142
143 private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
144 {
145 return new Callable<Object>()
146 {
147 @Override
148 public Object call() throws Exception
149 {
150 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
151 return null;
152 }
153 };
154 }
155
156 private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
157 {
158 return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
159 }
160
161 private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
162 {
163 for (Future<Object> future : futures)
164 {
165 future.get();
166 }
167 }
168 }