View Javadoc

1   package com.atlassian.sal.core.executor;
2   
import com.atlassian.sal.api.executor.ThreadLocalContextManager;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.google.common.base.Preconditions.checkNotNull;
17  
18  /**
19   * Executor service that wraps executing callables and runnables in a wrapper that transfers the thread local state of
20   * the caller to the thread of the executing task.
21   *
22   * @since 2.0
23   */
24  public class ThreadLocalDelegateExecutorService<C> implements ExecutorService
25  {
26      protected final ThreadLocalContextManager<C> manager;
27      private final ExecutorService delegate;
28  
29      public ThreadLocalDelegateExecutorService(ThreadLocalContextManager<C> manager, ExecutorService delegate)
30      {
31          this.manager = checkNotNull(manager);
32          this.delegate = checkNotNull(delegate);
33      }
34  
35      public void shutdown()
36      {
37          delegate.shutdown();
38      }
39  
40      public List<Runnable> shutdownNow()
41      {
42          return delegate.shutdownNow();
43      }
44  
45      public boolean isShutdown()
46      {
47          return delegate.isShutdown();
48      }
49  
50      public boolean isTerminated()
51      {
52          return delegate.isTerminated();
53      }
54  
55      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
56      {
57          return delegate.awaitTermination(timeout, unit);
58      }
59  
60      public <T> Future<T> submit(Callable<T> callable)
61      {
62          return delegate.submit(threadLocalDelegateCallable(callable));
63      }
64  
65      public <T> Future<T> submit(Runnable runnable, T t)
66      {
67          return delegate.submit(threadLocalDelegateRunnable(runnable), t);
68      }
69  
70      public Future<?> submit(Runnable runnable)
71      {
72          return delegate.submit(threadLocalDelegateRunnable(runnable));
73      }
74  
75      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables) throws InterruptedException
76      {
77          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables));
78      }
79  
80      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException
81      {
82          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables), timeout, unit);
83      }
84  
85      public <T> T invokeAny(Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException
86      {
87          return delegate.invokeAny(threadLocalDelegateCallableCollection(callables));
88      }
89  
90      public <T> T invokeAny(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
91      {
92          return delegate.invokeAny(threadLocalDelegateCallableCollection(callables), timeout, unit);
93      }
94  
95      public void execute(Runnable runnable)
96      {
97          delegate.execute(threadLocalDelegateRunnable(runnable));
98      }
99  
100     private ThreadLocalDelegateRunnable threadLocalDelegateRunnable(Runnable runnable)
101     {
102         return new ThreadLocalDelegateRunnable<C>(manager, runnable);
103     }
104 
105     private <T> ThreadLocalDelegateCallable<C, T> threadLocalDelegateCallable(Callable<T> callable)
106     {
107         return new ThreadLocalDelegateCallable<C, T>(manager, callable);
108     }
109 
110     private <T> Collection<ThreadLocalDelegateCallable<C, T>> threadLocalDelegateCallableCollection(Collection<? extends Callable<T>> callables)
111     {
112         return Collections2.transform(callables, new Function<Callable<T>, ThreadLocalDelegateCallable<C, T>>()
113         {
114             public ThreadLocalDelegateCallable<C, T> apply(Callable<T> callable)
115             {
116                 return threadLocalDelegateCallable(callable);
117             }
118         });
119     }
120 }