View Javadoc
1   package com.atlassian.sal.core.executor;
2   
import com.atlassian.sal.api.executor.ThreadLocalDelegateExecutorFactory;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.google.common.base.Preconditions.checkNotNull;
19  
20  /**
21   * Executor service that wraps executing callables and runnables in a wrapper that transfers the thread local state of
22   * the caller to the thread of the executing task.
23   *
24   * @since 2.0
25   */
26  public class ThreadLocalDelegateExecutorService implements ExecutorService {
27      private final ExecutorService delegate;
28      private final ThreadLocalDelegateExecutorFactory delegateExecutorFactory;
29  
30      public ThreadLocalDelegateExecutorService(ExecutorService delegate, ThreadLocalDelegateExecutorFactory delegateExecutorFactory) {
31          this.delegate = checkNotNull(delegate);
32          this.delegateExecutorFactory = checkNotNull(delegateExecutorFactory);
33      }
34  
35      @Override
36      public void shutdown() {
37          delegate.shutdown();
38      }
39  
40      @Override
41      @Nonnull
42      public List<Runnable> shutdownNow() {
43          return delegate.shutdownNow();
44      }
45  
46      @Override
47      public boolean isShutdown() {
48          return delegate.isShutdown();
49      }
50  
51      @Override
52      public boolean isTerminated() {
53          return delegate.isTerminated();
54      }
55  
56      @Override
57      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
58          return delegate.awaitTermination(timeout, unit);
59      }
60  
61      @Override
62      @Nonnull
63      public <T> Future<T> submit(Callable<T> callable) {
64          return delegate.submit(threadLocalDelegateCallable(callable));
65      }
66  
67      @Override
68      @Nonnull
69      public <T> Future<T> submit(Runnable runnable, @Nullable T result) {
70          return delegate.submit(threadLocalDelegateRunnable(runnable), result);
71      }
72  
73      @Override
74      @Nonnull
75      public Future<?> submit(Runnable runnable) {
76          return delegate.submit(threadLocalDelegateRunnable(runnable));
77      }
78  
79      @Override
80      @Nonnull
81      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables) throws InterruptedException {
82          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables));
83      }
84  
85      @Override
86      @Nonnull
87      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException {
88          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables), timeout, unit);
89      }
90  
91      @Override
92      @Nonnull
93      public <T> T invokeAny(Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException {
94          return delegate.invokeAny(threadLocalDelegateCallableCollection(callables));
95      }
96  
97      @Override
98      public <T> T invokeAny(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
99          return delegate.invokeAny(threadLocalDelegateCallableCollection(callables), timeout, unit);
100     }
101 
102     public void execute(Runnable runnable) {
103         delegate.execute(threadLocalDelegateRunnable(runnable));
104     }
105 
106     private Runnable threadLocalDelegateRunnable(Runnable runnable) {
107         return delegateExecutorFactory.createRunnable(runnable);
108     }
109 
110     private <T> Callable<T> threadLocalDelegateCallable(Callable<T> callable) {
111         return delegateExecutorFactory.createCallable(callable);
112     }
113 
114     private <T> Collection<? extends Callable<T>> threadLocalDelegateCallableCollection(Collection<? extends Callable<T>> callables) {
115         return Collections2.transform(callables, (Function<Callable<T>, Callable<T>>) this::threadLocalDelegateCallable);
116     }
117 }