View Javadoc

1   package com.atlassian.httpclient.apache.httpcomponents;
2   
3   import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4   import com.atlassian.util.concurrent.Promise;
5   import com.atlassian.util.concurrent.Promises;
6   import com.google.common.annotations.VisibleForTesting;
7   import com.google.common.util.concurrent.SettableFuture;
8   import org.apache.http.HttpResponse;
9   import org.apache.http.client.methods.HttpUriRequest;
10  import org.apache.http.concurrent.FutureCallback;
11  import org.apache.http.nio.client.HttpAsyncClient;
12  import org.apache.http.protocol.HttpContext;
13  
14  import java.util.concurrent.Executor;
15  import java.util.concurrent.TimeoutException;
16  
17  import static com.google.common.base.Preconditions.checkNotNull;
18  
19  final class SettableFuturePromiseHttpPromiseAsyncClient<C> implements PromiseHttpAsyncClient {
20      private final HttpAsyncClient client;
21      private final ThreadLocalContextManager<C> threadLocalContextManager;
22      private final Executor executor;
23  
24      SettableFuturePromiseHttpPromiseAsyncClient(HttpAsyncClient client, ThreadLocalContextManager<C> threadLocalContextManager, Executor executor) {
25          this.client = checkNotNull(client);
26          this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
27          this.executor = new ThreadLocalDelegateExecutor<C>(threadLocalContextManager, executor);
28      }
29  
30      @Override
31      public Promise<HttpResponse> execute(HttpUriRequest request, HttpContext context) {
32          final SettableFuture<HttpResponse> future = SettableFuture.create();
33          client.execute(request, context, new ThreadLocalContextAwareFutureCallback<C, HttpResponse>(threadLocalContextManager) {
34              @Override
35              void doCompleted(final HttpResponse httpResponse) {
36                  executor.execute(new Runnable() {
37                      @Override
38                      public void run() {
39                          future.set(httpResponse);
40                      }
41                  });
42              }
43  
44              @Override
45              void doFailed(final Exception ex) {
46                  executor.execute(new Runnable() {
47                      @Override
48                      public void run() {
49                          future.setException(ex);
50                      }
51                  });
52              }
53  
54              @Override
55              void doCancelled() {
56                  final TimeoutException timeoutException = new TimeoutException();
57                  executor.execute(new Runnable() {
58                      @Override
59                      public void run() {
60                          future.setException(timeoutException);
61                      }
62                  });
63              }
64          });
65          return Promises.forListenableFuture(future);
66      }
67  
68      @VisibleForTesting
69      static <C> void runInContext(ThreadLocalContextManager<C> threadLocalContextManager, C threadLocalContext, ClassLoader contextClassLoader, Runnable runnable) {
70          final C oldThreadLocalContext = threadLocalContextManager.getThreadLocalContext();
71          final ClassLoader oldCcl = Thread.currentThread().getContextClassLoader();
72          try {
73              Thread.currentThread().setContextClassLoader(contextClassLoader);
74              threadLocalContextManager.setThreadLocalContext(threadLocalContext);
75              runnable.run();
76          } finally {
77              threadLocalContextManager.setThreadLocalContext(oldThreadLocalContext);
78              Thread.currentThread().setContextClassLoader(oldCcl);
79          }
80      }
81  
82      private static abstract class ThreadLocalContextAwareFutureCallback<C, HttpResponse> implements FutureCallback<HttpResponse> {
83          private final ThreadLocalContextManager<C> threadLocalContextManager;
84          private final C threadLocalContext;
85          private final ClassLoader contextClassLoader;
86  
87          private ThreadLocalContextAwareFutureCallback(ThreadLocalContextManager<C> threadLocalContextManager) {
88              this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
89              this.threadLocalContext = threadLocalContextManager.getThreadLocalContext();
90              this.contextClassLoader = Thread.currentThread().getContextClassLoader();
91          }
92  
93          abstract void doCompleted(HttpResponse response);
94  
95          abstract void doFailed(Exception ex);
96  
97          abstract void doCancelled();
98  
99          @Override
100         public final void completed(final HttpResponse response) {
101             runInContext(threadLocalContextManager, threadLocalContext, contextClassLoader, new Runnable() {
102                 @Override
103                 public void run() {
104                     doCompleted(response);
105                 }
106             });
107         }
108 
109         @Override
110         public final void failed(final Exception ex) {
111             runInContext(threadLocalContextManager, threadLocalContext, contextClassLoader, new Runnable() {
112                 @Override
113                 public void run() {
114                     doFailed(ex);
115                 }
116             });
117         }
118 
119         @Override
120         public final void cancelled() {
121             runInContext(threadLocalContextManager, threadLocalContext, contextClassLoader, new Runnable() {
122                 @Override
123                 public void run() {
124                     doCancelled();
125                 }
126             });
127         }
128     }
129 
130     private static final class ThreadLocalDelegateExecutor<C> implements Executor {
131         private final Executor delegate;
132         private final ThreadLocalContextManager<C> manager;
133 
134         ThreadLocalDelegateExecutor(ThreadLocalContextManager<C> manager, Executor delegate) {
135             this.delegate = checkNotNull(delegate);
136             this.manager = checkNotNull(manager);
137         }
138 
139         public void execute(final Runnable runnable) {
140             delegate.execute(new ThreadLocalDelegateRunnable<C>(manager, runnable));
141         }
142     }
143 
144     private static final class ThreadLocalDelegateRunnable<C> implements Runnable {
145         private final C context;
146         private final Runnable delegate;
147         private final ClassLoader contextClassLoader;
148         private final ThreadLocalContextManager<C> manager;
149 
150         ThreadLocalDelegateRunnable(ThreadLocalContextManager<C> manager, Runnable delegate) {
151             this.delegate = delegate;
152             this.manager = manager;
153             this.context = manager.getThreadLocalContext();
154             this.contextClassLoader = Thread.currentThread().getContextClassLoader();
155         }
156 
157         public void run() {
158             runInContext(manager, context, contextClassLoader, new Runnable() {
159                 @Override
160                 public void run() {
161                     delegate.run();
162                 }
163             });
164         }
165     }
166 }