1 package com.atlassian.httpclient.apache.httpcomponents;
2
3 import com.atlassian.event.api.EventPublisher;
4 import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorage;
5 import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorageImpl;
6 import com.atlassian.httpclient.apache.httpcomponents.cache.LoggingHttpCacheStorage;
7 import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyConfigFactory;
8 import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyCredentialsProvider;
9 import com.atlassian.httpclient.api.*;
10 import com.atlassian.httpclient.api.factory.HttpClientOptions;
11 import com.atlassian.httpclient.base.AbstractHttpClient;
12 import com.atlassian.httpclient.base.event.HttpRequestCompletedEvent;
13 import com.atlassian.httpclient.base.event.HttpRequestFailedEvent;
14 import com.atlassian.sal.api.ApplicationProperties;
15 import com.atlassian.sal.api.executor.ThreadLocalContextManager;
16 import com.atlassian.util.concurrent.ThreadFactories;
17 import com.google.common.base.*;
18 import com.google.common.primitives.Ints;
19 import org.apache.http.Header;
20 import org.apache.http.HttpEntity;
21 import org.apache.http.HttpResponse;
22 import org.apache.http.StatusLine;
23 import org.apache.http.client.config.CookieSpecs;
24 import org.apache.http.client.config.RequestConfig;
25 import org.apache.http.client.methods.*;
26 import org.apache.http.config.Registry;
27 import org.apache.http.config.RegistryBuilder;
28 import org.apache.http.conn.ssl.SSLContextBuilder;
29 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
30 import org.apache.http.conn.ssl.X509HostnameVerifier;
31 import org.apache.http.impl.client.ProxyAuthenticationStrategy;
32 import org.apache.http.impl.client.cache.CacheConfig;
33 import org.apache.http.impl.client.cache.CachingHttpAsyncClient;
34 import org.apache.http.impl.conn.DefaultSchemePortResolver;
35 import org.apache.http.impl.conn.SystemDefaultRoutePlanner;
36 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
37 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
38 import org.apache.http.impl.nio.client.HttpAsyncClients;
39 import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
40 import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
41 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
42 import org.apache.http.impl.nio.reactor.IOReactorConfig;
43 import org.apache.http.nio.conn.NoopIOSessionStrategy;
44 import org.apache.http.nio.conn.SchemeIOSessionStrategy;
45 import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
46 import org.apache.http.nio.reactor.IOReactorException;
47 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
48 import org.apache.http.protocol.BasicHttpContext;
49 import org.apache.http.util.TextUtils;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.springframework.beans.factory.DisposableBean;
53
54 import javax.net.ssl.SSLContext;
55 import javax.net.ssl.SSLSession;
56 import javax.net.ssl.SSLSocket;
57 import java.io.IOException;
58 import java.security.GeneralSecurityException;
59 import java.security.KeyManagementException;
60 import java.security.KeyStoreException;
61 import java.security.NoSuchAlgorithmException;
62 import java.security.cert.X509Certificate;
63 import java.util.List;
64 import java.util.Map;
65 import java.util.concurrent.ExecutorService;
66 import java.util.concurrent.TimeUnit;
67 import java.util.regex.Pattern;
68
69 import static com.atlassian.util.concurrent.Promises.rejected;
70 import static com.google.common.base.Preconditions.checkNotNull;
71 import static java.lang.String.format;
72
73 public final class ApacheAsyncHttpClient<C> extends AbstractHttpClient implements HttpClient, DisposableBean {
74 private final Logger log = LoggerFactory.getLogger(this.getClass());
75
76 private static final Supplier<String> httpClientVersion = Suppliers.memoize(
77 () -> MavenUtils.getVersion("com.atlassian.httpclient", "atlassian-httpclient-api"));
78
79 private final Function<Object, Void> eventConsumer;
80 private final Supplier<String> applicationName;
81 private final ThreadLocalContextManager<C> threadLocalContextManager;
82 private final ExecutorService callbackExecutor;
83 private final HttpClientOptions httpClientOptions;
84
85 private final CachingHttpAsyncClient httpClient;
86 private final CloseableHttpAsyncClient nonCachingHttpClient;
87 private final FlushableHttpCacheStorage httpCacheStorage;
88
89 public ApacheAsyncHttpClient(EventPublisher eventConsumer, ApplicationProperties applicationProperties,
90 ThreadLocalContextManager<C> threadLocalContextManager) {
91 this(eventConsumer, applicationProperties, threadLocalContextManager, new HttpClientOptions());
92 }
93
94 public ApacheAsyncHttpClient(EventPublisher eventConsumer,
95 ApplicationProperties applicationProperties,
96 ThreadLocalContextManager<C> threadLocalContextManager,
97 HttpClientOptions options) {
98 this(new DefaultApplicationNameSupplier(applicationProperties),
99 new EventConsumerFunction(eventConsumer),
100 threadLocalContextManager,
101 options);
102 }
103
104 public ApacheAsyncHttpClient(String applicationName) {
105 this(applicationName, new HttpClientOptions());
106 }
107
108 public ApacheAsyncHttpClient(String applicationName, final HttpClientOptions options) {
109 this(Suppliers.ofInstance(applicationName), Functions.constant(null), new NoOpThreadLocalContextManager<>(), options);
110 }
111
112 public ApacheAsyncHttpClient(final Supplier<String> applicationName,
113 final Function<Object, Void> eventConsumer,
114 final ThreadLocalContextManager<C> threadLocalContextManager,
115 final HttpClientOptions options) {
116 this.eventConsumer = checkNotNull(eventConsumer);
117 this.applicationName = checkNotNull(applicationName);
118 this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
119 this.httpClientOptions = checkNotNull(options);
120
121 try {
122 final IOReactorConfig reactorConfig = IOReactorConfig.custom()
123 .setIoThreadCount(options.getIoThreadCount())
124 .setSelectInterval(options.getIoSelectInterval())
125 .setInterestOpQueued(true)
126 .build();
127
128 final DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(reactorConfig);
129 ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
130 @Override
131 public boolean handle(final IOException e) {
132 log.error("IO exception in reactor ", e);
133 return false;
134 }
135
136 @Override
137 public boolean handle(final RuntimeException e) {
138 log.error("Fatal runtime error", e);
139 return false;
140 }
141 });
142
143 List<String> bannedAddresses = options.getBlacklistedAddresses();
144 HostResolver resolver;
145 if (bannedAddresses.isEmpty()) {
146 resolver = DefaultHostResolver.INSTANCE;
147 } else {
148 resolver = new BannedHostResolver(bannedAddresses);
149 }
150
151 final PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(
152 ioReactor,
153 ManagedNHttpClientConnectionFactory.INSTANCE,
154 getRegistry(options),
155 DefaultSchemePortResolver.INSTANCE,
156 resolver::resolve,
157 options.getConnectionPoolTimeToLive(),
158 TimeUnit.MILLISECONDS) {
159 @SuppressWarnings("MethodDoesntCallSuperMethod")
160 @Override
161 protected void finalize() {
162
163
164
165
166 }
167 };
168
169 final RequestConfig requestConfig = RequestConfig.custom()
170 .setConnectTimeout((int) options.getConnectionTimeout())
171 .setConnectionRequestTimeout((int) options.getLeaseTimeout())
172 .setCookieSpec(options.getIgnoreCookies() ? CookieSpecs.IGNORE_COOKIES : CookieSpecs.DEFAULT)
173 .setSocketTimeout((int) options.getSocketTimeout())
174 .build();
175
176 connectionManager.setDefaultMaxPerRoute(options.getMaxConnectionsPerHost());
177 connectionManager.setMaxTotal(options.getMaxTotalConnections());
178
179 final HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom()
180 .setThreadFactory(ThreadFactories.namedThreadFactory(options.getThreadPrefix() + "-io", ThreadFactories.Type.DAEMON))
181 .setDefaultIOReactorConfig(reactorConfig)
182 .setConnectionManager(connectionManager)
183 .setRedirectStrategy(new RedirectStrategy())
184 .setUserAgent(getUserAgent(options))
185 .setDefaultRequestConfig(requestConfig);
186
187
188 ProxyConfigFactory.getProxyConfig(options).forEach(proxyConfig -> {
189
190
191 clientBuilder.setRoutePlanner(new SystemDefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE, proxyConfig.toProxySelector()));
192
193 ProxyCredentialsProvider.build(options).forEach(credsProvider -> {
194 clientBuilder.setProxyAuthenticationStrategy(ProxyAuthenticationStrategy.INSTANCE);
195 clientBuilder.setDefaultCredentialsProvider(credsProvider);
196 });
197 });
198
199 this.nonCachingHttpClient = new BoundedHttpAsyncClient(clientBuilder.build(),
200 Ints.saturatedCast(options.getMaxEntitySize()));
201
202 final CacheConfig cacheConfig = CacheConfig.custom()
203 .setMaxCacheEntries(options.getMaxCacheEntries())
204 .setSharedCache(false)
205 .setNeverCacheHTTP10ResponsesWithQueryString(false)
206 .setMaxObjectSize(options.getMaxCacheObjectSize())
207 .build();
208
209 this.httpCacheStorage = new LoggingHttpCacheStorage(new FlushableHttpCacheStorageImpl(cacheConfig));
210 this.httpClient = new CachingHttpAsyncClient(nonCachingHttpClient, httpCacheStorage, cacheConfig);
211 this.callbackExecutor = options.getCallbackExecutor();
212
213 nonCachingHttpClient.start();
214 } catch (IOReactorException e) {
215 throw new RuntimeException("Reactor " + options.getThreadPrefix() + "not set up correctly", e);
216 }
217 }
218
219 private Registry<SchemeIOSessionStrategy> getRegistry(final HttpClientOptions options) {
220 try {
221 final TrustSelfSignedStrategy strategy = options.trustSelfSignedCertificates() ?
222 new TrustSelfSignedStrategy() : null;
223
224 final SSLContext sslContext = new SSLContextBuilder()
225 .useTLS()
226 .loadTrustMaterial(null, strategy)
227 .build();
228
229 final SSLIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(
230 sslContext,
231 split(System.getProperty("https.protocols")),
232 split(System.getProperty("https.cipherSuites")),
233 options.trustSelfSignedCertificates() ?
234 getSelfSignedVerifier() : SSLIOSessionStrategy.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
235
236 return RegistryBuilder.<SchemeIOSessionStrategy>create()
237 .register("http", NoopIOSessionStrategy.INSTANCE)
238 .register("https", sslioSessionStrategy)
239 .build();
240 } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
241 return getFallbackRegistry(e);
242 }
243 }
244
245 private X509HostnameVerifier getSelfSignedVerifier() {
246 return new X509HostnameVerifier() {
247 @Override
248 public void verify(final String host, final SSLSocket ssl) {
249 log.debug("Verification for certificates from {0} disabled", host);
250 }
251
252 @Override
253 public void verify(final String host, final X509Certificate cert) {
254 log.debug("Verification for certificates from {0} disabled", host);
255 }
256
257 @Override
258 public void verify(final String host, final String[] cns, final String[] subjectAlts) {
259 log.debug("Verification for certificates from {0} disabled", host);
260 }
261
262 @Override
263 public boolean verify(final String host, final SSLSession sslSession) {
264 log.debug("Verification for certificates from {0} disabled", host);
265 return true;
266 }
267 };
268 }
269
270 private Registry<SchemeIOSessionStrategy> getFallbackRegistry(final GeneralSecurityException e) {
271 log.error("Error when creating scheme session strategy registry", e);
272 return RegistryBuilder.<SchemeIOSessionStrategy>create()
273 .register("http", NoopIOSessionStrategy.INSTANCE)
274 .register("https", SSLIOSessionStrategy.getDefaultStrategy())
275 .build();
276 }
277
278 private String getUserAgent(HttpClientOptions options) {
279 return format("Atlassian HttpClient %s / %s / %s",
280 httpClientVersion.get(),
281 applicationName.get(),
282 options.getUserAgent());
283 }
284
285 @Override
286 public final ResponsePromise execute(final Request request) {
287 try {
288 return doExecute(request);
289 } catch (Throwable t) {
290 return ResponsePromises.toResponsePromise(rejected(t, Response.class));
291 }
292 }
293
294 private ResponsePromise doExecute(final Request request) {
295 httpClientOptions.getRequestPreparer().apply(request);
296
297 final long start = System.currentTimeMillis();
298 final HttpRequestBase op;
299 final String uri = request.getUri().toString();
300 final Request.Method method = request.getMethod();
301 switch (method) {
302 case GET:
303 op = new HttpGet(uri);
304 break;
305 case POST:
306 op = new HttpPost(uri);
307 break;
308 case PUT:
309 op = new HttpPut(uri);
310 break;
311 case DELETE:
312 op = new HttpDelete(uri);
313 break;
314 case OPTIONS:
315 op = new HttpOptions(uri);
316 break;
317 case HEAD:
318 op = new HttpHead(uri);
319 break;
320 case TRACE:
321 op = new HttpTrace(uri);
322 break;
323 default:
324 throw new UnsupportedOperationException(method.toString());
325 }
326 if (request.hasEntity()) {
327 new RequestEntityEffect(request).apply(op);
328 }
329
330 for (Map.Entry<String, String> entry : request.getHeaders().entrySet()) {
331 op.setHeader(entry.getKey(), entry.getValue());
332 }
333
334 final PromiseHttpAsyncClient asyncClient = getPromiseHttpAsyncClient(request);
335 return ResponsePromises.toResponsePromise(asyncClient.execute(op, new BasicHttpContext()).fold(
336 ex -> {
337 final long requestDuration = System.currentTimeMillis() - start;
338 Throwable exception = maybeTranslate(ex);
339 publishEvent(request, requestDuration, exception);
340 throw Throwables.propagate(exception);
341 },
342 httpResponse -> {
343 final long requestDuration = System.currentTimeMillis() - start;
344 publishEvent(request, requestDuration, httpResponse.getStatusLine().getStatusCode());
345 try {
346 return translate(httpResponse);
347 } catch (IOException e) {
348 throw Throwables.propagate(e);
349 }
350 }
351 ));
352 }
353
354 private void publishEvent(Request request, long requestDuration, int statusCode) {
355 if (HttpStatus.OK.code <= statusCode && statusCode < HttpStatus.MULTIPLE_CHOICES.code) {
356 eventConsumer.apply(new HttpRequestCompletedEvent(
357 request.getUri().toString(),
358 request.getMethod().name(),
359 statusCode,
360 requestDuration,
361 request.getAttributes()));
362 } else {
363 eventConsumer.apply(new HttpRequestFailedEvent(
364 request.getUri().toString(),
365 request.getMethod().name(),
366 statusCode,
367 requestDuration,
368 request.getAttributes()));
369 }
370 }
371
372 private void publishEvent(Request request, long requestDuration, Throwable ex) {
373 eventConsumer.apply(new HttpRequestFailedEvent(
374 request.getUri().toString(),
375 request.getMethod().name(),
376 ex.toString(),
377 requestDuration,
378 request.getAttributes()));
379 }
380
381 private PromiseHttpAsyncClient getPromiseHttpAsyncClient(Request request) {
382 return new SettableFuturePromiseHttpPromiseAsyncClient<>(
383 request.isCacheDisabled() ? nonCachingHttpClient : httpClient,
384 threadLocalContextManager, callbackExecutor);
385 }
386
387 private Throwable maybeTranslate(Throwable ex) {
388 if (ex instanceof EntityTooLargeException) {
389 EntityTooLargeException tooLarge = (EntityTooLargeException) ex;
390 try {
391
392 return new ResponseTooLargeException(translate(tooLarge.getResponse()), ex.getMessage());
393 } catch (IOException e) {
394
395 }
396 }
397 return ex;
398 }
399
400 private Response translate(HttpResponse httpResponse) throws IOException {
401 StatusLine status = httpResponse.getStatusLine();
402 Response.Builder responseBuilder = DefaultResponse.builder()
403 .setMaxEntitySize(httpClientOptions.getMaxEntitySize())
404 .setStatusCode(status.getStatusCode())
405 .setStatusText(status.getReasonPhrase());
406
407 Header[] httpHeaders = httpResponse.getAllHeaders();
408 for (Header httpHeader : httpHeaders) {
409 responseBuilder.setHeader(httpHeader.getName(), httpHeader.getValue());
410 }
411 final HttpEntity entity = httpResponse.getEntity();
412 if (entity != null) {
413 responseBuilder.setEntityStream(entity.getContent());
414 }
415 return responseBuilder.build();
416 }
417
418 @Override
419 public void destroy() throws Exception {
420 callbackExecutor.shutdown();
421 nonCachingHttpClient.close();
422 }
423
424 @Override
425 public void flushCacheByUriPattern(Pattern urlPattern) {
426 httpCacheStorage.flushByUriPattern(urlPattern);
427 }
428
429 private static final class NoOpThreadLocalContextManager<C> implements ThreadLocalContextManager<C> {
430 @Override
431 public C getThreadLocalContext() {
432 return null;
433 }
434
435 @Override
436 public void setThreadLocalContext(C context) {
437 }
438
439 @Override
440 public void clearThreadLocalContext() {
441 }
442 }
443
444 private static final class DefaultApplicationNameSupplier implements Supplier<String> {
445 private final ApplicationProperties applicationProperties;
446
447 public DefaultApplicationNameSupplier(ApplicationProperties applicationProperties) {
448 this.applicationProperties = checkNotNull(applicationProperties);
449 }
450
451 @Override
452 public String get() {
453 return format("%s-%s (%s)",
454 applicationProperties.getDisplayName(),
455 applicationProperties.getVersion(),
456 applicationProperties.getBuildNumber());
457 }
458 }
459
460 private static class EventConsumerFunction implements Function<Object, Void> {
461 private final EventPublisher eventPublisher;
462
463 public EventConsumerFunction(EventPublisher eventPublisher) {
464 this.eventPublisher = eventPublisher;
465 }
466
467 @Override
468 public Void apply(Object event) {
469 eventPublisher.publish(event);
470 return null;
471 }
472 }
473
474 private static String[] split(final String s) {
475 if (TextUtils.isBlank(s)) {
476 return null;
477 }
478 return s.split(" *, *");
479 }
480 }