View Javadoc

1   package com.atlassian.sal.core.executor;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4   import com.google.common.collect.ImmutableSet;
5   import junit.framework.TestCase;
6   
7   import java.util.Collection;
8   import java.util.concurrent.Callable;
9   import java.util.concurrent.ExecutionException;
10  import java.util.concurrent.Executors;
11  import java.util.concurrent.Future;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.TimeoutException;
14  import java.util.concurrent.atomic.AtomicReference;
15  
16  /**
17   * Test that ThreadLocalDelegateExecutorService correctly wraps callables and runnables passed to it before passing them
18   * off to its delegate, by checking the thread-local context in the call or run method. Also, test that the thread-local
19   * context doesn't get set to something else on return.
20   */
21  public class TestThreadLocalDelegateExecutorService extends TestCase
22  {
23      private static final Object OUTER_THREAD_LOCAL_CONTEXT = new Object();
24  
25      public void testSubmitCallable() throws ExecutionException, InterruptedException
26      {
27          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
28          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
29          service.submit(assertingCallable(manager)).get();
30          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
31      }
32  
33      public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
34      {
35          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
36          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
37          service.submit(assertingRunnable(manager), null).get();
38          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
39      }
40  
41      public void testSubmitRunnable() throws ExecutionException, InterruptedException
42      {
43          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
44          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
45          service.submit(assertingRunnable(manager)).get();
46          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
47      }
48  
49      public void testInvokeAll() throws ExecutionException, InterruptedException
50      {
51          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
52          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
53          getAllFutures(service.invokeAll(assertingCallables(manager)));
54          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
55      }
56  
57      public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
58      {
59          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
60          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
61          getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
62          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
63      }
64  
65      public void testInvokeAny() throws ExecutionException, InterruptedException
66      {
67          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
68          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
69          service.invokeAny(assertingCallables(manager));
70          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
71      }
72  
73      public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
74      {
75          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
76          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
77          service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
78          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
79      }
80  
81      public void testExecute() throws Exception
82      {
83          // We have to jump through a few hoops here, since the AssertionError gets thrown on a different thread, and
84          // there is no Future to get.
85  
86          final ThreadLocalContextManager<Object> manager = threadLocalContextManager();
87          final ThreadLocalDelegateExecutorService<Object> service = threadLocalDelegateExecutorService(manager);
88          final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
89  
90          service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
91          service.shutdown();
92          service.awaitTermination(60, TimeUnit.SECONDS);
93  
94          final AssertionError assertionError = assertionErrorReference.get();
95          if (assertionError != null)
96              throw assertionError;
97  
98          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
99      }
100 
101     private ThreadLocalContextManager<Object> threadLocalContextManager()
102     {
103         ThreadLocalContextManager<Object> manager = new StubThreadLocalContextManager();
104         manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
105         return manager;
106     }
107 
108     private ThreadLocalDelegateExecutorService<Object> threadLocalDelegateExecutorService(ThreadLocalContextManager<Object> manager)
109     {
110         return new ThreadLocalDelegateExecutorService<Object>(manager, Executors.newSingleThreadExecutor());
111     }
112 
113     private Runnable assertingRunnable(final ThreadLocalContextManager manager)
114     {
115         return new Runnable()
116         {
117             @Override
118             public void run()
119             {
120                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
121             }
122         };
123     }
124 
125     private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
126     {
127         return new Runnable()
128         {
129             @Override
130             public void run()
131             {
132                 try
133                 {
134                     assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
135                 }
136                 catch (AssertionError e)
137                 {
138                     assertionErrorReference.set(e);
139                 }
140             }
141         };
142     }
143 
144     private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
145     {
146         return new Callable<Object>()
147         {
148             @Override
149             public Object call() throws Exception
150             {
151                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
152                 return null;
153             }
154         };
155     }
156 
157     private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
158     {
159         return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
160     }
161 
162     private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
163     {
164         for (Future<Object> future : futures)
165         {
166             future.get();
167         }
168     }
169 }