View Javadoc

1   package com.atlassian.sal.core.executor;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4   import com.google.common.base.Function;
5   import com.google.common.collect.Collections2;
6   
7   import java.util.Collection;
8   import java.util.List;
9   import java.util.concurrent.Callable;
10  import java.util.concurrent.ExecutionException;
11  import java.util.concurrent.ExecutorService;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.TimeUnit;
14  import java.util.concurrent.TimeoutException;
15  
16  import static com.google.common.base.Preconditions.checkNotNull;
17  
18  /**
19   * Executor service that wraps executing callables and runnables in a wrapper that transfers the thread local state of
20   * the caller to the thread of the executing task.
21   *
22   * @since 2.0
23   */
24  public class ThreadLocalDelegateExecutorService<C> implements ExecutorService
25  {
26      protected final ThreadLocalContextManager<C> manager;
27      private final ExecutorService delegate;
28  
29      public ThreadLocalDelegateExecutorService(ThreadLocalContextManager<C> manager, ExecutorService delegate)
30      {
31          this.manager = checkNotNull(manager);
32          this.delegate = checkNotNull(delegate);
33      }
34  
35      /**
36       * Only present to provide backwards compatibility
37       * @deprecated use {@link ThreadLocalDelegateExecutorService#ThreadLocalDelegateExecutorService(com.atlassian.sal.api.executor.ThreadLocalContextManager, java.util.concurrent.ExecutorService)}
38       */
39      public ThreadLocalDelegateExecutorService(com.atlassian.sal.core.executor.ThreadLocalContextManager<C> manager, ExecutorService delegate)
40      {
41          this.manager = checkNotNull(manager);
42          this.delegate = checkNotNull(delegate);
43      }
44  
45      public void shutdown()
46      {
47          delegate.shutdown();
48      }
49  
50      public List<Runnable> shutdownNow()
51      {
52          return delegate.shutdownNow();
53      }
54  
55      public boolean isShutdown()
56      {
57          return delegate.isShutdown();
58      }
59  
60      public boolean isTerminated()
61      {
62          return delegate.isTerminated();
63      }
64  
65      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
66      {
67          return delegate.awaitTermination(timeout, unit);
68      }
69  
70      public <T> Future<T> submit(Callable<T> callable)
71      {
72          return delegate.submit(threadLocalDelegateCallable(callable));
73      }
74  
75      public <T> Future<T> submit(Runnable runnable, T t)
76      {
77          return delegate.submit(threadLocalDelegateRunnable(runnable), t);
78      }
79  
80      public Future<?> submit(Runnable runnable)
81      {
82          return delegate.submit(threadLocalDelegateRunnable(runnable));
83      }
84  
85      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables) throws InterruptedException
86      {
87          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables));
88      }
89  
90      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException
91      {
92          return delegate.invokeAll(threadLocalDelegateCallableCollection(callables), timeout, unit);
93      }
94  
95      public <T> T invokeAny(Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException
96      {
97          return delegate.invokeAny(threadLocalDelegateCallableCollection(callables));
98      }
99  
100     public <T> T invokeAny(Collection<? extends Callable<T>> callables, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
101     {
102         return delegate.invokeAny(threadLocalDelegateCallableCollection(callables), timeout, unit);
103     }
104 
105     public void execute(Runnable runnable)
106     {
107         delegate.execute(threadLocalDelegateRunnable(runnable));
108     }
109 
110     private ThreadLocalDelegateRunnable threadLocalDelegateRunnable(Runnable runnable)
111     {
112         return new ThreadLocalDelegateRunnable<C>(manager, runnable);
113     }
114 
115     private <T> ThreadLocalDelegateCallable<C, T> threadLocalDelegateCallable(Callable<T> callable)
116     {
117         return new ThreadLocalDelegateCallable<C, T>(manager, callable);
118     }
119 
120     private <T> Collection<ThreadLocalDelegateCallable<C, T>> threadLocalDelegateCallableCollection(Collection<? extends Callable<T>> callables)
121     {
122         return Collections2.transform(callables, new Function<Callable<T>, ThreadLocalDelegateCallable<C, T>>()
123         {
124             public ThreadLocalDelegateCallable<C, T> apply(Callable<T> callable)
125             {
126                 return threadLocalDelegateCallable(callable);
127             }
128         });
129     }
130 }