View Javadoc

1   package com.atlassian.sal.core.executor;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4   import com.google.common.collect.ImmutableSet;
5   import junit.framework.TestCase;
6   
7   import java.util.Collection;
8   import java.util.concurrent.Callable;
9   import java.util.concurrent.ExecutionException;
10  import java.util.concurrent.Executors;
11  import java.util.concurrent.Future;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.TimeoutException;
14  import java.util.concurrent.atomic.AtomicReference;
15  
16  /**
17   * Test that ThreadLocalDelegateExecutorService correctly wraps callables and runnables passed to it before passing them
18   * off to its delegate, by checking the thread-local context in the call or run method. Also, test that the thread-local
19   * context doesn't get set to something else on return.
20   */
21  public class TestThreadLocalDelegateExecutorService extends TestCase
22  {
23      private static final String OUTER_THREAD_LOCAL_CONTEXT = "OUTER_THREAD_LOCAL_CONTEXT";
24  
25      public void testSubmitCallable() throws ExecutionException, InterruptedException
26      {
27          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
28          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
29          service.submit(assertingCallable(manager)).get();
30          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
31      }
32  
33      public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
34      {
35          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
36          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
37          service.submit(assertingRunnable(manager), null).get();
38          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
39      }
40  
41      public void testSubmitRunnable() throws ExecutionException, InterruptedException
42      {
43          final ThreadLocalContextManager manager = threadLocalContextManager();
44          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
45          service.submit(assertingRunnable(manager)).get();
46          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
47      }
48  
49      public void testInvokeAll() throws ExecutionException, InterruptedException
50      {
51          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
52          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
53          getAllFutures(service.invokeAll(assertingCallables(manager)));
54          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
55      }
56  
57      public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
58      {
59          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
60          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
61          getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
62          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
63      }
64  
65      public void testInvokeAny() throws ExecutionException, InterruptedException
66      {
67          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
68          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
69          service.invokeAny(assertingCallables(manager));
70          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
71      }
72  
73      public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
74      {
75          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
76          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
77          service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
78          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
79      }
80  
81      public void testExecute() throws Exception
82      {
83          // We have to jump through a few hoops here, since the AssertionError gets thrown on a different thread, and
84          // there is no Future to get.
85  
86          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
87          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
88          final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
89  
90          service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
91          service.shutdown();
92          service.awaitTermination(60, TimeUnit.SECONDS);
93  
94          if (assertionErrorReference.get() != null)
95              throw assertionErrorReference.get();
96  
97          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
98      }
99  
100     private ThreadLocalContextManager<Object> threadLocalContextManager()
101     {
102         ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
103         manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
104         return manager;
105     }
106 
107     private ThreadLocalDelegateExecutorService<Object> threadLocalDelegateExecutorService(ThreadLocalContextManager<Object> manager)
108     {
109         return new ThreadLocalDelegateExecutorService<Object>(manager, Executors.newSingleThreadExecutor());
110     }
111 
112     private Runnable assertingRunnable(final ThreadLocalContextManager manager)
113     {
114         return new Runnable()
115         {
116             @Override
117             public void run()
118             {
119                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
120             }
121         };
122     }
123 
124     private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
125     {
126         return new Runnable()
127         {
128             @Override
129             public void run()
130             {
131                 try
132                 {
133                     assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
134                 }
135                 catch (AssertionError e)
136                 {
137                     assertionErrorReference.set(e);
138                 }
139             }
140         };
141     }
142 
143     private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
144     {
145         return new Callable<Object>()
146         {
147             @Override
148             public Object call() throws Exception
149             {
150                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
151                 return null;
152             }
153         };
154     }
155 
156     private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
157     {
158         return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
159     }
160 
161     private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
162     {
163         for (Future<Object> future : futures)
164         {
165             future.get();
166         }
167     }
168 }