View Javadoc

1   package com.atlassian.sal.core.executor;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4   import com.google.common.collect.ImmutableSet;
5   import junit.framework.TestCase;
6   
7   import java.util.Collection;
8   import java.util.concurrent.Callable;
9   import java.util.concurrent.ExecutionException;
10  import java.util.concurrent.Executors;
11  import java.util.concurrent.Future;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.TimeoutException;
14  import java.util.concurrent.atomic.AtomicReference;
15  
16  /**
17   * Test that ThreadLocalDelegateExecutorService correctly wraps callables and runnables passed to it before passing them
18   * off to its delegate, by checking the thread-local context in the call or run method. Also, test that the thread-local
19   * context doesn't get set to something else on return.
20   */
21  public class TestThreadLocalDelegateExecutorService extends TestCase {
22      private static final Object OUTER_THREAD_LOCAL_CONTEXT = new Object();
23  
24      public void testSubmitCallable() throws ExecutionException, InterruptedException {
25          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
26          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
27          service.submit(assertingCallable(manager)).get();
28          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
29      }
30  
31      public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException {
32          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
33          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
34          service.submit(assertingRunnable(manager), null).get();
35          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
36      }
37  
38      public void testSubmitRunnable() throws ExecutionException, InterruptedException {
39          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
40          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
41          service.submit(assertingRunnable(manager)).get();
42          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
43      }
44  
45      public void testInvokeAll() throws ExecutionException, InterruptedException {
46          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
47          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
48          getAllFutures(service.invokeAll(assertingCallables(manager)));
49          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
50      }
51  
52      public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException {
53          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
54          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
55          getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
56          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
57      }
58  
59      public void testInvokeAny() throws ExecutionException, InterruptedException {
60          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
61          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
62          service.invokeAny(assertingCallables(manager));
63          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
64      }
65  
66      public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException {
67          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
68          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
69          service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
70          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
71      }
72  
73      public void testExecute() throws Exception {
74          // We have to jump through a few hoops here, since the AssertionError gets thrown on a different thread, and
75          // there is no Future to get.
76  
77          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
78          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
79          final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
80  
81          service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
82          service.shutdown();
83          service.awaitTermination(60, TimeUnit.SECONDS);
84  
85          final AssertionError assertionError = assertionErrorReference.get();
86          if (assertionError != null)
87              throw assertionError;
88  
89          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
90      }
91  
92      private ThreadLocalContextManager<Object> threadLocalContextManager() {
93          ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
94          manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
95          return manager;
96      }
97  
98      private ThreadLocalDelegateExecutorService<Object> threadLocalDelegateExecutorService(ThreadLocalContextManager<Object> manager) {
99          return new ThreadLocalDelegateExecutorService<Object>(manager, Executors.newSingleThreadExecutor());
100     }
101 
102     private Runnable assertingRunnable(final ThreadLocalContextManager manager) {
103         return new Runnable() {
104             @Override
105             public void run() {
106                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
107             }
108         };
109     }
110 
111     private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference) {
112         return new Runnable() {
113             @Override
114             public void run() {
115                 try {
116                     assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
117                 } catch (AssertionError e) {
118                     assertionErrorReference.set(e);
119                 }
120             }
121         };
122     }
123 
124     private Callable<Object> assertingCallable(final ThreadLocalContextManager manager) {
125         return new Callable<Object>() {
126             @Override
127             public Object call() throws Exception {
128                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
129                 return null;
130             }
131         };
132     }
133 
134     private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager) {
135         return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
136     }
137 
138     private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException {
139         for (Future<Object> future : futures) {
140             future.get();
141         }
142     }
143 }