View Javadoc
1   package com.atlassian.sal.core.executor;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4   import com.atlassian.sal.api.executor.ThreadLocalDelegateExecutorFactory;
5   import com.google.common.collect.ImmutableSet;
6   import org.junit.Test;
7   
8   import java.util.Collection;
9   import java.util.concurrent.Callable;
10  import java.util.concurrent.ExecutionException;
11  import java.util.concurrent.Executors;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.TimeUnit;
14  import java.util.concurrent.TimeoutException;
15  import java.util.concurrent.atomic.AtomicReference;
16  
17  import static org.junit.Assert.assertEquals;
18  
19  /**
20   * Test that ThreadLocalDelegateExecutorService correctly wraps callables and runnables passed to it before passing them
21   * off to its delegate, by checking the thread-local context in the call or run method. Also, test that the thread-local
22   * context doesn't get set to something else on return.
23   */
24  public class TestThreadLocalDelegateExecutorService {
25      private static final Object OUTER_THREAD_LOCAL_CONTEXT = new Object();
26  
27      private final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
28      private final ThreadLocalDelegateExecutorFactory executorFactory = new DefaultThreadLocalDelegateExecutorFactory<>(manager);
29  
30      @Test
31      public void testSubmitCallable() throws ExecutionException, InterruptedException {
32          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
33          service.submit(assertingCallable()).get();
34          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
35      }
36  
37      @Test
38      public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException {
39          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
40          service.submit(assertingRunnable(), null).get();
41          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
42      }
43  
44      @Test
45      public void testSubmitRunnable() throws ExecutionException, InterruptedException {
46          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
47          service.submit(assertingRunnable()).get();
48          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
49      }
50  
51      @Test
52      public void testInvokeAll() throws ExecutionException, InterruptedException {
53          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
54          getAllFutures(service.invokeAll(assertingCallables()));
55          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
56      }
57  
58      @Test
59      public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException {
60          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
61          getAllFutures(service.invokeAll(assertingCallables(), 60, TimeUnit.SECONDS));
62          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
63      }
64  
65      @Test
66      public void testInvokeAny() throws ExecutionException, InterruptedException {
67          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
68          service.invokeAny(assertingCallables());
69          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
70      }
71  
72      @Test
73      public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException {
74          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
75          service.invokeAny(assertingCallables(), 60, TimeUnit.SECONDS);
76          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
77      }
78  
79      @Test
80      public void testExecute() throws Exception {
81          // We have to jump through a few hoops here, since the AssertionError gets thrown on a different thread, and
82          // there is no Future to get.
83  
84          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService();
85          final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<>();
86  
87          service.execute(assertingRunnableWithErrorReference(assertionErrorReference));
88          service.shutdown();
89          service.awaitTermination(60, TimeUnit.SECONDS);
90  
91          final AssertionError assertionError = assertionErrorReference.get();
92          if (assertionError != null)
93              throw assertionError;
94  
95          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
96      }
97  
98      private ThreadLocalContextManager<Object> threadLocalContextManager() {
99          ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
100         manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
101         return manager;
102     }
103 
104     private ThreadLocalDelegateExecutorService threadLocalDelegateExecutorService() {
105         return new ThreadLocalDelegateExecutorService(Executors.newSingleThreadExecutor(), executorFactory);
106     }
107 
108     private Runnable assertingRunnable() {
109         return new Runnable() {
110             @Override
111             public void run() {
112                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
113             }
114         };
115     }
116 
117     private Runnable assertingRunnableWithErrorReference(final AtomicReference<AssertionError> assertionErrorReference) {
118         return new Runnable() {
119             @Override
120             public void run() {
121                 try {
122                     assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
123                 } catch (AssertionError e) {
124                     assertionErrorReference.set(e);
125                 }
126             }
127         };
128     }
129 
130     private Callable<Object> assertingCallable() {
131         return new Callable<Object>() {
132             @Override
133             public Object call() throws Exception {
134                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
135                 return null;
136             }
137         };
138     }
139 
140     private Collection<Callable<Object>> assertingCallables() {
141         return ImmutableSet.of(assertingCallable(), assertingCallable(), assertingCallable());
142     }
143 
144     private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException {
145         for (Future<Object> future : futures) {
146             future.get();
147         }
148     }
149 }