1 package com.atlassian.httpclient.apache.httpcomponents;
2
3 import com.atlassian.event.api.EventPublisher;
4 import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorage;
5 import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorageImpl;
6 import com.atlassian.httpclient.apache.httpcomponents.cache.LoggingHttpCacheStorage;
7 import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyConfigFactory;
8 import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyCredentialsProvider;
9 import com.atlassian.httpclient.api.HttpClient;
10 import com.atlassian.httpclient.api.HttpStatus;
11 import com.atlassian.httpclient.api.Request;
12 import com.atlassian.httpclient.api.Response;
13 import com.atlassian.httpclient.api.ResponsePromise;
14 import com.atlassian.httpclient.api.ResponsePromises;
15 import com.atlassian.httpclient.api.ResponseTooLargeException;
16 import com.atlassian.httpclient.api.factory.HttpClientOptions;
17 import com.atlassian.httpclient.base.AbstractHttpClient;
18 import com.atlassian.httpclient.base.event.HttpRequestCompletedEvent;
19 import com.atlassian.httpclient.base.event.HttpRequestFailedEvent;
20 import com.atlassian.sal.api.ApplicationProperties;
21 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
22 import com.atlassian.util.concurrent.ThreadFactories;
23 import com.google.common.base.Function;
24 import com.google.common.base.Functions;
25 import com.google.common.base.Supplier;
26 import com.google.common.base.Suppliers;
27 import com.google.common.base.Throwables;
28 import com.google.common.primitives.Ints;
29 import org.apache.http.Header;
30 import org.apache.http.HttpEntity;
31 import org.apache.http.HttpResponse;
32 import org.apache.http.StatusLine;
33 import org.apache.http.client.config.CookieSpecs;
34 import org.apache.http.client.config.RequestConfig;
35 import org.apache.http.client.methods.HttpDelete;
36 import org.apache.http.client.methods.HttpGet;
37 import org.apache.http.client.methods.HttpHead;
38 import org.apache.http.client.methods.HttpOptions;
39 import org.apache.http.client.methods.HttpPost;
40 import org.apache.http.client.methods.HttpPut;
41 import org.apache.http.client.methods.HttpRequestBase;
42 import org.apache.http.client.methods.HttpTrace;
43 import org.apache.http.config.Registry;
44 import org.apache.http.config.RegistryBuilder;
45 import org.apache.http.conn.ssl.SSLContextBuilder;
46 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
47 import org.apache.http.conn.ssl.X509HostnameVerifier;
48 import org.apache.http.impl.client.ProxyAuthenticationStrategy;
49 import org.apache.http.impl.client.cache.CacheConfig;
50 import org.apache.http.impl.client.cache.CachingHttpAsyncClient;
51 import org.apache.http.impl.conn.DefaultSchemePortResolver;
52 import org.apache.http.impl.conn.SystemDefaultRoutePlanner;
53 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
54 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
55 import org.apache.http.impl.nio.client.HttpAsyncClients;
56 import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
57 import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
58 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
59 import org.apache.http.impl.nio.reactor.IOReactorConfig;
60 import org.apache.http.nio.conn.NoopIOSessionStrategy;
61 import org.apache.http.nio.conn.SchemeIOSessionStrategy;
62 import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
63 import org.apache.http.nio.reactor.IOReactorException;
64 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
65 import org.apache.http.protocol.BasicHttpContext;
66 import org.apache.http.util.TextUtils;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import org.springframework.beans.factory.DisposableBean;
70
71 import javax.net.ssl.SSLContext;
72 import javax.net.ssl.SSLSession;
73 import javax.net.ssl.SSLSocket;
74 import java.io.IOException;
75 import java.security.GeneralSecurityException;
76 import java.security.KeyManagementException;
77 import java.security.KeyStoreException;
78 import java.security.NoSuchAlgorithmException;
79 import java.security.cert.X509Certificate;
80 import java.util.Map;
81 import java.util.concurrent.ExecutorService;
82 import java.util.concurrent.TimeUnit;
83 import java.util.regex.Pattern;
84
85 import static com.atlassian.util.concurrent.Promises.rejected;
86 import static com.google.common.base.Preconditions.checkNotNull;
87 import static java.lang.String.format;
88
89 public final class ApacheAsyncHttpClient<C> extends AbstractHttpClient implements HttpClient, DisposableBean {
90 private final Logger log = LoggerFactory.getLogger(this.getClass());
91
92 private static final Supplier<String> httpClientVersion = Suppliers.memoize(
93 () -> MavenUtils.getVersion("com.atlassian.httpclient", "atlassian-httpclient-api"));
94
95 private final Function<Object, Void> eventConsumer;
96 private final Supplier<String> applicationName;
97 private final ThreadLocalContextManager<C> threadLocalContextManager;
98 private final ExecutorService callbackExecutor;
99 private final HttpClientOptions httpClientOptions;
100
101 private final CachingHttpAsyncClient httpClient;
102 private final CloseableHttpAsyncClient nonCachingHttpClient;
103 private final FlushableHttpCacheStorage httpCacheStorage;
104
/**
 * Creates a client with a freshly constructed, unmodified {@link HttpClientOptions}.
 *
 * @param eventConsumer             publisher that receives request completed/failed events
 * @param applicationProperties     source of the application name used in the User-Agent
 * @param threadLocalContextManager context manager handed to the per-request promise clients
 */
public ApacheAsyncHttpClient(
        EventPublisher eventConsumer,
        ApplicationProperties applicationProperties,
        ThreadLocalContextManager<C> threadLocalContextManager) {
    this(
            eventConsumer,
            applicationProperties,
            threadLocalContextManager,
            new HttpClientOptions());
}
109
/**
 * Creates a client that publishes its events through the given {@link EventPublisher}
 * and builds its application name from the given {@link ApplicationProperties}.
 *
 * @param eventConsumer             publisher that receives request completed/failed events
 * @param applicationProperties     source of the application name used in the User-Agent
 * @param threadLocalContextManager context manager handed to the per-request promise clients
 * @param options                   client configuration
 */
public ApacheAsyncHttpClient(
        EventPublisher eventConsumer,
        ApplicationProperties applicationProperties,
        ThreadLocalContextManager<C> threadLocalContextManager,
        HttpClientOptions options) {
    this(
            new DefaultApplicationNameSupplier(applicationProperties),
            new EventConsumerFunction(eventConsumer),
            threadLocalContextManager,
            options);
}
119
/**
 * Creates a client for the given application name with a freshly constructed,
 * unmodified {@link HttpClientOptions}.
 *
 * @param applicationName name embedded in the User-Agent string
 */
public ApacheAsyncHttpClient(String applicationName) {
    this(
            applicationName,
            new HttpClientOptions());
}
123
/**
 * Creates a client with a fixed application name. Events are passed to a constant
 * function that returns {@code null} (i.e. they are discarded) and a no-op
 * thread-local context manager is used.
 *
 * @param applicationName name embedded in the User-Agent string
 * @param options         client configuration
 */
public ApacheAsyncHttpClient(String applicationName, final HttpClientOptions options) {
    this(
            Suppliers.<String>ofInstance(applicationName),
            Functions.<Void>constant(null),
            new NoOpThreadLocalContextManager<C>(),
            options);
}
127
/**
 * Builds and starts the underlying asynchronous Apache HTTP client and wraps it in a
 * caching client.
 *
 * <p>The constructor wires, in order: the I/O reactor, a pooling connection manager,
 * the default request configuration, optional proxy settings, the bounded non-caching
 * client and finally the caching client on top of it.
 *
 * @param applicationName           supplies the application name used in the User-Agent; must not be null
 * @param eventConsumer             receives request completed/failed events; must not be null
 * @param threadLocalContextManager handed to the per-request promise clients; must not be null
 * @param options                   client configuration; must not be null
 * @throws NullPointerException if any argument is null
 * @throws RuntimeException     wrapping the {@link IOReactorException} if the I/O reactor cannot be created
 */
public ApacheAsyncHttpClient(final Supplier<String> applicationName,
                             final Function<Object, Void> eventConsumer,
                             final ThreadLocalContextManager<C> threadLocalContextManager,
                             final HttpClientOptions options) {
    this.eventConsumer = checkNotNull(eventConsumer);
    this.applicationName = checkNotNull(applicationName);
    this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
    this.httpClientOptions = checkNotNull(options);

    try {
        final IOReactorConfig reactorConfig = IOReactorConfig.custom()
                .setIoThreadCount(options.getIoThreadCount())
                .setSelectInterval(options.getIoSelectInterval())
                .setInterestOpQueued(true)
                .build();

        final DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(reactorConfig);
        // Both handlers log the problem and return false, i.e. they never ask the
        // reactor to treat the exception as ignorable.
        ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
            @Override
            public boolean handle(final IOException e) {
                log.error("IO exception in reactor ", e);
                return false;
            }

            @Override
            public boolean handle(final RuntimeException e) {
                log.error("Fatal runtime error", e);
                return false;
            }
        });

        final PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(
                ioReactor,
                ManagedNHttpClientConnectionFactory.INSTANCE,
                getRegistry(options),
                DefaultSchemePortResolver.INSTANCE,
                // Use the resolver from the options when present, the default one otherwise.
                host -> options.getHostResolver()
                        .orElse(DefaultHostResolver.INSTANCE)
                        .resolve(host),
                options.getConnectionPoolTimeToLive(),
                TimeUnit.MILLISECONDS) {
            @SuppressWarnings("MethodDoesntCallSuperMethod")
            @Override
            protected void finalize() {
                // NOTE(review): intentionally empty and does not call super.finalize();
                // presumably this keeps the superclass finalizer from running --
                // confirm the original rationale before changing.
            }
        };

        // Saturating casts: a timeout larger than Integer.MAX_VALUE is clamped instead
        // of wrapping around to an arbitrary (possibly negative) int.
        final RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(Ints.saturatedCast(options.getConnectionTimeout()))
                .setConnectionRequestTimeout(Ints.saturatedCast(options.getLeaseTimeout()))
                .setCookieSpec(options.getIgnoreCookies() ? CookieSpecs.IGNORE_COOKIES : CookieSpecs.DEFAULT)
                .setSocketTimeout(Ints.saturatedCast(options.getSocketTimeout()))
                .build();

        connectionManager.setDefaultMaxPerRoute(options.getMaxConnectionsPerHost());
        connectionManager.setMaxTotal(options.getMaxTotalConnections());

        final HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom()
                .setThreadFactory(ThreadFactories.namedThreadFactory(options.getThreadPrefix() + "-io", ThreadFactories.Type.DAEMON))
                .setDefaultIOReactorConfig(reactorConfig)
                .setConnectionManager(connectionManager)
                .setRedirectStrategy(new RedirectStrategy())
                .setUserAgent(getUserAgent(options))
                .setDefaultRequestConfig(requestConfig);

        // For a proxy configuration derived from the options (if any): route through
        // its proxy selector and, when proxy credentials are available, enable proxy
        // authentication with them.
        ProxyConfigFactory.getProxyConfig(options).forEach(proxyConfig -> {
            clientBuilder.setRoutePlanner(new SystemDefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE, proxyConfig.toProxySelector()));

            ProxyCredentialsProvider.build(options).forEach(credsProvider -> {
                clientBuilder.setProxyAuthenticationStrategy(ProxyAuthenticationStrategy.INSTANCE);
                clientBuilder.setDefaultCredentialsProvider(credsProvider);
            });
        });

        this.nonCachingHttpClient = new BoundedHttpAsyncClient(clientBuilder.build(),
                Ints.saturatedCast(options.getMaxEntitySize()));

        final CacheConfig cacheConfig = CacheConfig.custom()
                .setMaxCacheEntries(options.getMaxCacheEntries())
                .setSharedCache(false)
                .setNeverCacheHTTP10ResponsesWithQueryString(false)
                .setMaxObjectSize(options.getMaxCacheObjectSize())
                .build();

        this.httpCacheStorage = new LoggingHttpCacheStorage(new FlushableHttpCacheStorageImpl(cacheConfig));
        this.httpClient = new CachingHttpAsyncClient(nonCachingHttpClient, httpCacheStorage, cacheConfig);
        this.callbackExecutor = options.getCallbackExecutor();

        nonCachingHttpClient.start();
    } catch (IOReactorException e) {
        throw new RuntimeException("Reactor " + options.getThreadPrefix() + " not set up correctly", e);
    }
}
228
229 private Registry<SchemeIOSessionStrategy> getRegistry(final HttpClientOptions options) {
230 try {
231 final TrustSelfSignedStrategy strategy = options.trustSelfSignedCertificates() ?
232 new TrustSelfSignedStrategy() : null;
233
234 final SSLContext sslContext = new SSLContextBuilder()
235 .useTLS()
236 .loadTrustMaterial(null, strategy)
237 .build();
238
239 final SSLIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(
240 sslContext,
241 split(System.getProperty("https.protocols")),
242 split(System.getProperty("https.cipherSuites")),
243 options.trustSelfSignedCertificates() ?
244 getSelfSignedVerifier() : SSLIOSessionStrategy.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
245
246 return RegistryBuilder.<SchemeIOSessionStrategy>create()
247 .register("http", NoopIOSessionStrategy.INSTANCE)
248 .register("https", sslioSessionStrategy)
249 .build();
250 } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
251 return getFallbackRegistry(e);
252 }
253 }
254
255 private X509HostnameVerifier getSelfSignedVerifier() {
256 return new X509HostnameVerifier() {
257 @Override
258 public void verify(final String host, final SSLSocket ssl) {
259 log.debug("Verification for certificates from {0} disabled", host);
260 }
261
262 @Override
263 public void verify(final String host, final X509Certificate cert) {
264 log.debug("Verification for certificates from {0} disabled", host);
265 }
266
267 @Override
268 public void verify(final String host, final String[] cns, final String[] subjectAlts) {
269 log.debug("Verification for certificates from {0} disabled", host);
270 }
271
272 @Override
273 public boolean verify(final String host, final SSLSession sslSession) {
274 log.debug("Verification for certificates from {0} disabled", host);
275 return true;
276 }
277 };
278 }
279
280 private Registry<SchemeIOSessionStrategy> getFallbackRegistry(final GeneralSecurityException e) {
281 log.error("Error when creating scheme session strategy registry", e);
282 return RegistryBuilder.<SchemeIOSessionStrategy>create()
283 .register("http", NoopIOSessionStrategy.INSTANCE)
284 .register("https", SSLIOSessionStrategy.getDefaultStrategy())
285 .build();
286 }
287
288 private String getUserAgent(HttpClientOptions options) {
289 return format("Atlassian HttpClient %s / %s / %s",
290 httpClientVersion.get(),
291 applicationName.get(),
292 options.getUserAgent());
293 }
294
295 @Override
296 public final ResponsePromise execute(final Request request) {
297 try {
298 return doExecute(request);
299 } catch (Throwable t) {
300 return ResponsePromises.toResponsePromise(rejected(t, Response.class));
301 }
302 }
303
304 private ResponsePromise doExecute(final Request request) {
305 httpClientOptions.getRequestPreparer().apply(request);
306
307 final long start = System.currentTimeMillis();
308 final HttpRequestBase op;
309 final String uri = request.getUri().toString();
310 final Request.Method method = request.getMethod();
311 switch (method) {
312 case GET:
313 op = new HttpGet(uri);
314 break;
315 case POST:
316 op = new HttpPost(uri);
317 break;
318 case PUT:
319 op = new HttpPut(uri);
320 break;
321 case DELETE:
322 op = new HttpDelete(uri);
323 break;
324 case OPTIONS:
325 op = new HttpOptions(uri);
326 break;
327 case HEAD:
328 op = new HttpHead(uri);
329 break;
330 case TRACE:
331 op = new HttpTrace(uri);
332 break;
333 default:
334 throw new UnsupportedOperationException(method.toString());
335 }
336 if (request.hasEntity()) {
337 new RequestEntityEffect(request).apply(op);
338 }
339
340 for (Map.Entry<String, String> entry : request.getHeaders().entrySet()) {
341 op.setHeader(entry.getKey(), entry.getValue());
342 }
343
344 final PromiseHttpAsyncClient asyncClient = getPromiseHttpAsyncClient(request);
345 return ResponsePromises.toResponsePromise(asyncClient.execute(op, new BasicHttpContext()).fold(
346 ex -> {
347 final long requestDuration = System.currentTimeMillis() - start;
348 Throwable exception = maybeTranslate(ex);
349 publishEvent(request, requestDuration, exception);
350 throw Throwables.propagate(exception);
351 },
352 httpResponse -> {
353 final long requestDuration = System.currentTimeMillis() - start;
354 publishEvent(request, requestDuration, httpResponse.getStatusLine().getStatusCode());
355 try {
356 return translate(httpResponse);
357 } catch (IOException e) {
358 throw Throwables.propagate(e);
359 }
360 }
361 ));
362 }
363
364 private void publishEvent(Request request, long requestDuration, int statusCode) {
365 if (HttpStatus.OK.code <= statusCode && statusCode < HttpStatus.MULTIPLE_CHOICES.code) {
366 eventConsumer.apply(new HttpRequestCompletedEvent(
367 request.getUri().toString(),
368 request.getMethod().name(),
369 statusCode,
370 requestDuration,
371 request.getAttributes()));
372 } else {
373 eventConsumer.apply(new HttpRequestFailedEvent(
374 request.getUri().toString(),
375 request.getMethod().name(),
376 statusCode,
377 requestDuration,
378 request.getAttributes()));
379 }
380 }
381
382 private void publishEvent(Request request, long requestDuration, Throwable ex) {
383 eventConsumer.apply(new HttpRequestFailedEvent(
384 request.getUri().toString(),
385 request.getMethod().name(),
386 ex.toString(),
387 requestDuration,
388 request.getAttributes()));
389 }
390
391 private PromiseHttpAsyncClient getPromiseHttpAsyncClient(Request request) {
392 return new SettableFuturePromiseHttpPromiseAsyncClient<>(
393 request.isCacheDisabled() ? nonCachingHttpClient : httpClient,
394 threadLocalContextManager, callbackExecutor);
395 }
396
397 private Throwable maybeTranslate(Throwable ex) {
398 if (ex instanceof EntityTooLargeException) {
399 EntityTooLargeException tooLarge = (EntityTooLargeException) ex;
400 try {
401
402 return new ResponseTooLargeException(translate(tooLarge.getResponse()), ex.getMessage());
403 } catch (IOException e) {
404
405 }
406 }
407 return ex;
408 }
409
410 private Response translate(HttpResponse httpResponse) throws IOException {
411 StatusLine status = httpResponse.getStatusLine();
412 Response.Builder responseBuilder = DefaultResponse.builder()
413 .setMaxEntitySize(httpClientOptions.getMaxEntitySize())
414 .setStatusCode(status.getStatusCode())
415 .setStatusText(status.getReasonPhrase());
416
417 Header[] httpHeaders = httpResponse.getAllHeaders();
418 for (Header httpHeader : httpHeaders) {
419 responseBuilder.setHeader(httpHeader.getName(), httpHeader.getValue());
420 }
421 final HttpEntity entity = httpResponse.getEntity();
422 if (entity != null) {
423 responseBuilder.setEntityStream(entity.getContent());
424 }
425 return responseBuilder.build();
426 }
427
428 @Override
429 public void destroy() throws Exception {
430 callbackExecutor.shutdown();
431 nonCachingHttpClient.close();
432 }
433
434 @Override
435 public void flushCacheByUriPattern(Pattern urlPattern) {
436 httpCacheStorage.flushByUriPattern(urlPattern);
437 }
438
439 private static final class NoOpThreadLocalContextManager<C> implements ThreadLocalContextManager<C> {
440 @Override
441 public C getThreadLocalContext() {
442 return null;
443 }
444
445 @Override
446 public void setThreadLocalContext(C context) {
447 }
448
449 @Override
450 public void clearThreadLocalContext() {
451 }
452 }
453
454 private static final class DefaultApplicationNameSupplier implements Supplier<String> {
455 private final ApplicationProperties applicationProperties;
456
457 public DefaultApplicationNameSupplier(ApplicationProperties applicationProperties) {
458 this.applicationProperties = checkNotNull(applicationProperties);
459 }
460
461 @Override
462 public String get() {
463 return format("%s-%s (%s)",
464 applicationProperties.getDisplayName(),
465 applicationProperties.getVersion(),
466 applicationProperties.getBuildNumber());
467 }
468 }
469
470 private static class EventConsumerFunction implements Function<Object, Void> {
471 private final EventPublisher eventPublisher;
472
473 public EventConsumerFunction(EventPublisher eventPublisher) {
474 this.eventPublisher = eventPublisher;
475 }
476
477 @Override
478 public Void apply(Object event) {
479 eventPublisher.publish(event);
480 return null;
481 }
482 }
483
484 private static String[] split(final String s) {
485 if (TextUtils.isBlank(s)) {
486 return null;
487 }
488 return s.split(" *, *");
489 }
490 }