View Javadoc

1   package com.atlassian.sal.core.executor;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4   import com.google.common.base.Function;
5   import com.google.common.collect.Collections2;
6   
7   import java.util.Collection;
8   import java.util.List;
9   import java.util.concurrent.Callable;
10  import java.util.concurrent.ExecutionException;
11  import java.util.concurrent.ExecutorService;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.TimeUnit;
14  import java.util.concurrent.TimeoutException;
15  
16  import static com.google.common.base.Preconditions.checkNotNull;
17  
18  /**
19   * Executor service that wraps executing callables and runnables in a wrapper that transfers the thread local state of
20   * the caller to the thread of the executing task.
21   *
22   * @since 2.0
23   */
24  public class ThreadLocalDelegateExecutorService<C> implements ExecutorService {
25      protected final ThreadLocalContextManager<C> manager;
26      private final ExecutorService delegate;
27  
28      public ThreadLocalDelegateExecutorService(ThreadLocalContextManager<C> manager, ExecutorService delegate) {
29          this.manager = checkNotNull(manager);
30          this.delegate = checkNotNull(delegate);
31      }
32  
33      public void shutdown() {
34          delegate.shutdown();
35      }
36  
37      public List<Runnable> shutdownNow() {
38          return delegate.shutdownNow();
39      }
40  
41      public boolean isShutdown() {
42          return delegate.isShutdown();
43      }
44  
45      public boolean isTerminated() {
46          return delegate.isTerminated();
47      }
48  
49      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
50          return delegate.awaitTermination(timeout, unit);
51      }
52  
53      public <T> Future<T> submit(Callable<T> callable) {
54          return delegate.submit(threadLocalDelegateCallable(callable));
55      }
56  
57      public <T> Future<T> submit(Runnable runnable, T t) {
58          return delegate.submit(threadLocalDelegateRunnable(runnable), t);
59      }
60  
61      public Future<?> submit(Runnable runnable) {
62          return delegate.submit(threadLocalDelegateRunnable(runnable));
63      }
64  
65      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables) throws InterruptedException {
66          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables));
67      }
68  
69      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException {
70          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables), timeout, unit);
71      }
72  
73      public <T> T invokeAny(Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException {
74          return delegate.invokeAny(threadLocalDelegateCallableCollection(callables));
75      }
76  
77      public <T> T invokeAny(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
78          return delegate.invokeAny(threadLocalDelegateCallableCollection(callables), timeout, unit);
79      }
80  
81      public void execute(Runnable runnable) {
82          delegate.execute(threadLocalDelegateRunnable(runnable));
83      }
84  
85      private ThreadLocalDelegateRunnable threadLocalDelegateRunnable(Runnable runnable) {
86          return new ThreadLocalDelegateRunnable<C>(manager, runnable);
87      }
88  
89      private <T> ThreadLocalDelegateCallable<C, T> threadLocalDelegateCallable(Callable<T> callable) {
90          return new ThreadLocalDelegateCallable<C, T>(manager, callable);
91      }
92  
93      private <T> Collection<ThreadLocalDelegateCallable<C, T>> threadLocalDelegateCallableCollection(Collection<? extends Callable<T>> callables) {
94          return Collections2.transform(callables, new Function<Callable<T>, ThreadLocalDelegateCallable<C, T>>() {
95              public ThreadLocalDelegateCallable<C, T> apply(Callable<T> callable) {
96                  return threadLocalDelegateCallable(callable);
97              }
98          });
99      }
100 }