View Javadoc

1   package com.atlassian.sal.core.executor;
2   
3   import com.google.common.collect.ImmutableSet;
4   import junit.framework.TestCase;
5   
6   import java.util.Collection;
7   import java.util.concurrent.Callable;
8   import java.util.concurrent.ExecutionException;
9   import java.util.concurrent.Executors;
10  import java.util.concurrent.Future;
11  import java.util.concurrent.TimeUnit;
12  import java.util.concurrent.TimeoutException;
13  import java.util.concurrent.atomic.AtomicReference;
14  
15  /**
16   * Test that ThreadLocalDelegateExecutorService correctly wraps callables and runnables passed to it before passing them
17   * off to its delegate, by checking the thread-local context in the call or run method. Also, test that the thread-local
18   * context doesn't get set to something else on return.
19   */
20  public class TestThreadLocalDelegateExecutorService extends TestCase
21  {
22      private static final String OUTER_THREAD_LOCAL_CONTEXT = "OUTER_THREAD_LOCAL_CONTEXT";
23  
24      public void testSubmitCallable() throws ExecutionException, InterruptedException
25      {
26          final ThreadLocalContextManager manager = threadLocalContextManager();
27          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
28          service.submit(assertingCallable(manager)).get();
29          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
30      }
31  
32      public void testSubmitRunnableWithResult() throws ExecutionException, InterruptedException
33      {
34          final ThreadLocalContextManager manager = threadLocalContextManager();
35          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
36          service.submit(assertingRunnable(manager), null).get();
37          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
38      }
39  
40      public void testSubmitRunnable() throws ExecutionException, InterruptedException
41      {
42          final ThreadLocalContextManager manager = threadLocalContextManager();
43          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
44          service.submit(assertingRunnable(manager)).get();
45          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
46      }
47  
48      public void testInvokeAll() throws ExecutionException, InterruptedException
49      {
50          final ThreadLocalContextManager manager = threadLocalContextManager();
51          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
52          getAllFutures(service.invokeAll(assertingCallables(manager)));
53          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
54      }
55  
56      public void testInvokeAllWithTimeout() throws ExecutionException, InterruptedException
57      {
58          final ThreadLocalContextManager manager = threadLocalContextManager();
59          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
60          getAllFutures(service.invokeAll(assertingCallables(manager), 60, TimeUnit.SECONDS));
61          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
62      }
63  
64      public void testInvokeAny() throws ExecutionException, InterruptedException
65      {
66          final ThreadLocalContextManager manager = threadLocalContextManager();
67          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
68          service.invokeAny(assertingCallables(manager));
69          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
70      }
71  
72      public void testInvokeAnyWithTimeout() throws ExecutionException, InterruptedException, TimeoutException
73      {
74          final ThreadLocalContextManager manager = threadLocalContextManager();
75          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
76          service.invokeAny(assertingCallables(manager), 60, TimeUnit.SECONDS);
77          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
78      }
79  
80      public void testExecute() throws Exception
81      {
82          // We have to jump through a few hoops here, since the AssertionError gets thrown on a different thread, and
83          // there is no Future to get.
84  
85          final ThreadLocalContextManager manager = threadLocalContextManager();
86          final ThreadLocalDelegateExecutorService service = threadLocalDelegateExecutorService(manager);
87          final AtomicReference<AssertionError> assertionErrorReference = new AtomicReference<AssertionError>();
88  
89          service.execute(assertingRunnableWithErrorReference(manager, assertionErrorReference));
90          service.shutdown();
91          service.awaitTermination(60, TimeUnit.SECONDS);
92  
93          if (assertionErrorReference.get() != null)
94              throw assertionErrorReference.get();
95  
96          assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
97      }
98  
99      private ThreadLocalContextManager threadLocalContextManager()
100     {
101         ThreadLocalContextManager manager = new StubThreadLocalContextManager();
102         manager.setThreadLocalContext(OUTER_THREAD_LOCAL_CONTEXT);
103         return manager;
104     }
105 
106     private ThreadLocalDelegateExecutorService threadLocalDelegateExecutorService(ThreadLocalContextManager manager)
107     {
108         return new ThreadLocalDelegateExecutorService(manager, Executors.newSingleThreadExecutor());
109     }
110 
111     private Runnable assertingRunnable(final ThreadLocalContextManager manager)
112     {
113         return new Runnable()
114         {
115             @Override
116             public void run()
117             {
118                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
119             }
120         };
121     }
122 
123     private Runnable assertingRunnableWithErrorReference(final ThreadLocalContextManager manager, final AtomicReference<AssertionError> assertionErrorReference)
124     {
125         return new Runnable()
126         {
127             @Override
128             public void run()
129             {
130                 try
131                 {
132                     assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
133                 }
134                 catch (AssertionError e)
135                 {
136                     assertionErrorReference.set(e);
137                 }
138             }
139         };
140     }
141 
142     private Callable<Object> assertingCallable(final ThreadLocalContextManager manager)
143     {
144         return new Callable<Object>()
145         {
146             @Override
147             public Object call() throws Exception
148             {
149                 assertEquals(OUTER_THREAD_LOCAL_CONTEXT, manager.getThreadLocalContext());
150                 return null;
151             }
152         };
153     }
154 
155     private Collection<Callable<Object>> assertingCallables(final ThreadLocalContextManager manager)
156     {
157         return ImmutableSet.of(assertingCallable(manager), assertingCallable(manager), assertingCallable(manager));
158     }
159 
160     private void getAllFutures(Collection<Future<Object>> futures) throws ExecutionException, InterruptedException
161     {
162         for (Future<Object> future : futures)
163         {
164             future.get();
165         }
166     }
167 }