1 package com.atlassian.sal.core.executor;
2
3 import com.google.common.collect.ImmutableSet;
4 import junit.framework.TestCase;
5
6 import java.util.Collection;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.TimeUnit;
12 import java.util.concurrent.TimeoutException;
13 import java.util.concurrent.atomic.AtomicReference;
14
15
16
17
18
19
20 public class TestThreadLocalDelegateExecutorService extends TestCase
21 {
22 private static final String OUTER_THREAD_LOCAL_CONTEXT = "OUTER_THREAD_LOCAL_CONTEXT";
23
24 public void testSubmitCallable() throws ExecutionException, InterruptedException
25 {
26 final ThreadLocalContextManager manager = threadLocalContextManager();
27 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
28 service.submit(assertingCallable(manager)).get();
29 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
30 }
31
32 public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
33 {
34 final ThreadLocalContextManager manager = threadLocalContextManager();
35 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
36 service.submit(assertingRunnable(manager), null).get();
37 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
38 }
39
40 public void testSubmitRunnable() throws ExecutionException, InterruptedException
41 {
42 final ThreadLocalContextManager manager = threadLocalContextManager();
43 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
44 service.submit(assertingRunnable(manager)).get();
45 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
46 }
47
48 public void testInvokeAll() throws ExecutionException, InterruptedException
49 {
50 final ThreadLocalContextManager manager = threadLocalContextManager();
51 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
52 getAllFutures(service.invokeAll(assertingCallables(manager)));
53 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
54 }
55
56 public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
57 {
58 final ThreadLocalContextManager manager = threadLocalContextManager();
59 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
60 getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
61 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
62 }
63
64 public void testInvokeAny() throws ExecutionException, InterruptedException
65 {
66 final ThreadLocalContextManager manager = threadLocalContextManager();
67 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
68 service.invokeAny(assertingCallables(manager));
69 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
70 }
71
72 public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
73 {
74 final ThreadLocalContextManager manager = threadLocalContextManager();
75 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
76 service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
77 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
78 }
79
80 public void testExecute() throws Exception
81 {
82
83
84
85 final ThreadLocalContextManager manager = threadLocalContextManager();
86 final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
87 final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
88
89 service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
90 service.shutdown();
91 service.awaitTermination(60, TimeUnit.SECONDS);
92
93 if (assertionErrorReference.get() != null)
94 throw assertionErrorReference.get();
95
96 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
97 }
98
99 private ThreadLocalContextManager threadLocalContextManager()
100 {
101 ThreadLocalContextManager manager = new StubThreadLocalContextManager();
102 manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
103 return manager;
104 }
105
106 private ThreadLocalDelegateExecutorService threadLocalDelegateExecutorService(ThreadLocalContextManager manager)
107 {
108 return new ThreadLocalDelegateExecutorService(manager, Executors.newSingleThreadExecutor());
109 }
110
111 private Runnable assertingRunnable(final ThreadLocalContextManager manager)
112 {
113 return new Runnable()
114 {
115 @Override
116 public void run()
117 {
118 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
119 }
120 };
121 }
122
123 private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
124 {
125 return new Runnable()
126 {
127 @Override
128 public void run()
129 {
130 try
131 {
132 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
133 }
134 catch (AssertionError e)
135 {
136 assertionErrorReference.set(e);
137 }
138 }
139 };
140 }
141
142 private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
143 {
144 return new Callable<Object>()
145 {
146 @Override
147 public Object call() throws Exception
148 {
149 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
150 return null;
151 }
152 };
153 }
154
155 private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
156 {
157 return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
158 }
159
160 private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
161 {
162 for (Future<Object> future : futures)
163 {
164 future.get();
165 }
166 }
167 }