1 package com.atlassian.httpclient.apache.httpcomponents;
2
3 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
4 import com.atlassian.util.concurrent.Promise;
5 import com.atlassian.util.concurrent.Promises;
6 import com.google.common.annotations.VisibleForTesting;
7 import com.google.common.util.concurrent.SettableFuture;
8 import org.apache.http.HttpResponse;
9 import org.apache.http.client.methods.HttpUriRequest;
10 import org.apache.http.concurrent.FutureCallback;
11 import org.apache.http.nio.client.HttpAsyncClient;
12 import org.apache.http.protocol.HttpContext;
13
14 import java.util.concurrent.Executor;
15 import java.util.concurrent.TimeoutException;
16
17 import static com.google.common.base.Preconditions.checkNotNull;
18
19 final class SettableFuturePromiseHttpPromiseAsyncClient<C> implements PromiseHttpAsyncClient {
20 private final HttpAsyncClient client;
21 private final ThreadLocalContextManager<C> threadLocalContextManager;
22 private final Executor executor;
23
24 SettableFuturePromiseHttpPromiseAsyncClient(HttpAsyncClient client, ThreadLocalContextManager<C> threadLocalContextManager, Executor executor) {
25 this.client = checkNotNull(client);
26 this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
27 this.executor = new ThreadLocalDelegateExecutor<C>(threadLocalContextManager, executor);
28 }
29
30 @Override
31 public Promise<HttpResponse> execute(HttpUriRequest request, HttpContext context) {
32 final SettableFuture<HttpResponse> future = SettableFuture.create();
33 client.execute(request, context, new ThreadLocalContextAwareFutureCallback<C, HttpResponse>(threadLocalContextManager) {
34 @Override
35 void doCompleted(final HttpResponse httpResponse) {
36 executor.execute(new Runnable() {
37 @Override
38 public void run() {
39 future.set(httpResponse);
40 }
41 });
42 }
43
44 @Override
45 void doFailed(final Exception ex) {
46 executor.execute(new Runnable() {
47 @Override
48 public void run() {
49 future.setException(ex);
50 }
51 });
52 }
53
54 @Override
55 void doCancelled() {
56 final TimeoutException timeoutException = new TimeoutException();
57 executor.execute(new Runnable() {
58 @Override
59 public void run() {
60 future.setException(timeoutException);
61 }
62 });
63 }
64 });
65 return Promises.forListenableFuture(future);
66 }
67
68 @VisibleForTesting
69 static <C> void runInContext(ThreadLocalContextManager<C> threadLocalContextManager, C threadLocalContext, ClassLoader contextClassLoader, Runnable runnable) {
70 final C oldThreadLocalContext = threadLocalContextManager.getThreadLocalContext();
71 final ClassLoader oldCcl = Thread.currentThread().getContextClassLoader();
72 try {
73 Thread.currentThread().setContextClassLoader(contextClassLoader);
74 threadLocalContextManager.setThreadLocalContext(threadLocalContext);
75 runnable.run();
76 } finally {
77 threadLocalContextManager.setThreadLocalContext(oldThreadLocalContext);
78 Thread.currentThread().setContextClassLoader(oldCcl);
79 }
80 }
81
82 private static abstract class ThreadLocalContextAwareFutureCallback<C, HttpResponse> implements FutureCallback<HttpResponse> {
83 private final ThreadLocalContextManager<C> threadLocalContextManager;
84 private final C threadLocalContext;
85 private final ClassLoader contextClassLoader;
86
87 private ThreadLocalContextAwareFutureCallback(ThreadLocalContextManager<C> threadLocalContextManager) {
88 this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
89 this.threadLocalContext = threadLocalContextManager.getThreadLocalContext();
90 this.contextClassLoader = Thread.currentThread().getContextClassLoader();
91 }
92
93 abstract void doCompleted(HttpResponse response);
94
95 abstract void doFailed(Exception ex);
96
97 abstract void doCancelled();
98
99 @Override
100 public final void completed(final HttpResponse response) {
101 runInContext(threadLocalContextManager, threadLocalContext, contextClassLoader, new Runnable() {
102 @Override
103 public void run() {
104 doCompleted(response);
105 }
106 });
107 }
108
109 @Override
110 public final void failed(final Exception ex) {
111 runInContext(threadLocalContextManager, threadLocalContext, contextClassLoader, new Runnable() {
112 @Override
113 public void run() {
114 doFailed(ex);
115 }
116 });
117 }
118
119 @Override
120 public final void cancelled() {
121 runInContext(threadLocalContextManager, threadLocalContext, contextClassLoader, new Runnable() {
122 @Override
123 public void run() {
124 doCancelled();
125 }
126 });
127 }
128 }
129
130 private static final class ThreadLocalDelegateExecutor<C> implements Executor {
131 private final Executor delegate;
132 private final ThreadLocalContextManager<C> manager;
133
134 ThreadLocalDelegateExecutor(ThreadLocalContextManager<C> manager, Executor delegate) {
135 this.delegate = checkNotNull(delegate);
136 this.manager = checkNotNull(manager);
137 }
138
139 public void execute(final Runnable runnable) {
140 delegate.execute(new ThreadLocalDelegateRunnable<C>(manager, runnable));
141 }
142 }
143
144 private static final class ThreadLocalDelegateRunnable<C> implements Runnable {
145 private final C context;
146 private final Runnable delegate;
147 private final ClassLoader contextClassLoader;
148 private final ThreadLocalContextManager<C> manager;
149
150 ThreadLocalDelegateRunnable(ThreadLocalContextManager<C> manager, Runnable delegate) {
151 this.delegate = delegate;
152 this.manager = manager;
153 this.context = manager.getThreadLocalContext();
154 this.contextClassLoader = Thread.currentThread().getContextClassLoader();
155 }
156
157 public void run() {
158 runInContext(manager, context, contextClassLoader, new Runnable() {
159 @Override
160 public void run() {
161 delegate.run();
162 }
163 });
164 }
165 }
166 }