View Javadoc

1   package com.atlassian.sal.core.executor;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContext;
4   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
5   import com.google.common.collect.ImmutableSet;
6   import junit.framework.TestCase;
7   
8   import java.util.Collection;
9   import java.util.concurrent.Callable;
10  import java.util.concurrent.ExecutionException;
11  import java.util.concurrent.Executors;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.TimeUnit;
14  import java.util.concurrent.TimeoutException;
15  import java.util.concurrent.atomic.AtomicReference;
16  
17  /**
18   * Test that ThreadLocalDelegateExecutorService correctly wraps callables and runnables passed to it before passing them
19   * off to its delegate, by checking the thread-local context in the call or run method. Also, test that the thread-local
20   * context doesn't get set to something else on return.
21   */
22  public class TestThreadLocalDelegateExecutorService extends TestCase
23  {
24      private static final StubThreadLocalContextManager.Context OUTER_THREAD_LOCAL_CONTEXT = new StubThreadLocalContextManager.Context();
25  
26      public void testSubmitCallable() throws ExecutionException, InterruptedException
27      {
28          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
29          final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
30          service.submit(assertingCallable(manager)).get();
31          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
32      }
33  
34      public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
35      {
36          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
37          final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
38          service.submit(assertingRunnable(manager), null).get();
39          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
40      }
41  
42      public void testSubmitRunnable() throws ExecutionException, InterruptedException
43      {
44          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
45          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
46          service.submit(assertingRunnable(manager)).get();
47          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
48      }
49  
50      public void testInvokeAll() throws ExecutionException, InterruptedException
51      {
52          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
53          final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
54          getAllFutures(service.invokeAll(assertingCallables(manager)));
55          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
56      }
57  
58      public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
59      {
60          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
61          final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
62          getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
63          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
64      }
65  
66      public void testInvokeAny() throws ExecutionException, InterruptedException
67      {
68          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
69          final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
70          service.invokeAny(assertingCallables(manager));
71          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
72      }
73  
74      public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
75      {
76          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
77          final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
78          service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
79          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
80      }
81  
82      public void testExecute() throws Exception
83      {
84          // We have to jump through a few hoops here, since the AssertionError gets thrown on a different thread, and
85          // there is no Future to get.
86  
87          final ThreadLocalContextManager<ThreadLocalContext> manager = threadLocalContextManager();
88          final ThreadLocalDelegateExecutorService<ThreadLocalContext> service = threadLocalDelegateExecutorService(manager);
89          final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
90  
91          service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
92          service.shutdown();
93          service.awaitTermination(60, TimeUnit.SECONDS);
94  
95          final AssertionError assertionError = assertionErrorReference.get();
96          if (assertionError != null)
97              throw assertionError;
98  
99          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
100     }
101 
102     private ThreadLocalContextManager<ThreadLocalContext> threadLocalContextManager()
103     {
104         ThreadLocalContextManager<ThreadLocalContext> manager = new StubThreadLocalContextManager();
105         manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
106         return manager;
107     }
108 
109     private ThreadLocalDelegateExecutorService<ThreadLocalContext> threadLocalDelegateExecutorService(ThreadLocalContextManager<ThreadLocalContext> manager)
110     {
111         return new ThreadLocalDelegateExecutorService<ThreadLocalContext>(manager, Executors.newSingleThreadExecutor());
112     }
113 
114     private Runnable assertingRunnable(final ThreadLocalContextManager manager)
115     {
116         return new Runnable()
117         {
118             @Override
119             public void run()
120             {
121                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
122             }
123         };
124     }
125 
126     private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
127     {
128         return new Runnable()
129         {
130             @Override
131             public void run()
132             {
133                 try
134                 {
135                     assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
136                 }
137                 catch (AssertionError e)
138                 {
139                     assertionErrorReference.set(e);
140                 }
141             }
142         };
143     }
144 
145     private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
146     {
147         return new Callable<Object>()
148         {
149             @Override
150             public Object call() throws Exception
151             {
152                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
153                 return null;
154             }
155         };
156     }
157 
158     private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
159     {
160         return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
161     }
162 
163     private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
164     {
165         for (Future<Object> future : futures)
166         {
167             future.get();
168         }
169     }
170 }