View Javadoc

1   package com.atlassian.httpclient.apache.httpcomponents;
2   
3   import com.atlassian.event.api.EventPublisher;
4   import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorage;
5   import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorageImpl;
6   import com.atlassian.httpclient.apache.httpcomponents.cache.LoggingHttpCacheStorage;
7   import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyConfigFactory;
8   import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyCredentialsProvider;
9   import com.atlassian.httpclient.api.HttpClient;
10  import com.atlassian.httpclient.api.HttpStatus;
11  import com.atlassian.httpclient.api.Request;
12  import com.atlassian.httpclient.api.Response;
13  import com.atlassian.httpclient.api.ResponsePromise;
14  import com.atlassian.httpclient.api.ResponsePromises;
15  import com.atlassian.httpclient.api.ResponseTooLargeException;
16  import com.atlassian.httpclient.api.factory.HttpClientOptions;
17  import com.atlassian.httpclient.base.AbstractHttpClient;
18  import com.atlassian.httpclient.base.event.HttpRequestCompletedEvent;
19  import com.atlassian.httpclient.base.event.HttpRequestFailedEvent;
20  import com.atlassian.sal.api.ApplicationProperties;
21  import com.atlassian.sal.api.executor.ThreadLocalContextManager;
22  import com.atlassian.util.concurrent.ThreadFactories;
23  import com.google.common.base.Function;
24  import com.google.common.base.Functions;
25  import com.google.common.base.Supplier;
26  import com.google.common.base.Suppliers;
27  import com.google.common.base.Throwables;
28  import com.google.common.primitives.Ints;
29  import org.apache.http.Header;
30  import org.apache.http.HttpEntity;
31  import org.apache.http.HttpResponse;
32  import org.apache.http.StatusLine;
33  import org.apache.http.client.config.CookieSpecs;
34  import org.apache.http.client.config.RequestConfig;
35  import org.apache.http.client.methods.HttpDelete;
36  import org.apache.http.client.methods.HttpGet;
37  import org.apache.http.client.methods.HttpHead;
38  import org.apache.http.client.methods.HttpOptions;
39  import org.apache.http.client.methods.HttpPost;
40  import org.apache.http.client.methods.HttpPut;
41  import org.apache.http.client.methods.HttpRequestBase;
42  import org.apache.http.client.methods.HttpTrace;
43  import org.apache.http.config.Registry;
44  import org.apache.http.config.RegistryBuilder;
45  import org.apache.http.conn.ssl.SSLContextBuilder;
46  import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
47  import org.apache.http.conn.ssl.X509HostnameVerifier;
48  import org.apache.http.impl.client.ProxyAuthenticationStrategy;
49  import org.apache.http.impl.client.cache.CacheConfig;
50  import org.apache.http.impl.client.cache.CachingHttpAsyncClient;
51  import org.apache.http.impl.conn.DefaultSchemePortResolver;
52  import org.apache.http.impl.conn.SystemDefaultRoutePlanner;
53  import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
54  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
55  import org.apache.http.impl.nio.client.HttpAsyncClients;
56  import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
57  import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
58  import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
59  import org.apache.http.impl.nio.reactor.IOReactorConfig;
60  import org.apache.http.nio.conn.NoopIOSessionStrategy;
61  import org.apache.http.nio.conn.SchemeIOSessionStrategy;
62  import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
63  import org.apache.http.nio.reactor.IOReactorException;
64  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
65  import org.apache.http.protocol.BasicHttpContext;
66  import org.apache.http.util.TextUtils;
67  import org.slf4j.Logger;
68  import org.slf4j.LoggerFactory;
69  import org.springframework.beans.factory.DisposableBean;
70  
71  import javax.net.ssl.SSLContext;
72  import javax.net.ssl.SSLSession;
73  import javax.net.ssl.SSLSocket;
74  import java.io.IOException;
75  import java.security.GeneralSecurityException;
76  import java.security.KeyManagementException;
77  import java.security.KeyStoreException;
78  import java.security.NoSuchAlgorithmException;
79  import java.security.cert.X509Certificate;
80  import java.util.Map;
81  import java.util.concurrent.ExecutorService;
82  import java.util.concurrent.TimeUnit;
83  import java.util.regex.Pattern;
84  
85  import static com.atlassian.util.concurrent.Promises.rejected;
86  import static com.google.common.base.Preconditions.checkNotNull;
87  import static java.lang.String.format;
88  
89  public final class ApacheAsyncHttpClient<C> extends AbstractHttpClient implements HttpClient, DisposableBean {
90      private final Logger log = LoggerFactory.getLogger(this.getClass());
91  
92      private static final Supplier<String> httpClientVersion = Suppliers.memoize(
93              () -> MavenUtils.getVersion("com.atlassian.httpclient", "atlassian-httpclient-api"));
94  
95      private final Function<Object, Void> eventConsumer;
96      private final Supplier<String> applicationName;
97      private final ThreadLocalContextManager<C> threadLocalContextManager;
98      private final ExecutorService callbackExecutor;
99      private final HttpClientOptions httpClientOptions;
100 
101     private final CachingHttpAsyncClient httpClient;
102     private final CloseableHttpAsyncClient nonCachingHttpClient;
103     private final FlushableHttpCacheStorage httpCacheStorage;
104 
105     public ApacheAsyncHttpClient(EventPublisher eventConsumer, ApplicationProperties applicationProperties,
106                                  ThreadLocalContextManager<C> threadLocalContextManager) {
107         this(eventConsumer, applicationProperties, threadLocalContextManager, new HttpClientOptions());
108     }
109 
110     public ApacheAsyncHttpClient(EventPublisher eventConsumer,
111                                  ApplicationProperties applicationProperties,
112                                  ThreadLocalContextManager<C> threadLocalContextManager,
113                                  HttpClientOptions options) {
114         this(new DefaultApplicationNameSupplier(applicationProperties),
115                 new EventConsumerFunction(eventConsumer),
116                 threadLocalContextManager,
117                 options);
118     }
119 
120     public ApacheAsyncHttpClient(String applicationName) {
121         this(applicationName, new HttpClientOptions());
122     }
123 
124     public ApacheAsyncHttpClient(String applicationName, final HttpClientOptions options) {
125         this(Suppliers.ofInstance(applicationName), Functions.constant(null), new NoOpThreadLocalContextManager<>(), options);
126     }
127 
128     public ApacheAsyncHttpClient(final Supplier<String> applicationName,
129                                  final Function<Object, Void> eventConsumer,
130                                  final ThreadLocalContextManager<C> threadLocalContextManager,
131                                  final HttpClientOptions options) {
132         this.eventConsumer = checkNotNull(eventConsumer);
133         this.applicationName = checkNotNull(applicationName);
134         this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
135         this.httpClientOptions = checkNotNull(options);
136 
137         try {
138             final IOReactorConfig reactorConfig = IOReactorConfig.custom()
139                     .setIoThreadCount(options.getIoThreadCount())
140                     .setSelectInterval(options.getIoSelectInterval())
141                     .setInterestOpQueued(true)
142                     .build();
143 
144             final DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(reactorConfig);
145             ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
146                 @Override
147                 public boolean handle(final IOException e) {
148                     log.error("IO exception in reactor ", e);
149                     return false;
150                 }
151 
152                 @Override
153                 public boolean handle(final RuntimeException e) {
154                     log.error("Fatal runtime error", e);
155                     return false;
156                 }
157             });
158 
159             final PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(
160                     ioReactor,
161                     ManagedNHttpClientConnectionFactory.INSTANCE,
162                     getRegistry(options),
163                     DefaultSchemePortResolver.INSTANCE,
164                     host -> options.getHostResolver()
165                             .orElse(DefaultHostResolver.INSTANCE)
166                             .resolve(host),
167                     options.getConnectionPoolTimeToLive(),
168                     TimeUnit.MILLISECONDS) {
169                 @SuppressWarnings("MethodDoesntCallSuperMethod")
170                 @Override
171                 protected void finalize() {
172                     // prevent the PoolingClientAsyncConnectionManager from logging - this causes exceptions due to
173                     // the ClassLoader probably having been removed when the plugin shuts down.  Added a
174                     // PluginEventListener to make sure the shutdown method is called while the plugin classloader
175                     // is still active.
176                 }
177             };
178 
179             final RequestConfig requestConfig = RequestConfig.custom()
180                     .setConnectTimeout((int) options.getConnectionTimeout())
181                     .setConnectionRequestTimeout((int) options.getLeaseTimeout())
182                     .setCookieSpec(options.getIgnoreCookies() ? CookieSpecs.IGNORE_COOKIES : CookieSpecs.DEFAULT)
183                     .setSocketTimeout((int) options.getSocketTimeout())
184                     .build();
185 
186             connectionManager.setDefaultMaxPerRoute(options.getMaxConnectionsPerHost());
187             connectionManager.setMaxTotal(options.getMaxTotalConnections());
188 
189             final HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom()
190                     .setThreadFactory(ThreadFactories.namedThreadFactory(options.getThreadPrefix() + "-io", ThreadFactories.Type.DAEMON))
191                     .setDefaultIOReactorConfig(reactorConfig)
192                     .setConnectionManager(connectionManager)
193                     .setRedirectStrategy(new RedirectStrategy())
194                     .setUserAgent(getUserAgent(options))
195                     .setDefaultRequestConfig(requestConfig);
196 
197             // set up a route planner if there is proxy configuration
198             ProxyConfigFactory.getProxyConfig(options).forEach(proxyConfig -> {
199                 // don't be fooled by its name. If SystemDefaultRoutePlanner is passed a proxy selector it will use that
200                 // instead of creating the default one that reads system properties
201                 clientBuilder.setRoutePlanner(new SystemDefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE, proxyConfig.toProxySelector()));
202 
203                 ProxyCredentialsProvider.build(options).forEach(credsProvider -> {
204                     clientBuilder.setProxyAuthenticationStrategy(ProxyAuthenticationStrategy.INSTANCE);
205                     clientBuilder.setDefaultCredentialsProvider(credsProvider);
206                 });
207             });
208 
209             this.nonCachingHttpClient = new BoundedHttpAsyncClient(clientBuilder.build(),
210                     Ints.saturatedCast(options.getMaxEntitySize()));
211 
212             final CacheConfig cacheConfig = CacheConfig.custom()
213                     .setMaxCacheEntries(options.getMaxCacheEntries())
214                     .setSharedCache(false)
215                     .setNeverCacheHTTP10ResponsesWithQueryString(false)
216                     .setMaxObjectSize(options.getMaxCacheObjectSize())
217                     .build();
218 
219             this.httpCacheStorage = new LoggingHttpCacheStorage(new FlushableHttpCacheStorageImpl(cacheConfig));
220             this.httpClient = new CachingHttpAsyncClient(nonCachingHttpClient, httpCacheStorage, cacheConfig);
221             this.callbackExecutor = options.getCallbackExecutor();
222 
223             nonCachingHttpClient.start();
224         } catch (IOReactorException e) {
225             throw new RuntimeException("Reactor " + options.getThreadPrefix() + "not set up correctly", e);
226         }
227     }
228 
229     private Registry<SchemeIOSessionStrategy> getRegistry(final HttpClientOptions options) {
230         try {
231             final TrustSelfSignedStrategy strategy = options.trustSelfSignedCertificates() ?
232                     new TrustSelfSignedStrategy() : null;
233 
234             final SSLContext sslContext = new SSLContextBuilder()
235                     .useTLS()
236                     .loadTrustMaterial(null, strategy)
237                     .build();
238 
239             final SSLIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(
240                     sslContext,
241                     split(System.getProperty("https.protocols")),
242                     split(System.getProperty("https.cipherSuites")),
243                     options.trustSelfSignedCertificates() ?
244                             getSelfSignedVerifier() : SSLIOSessionStrategy.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
245 
246             return RegistryBuilder.<SchemeIOSessionStrategy>create()
247                     .register("http", NoopIOSessionStrategy.INSTANCE)
248                     .register("https", sslioSessionStrategy)
249                     .build();
250         } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
251             return getFallbackRegistry(e);
252         }
253     }
254 
255     private X509HostnameVerifier getSelfSignedVerifier() {
256         return new X509HostnameVerifier() {
257             @Override
258             public void verify(final String host, final SSLSocket ssl) {
259                 log.debug("Verification for certificates from {0} disabled", host);
260             }
261 
262             @Override
263             public void verify(final String host, final X509Certificate cert) {
264                 log.debug("Verification for certificates from {0} disabled", host);
265             }
266 
267             @Override
268             public void verify(final String host, final String[] cns, final String[] subjectAlts) {
269                 log.debug("Verification for certificates from {0} disabled", host);
270             }
271 
272             @Override
273             public boolean verify(final String host, final SSLSession sslSession) {
274                 log.debug("Verification for certificates from {0} disabled", host);
275                 return true;
276             }
277         };
278     }
279 
280     private Registry<SchemeIOSessionStrategy> getFallbackRegistry(final GeneralSecurityException e) {
281         log.error("Error when creating scheme session strategy registry", e);
282         return RegistryBuilder.<SchemeIOSessionStrategy>create()
283                 .register("http", NoopIOSessionStrategy.INSTANCE)
284                 .register("https", SSLIOSessionStrategy.getDefaultStrategy())
285                 .build();
286     }
287 
288     private String getUserAgent(HttpClientOptions options) {
289         return format("Atlassian HttpClient %s / %s / %s",
290                 httpClientVersion.get(),
291                 applicationName.get(),
292                 options.getUserAgent());
293     }
294 
295     @Override
296     public final ResponsePromise execute(final Request request) {
297         try {
298             return doExecute(request);
299         } catch (Throwable t) {
300             return ResponsePromises.toResponsePromise(rejected(t, Response.class));
301         }
302     }
303 
304     private ResponsePromise doExecute(final Request request) {
305         httpClientOptions.getRequestPreparer().apply(request);
306 
307         final long start = System.currentTimeMillis();
308         final HttpRequestBase op;
309         final String uri = request.getUri().toString();
310         final Request.Method method = request.getMethod();
311         switch (method) {
312             case GET:
313                 op = new HttpGet(uri);
314                 break;
315             case POST:
316                 op = new HttpPost(uri);
317                 break;
318             case PUT:
319                 op = new HttpPut(uri);
320                 break;
321             case DELETE:
322                 op = new HttpDelete(uri);
323                 break;
324             case OPTIONS:
325                 op = new HttpOptions(uri);
326                 break;
327             case HEAD:
328                 op = new HttpHead(uri);
329                 break;
330             case TRACE:
331                 op = new HttpTrace(uri);
332                 break;
333             default:
334                 throw new UnsupportedOperationException(method.toString());
335         }
336         if (request.hasEntity()) {
337             new RequestEntityEffect(request).apply(op);
338         }
339 
340         for (Map.Entry<String, String> entry : request.getHeaders().entrySet()) {
341             op.setHeader(entry.getKey(), entry.getValue());
342         }
343 
344         final PromiseHttpAsyncClient asyncClient = getPromiseHttpAsyncClient(request);
345         return ResponsePromises.toResponsePromise(asyncClient.execute(op, new BasicHttpContext()).fold(
346                 ex -> {
347                     final long requestDuration = System.currentTimeMillis() - start;
348                     Throwable exception = maybeTranslate(ex);
349                     publishEvent(request, requestDuration, exception);
350                     throw Throwables.propagate(exception);
351                 },
352                 httpResponse -> {
353                     final long requestDuration = System.currentTimeMillis() - start;
354                     publishEvent(request, requestDuration, httpResponse.getStatusLine().getStatusCode());
355                     try {
356                         return translate(httpResponse);
357                     } catch (IOException e) {
358                         throw Throwables.propagate(e);
359                     }
360                 }
361         ));
362     }
363 
364     private void publishEvent(Request request, long requestDuration, int statusCode) {
365         if (HttpStatus.OK.code <= statusCode && statusCode < HttpStatus.MULTIPLE_CHOICES.code) {
366             eventConsumer.apply(new HttpRequestCompletedEvent(
367                     request.getUri().toString(),
368                     request.getMethod().name(),
369                     statusCode,
370                     requestDuration,
371                     request.getAttributes()));
372         } else {
373             eventConsumer.apply(new HttpRequestFailedEvent(
374                     request.getUri().toString(),
375                     request.getMethod().name(),
376                     statusCode,
377                     requestDuration,
378                     request.getAttributes()));
379         }
380     }
381 
382     private void publishEvent(Request request, long requestDuration, Throwable ex) {
383         eventConsumer.apply(new HttpRequestFailedEvent(
384                 request.getUri().toString(),
385                 request.getMethod().name(),
386                 ex.toString(),
387                 requestDuration,
388                 request.getAttributes()));
389     }
390 
391     private PromiseHttpAsyncClient getPromiseHttpAsyncClient(Request request) {
392         return new SettableFuturePromiseHttpPromiseAsyncClient<>(
393                 request.isCacheDisabled() ? nonCachingHttpClient : httpClient,
394                 threadLocalContextManager, callbackExecutor);
395     }
396 
397     private Throwable maybeTranslate(Throwable ex) {
398         if (ex instanceof EntityTooLargeException) {
399             EntityTooLargeException tooLarge = (EntityTooLargeException) ex;
400             try {
401                 // don't include the cause to ensure that the HttpResponse is released
402                 return new ResponseTooLargeException(translate(tooLarge.getResponse()), ex.getMessage());
403             } catch (IOException e) {
404                 // could not translate, just return the original exception
405             }
406         }
407         return ex;
408     }
409 
410     private Response translate(HttpResponse httpResponse) throws IOException {
411         StatusLine status = httpResponse.getStatusLine();
412         Response.Builder responseBuilder = DefaultResponse.builder()
413                 .setMaxEntitySize(httpClientOptions.getMaxEntitySize())
414                 .setStatusCode(status.getStatusCode())
415                 .setStatusText(status.getReasonPhrase());
416 
417         Header[] httpHeaders = httpResponse.getAllHeaders();
418         for (Header httpHeader : httpHeaders) {
419             responseBuilder.setHeader(httpHeader.getName(), httpHeader.getValue());
420         }
421         final HttpEntity entity = httpResponse.getEntity();
422         if (entity != null) {
423             responseBuilder.setEntityStream(entity.getContent());
424         }
425         return responseBuilder.build();
426     }
427 
428     @Override
429     public void destroy() throws Exception {
430         callbackExecutor.shutdown();
431         nonCachingHttpClient.close();
432     }
433 
434     @Override
435     public void flushCacheByUriPattern(Pattern urlPattern) {
436         httpCacheStorage.flushByUriPattern(urlPattern);
437     }
438 
439     private static final class NoOpThreadLocalContextManager<C> implements ThreadLocalContextManager<C> {
440         @Override
441         public C getThreadLocalContext() {
442             return null;
443         }
444 
445         @Override
446         public void setThreadLocalContext(C context) {
447         }
448 
449         @Override
450         public void clearThreadLocalContext() {
451         }
452     }
453 
454     private static final class DefaultApplicationNameSupplier implements Supplier<String> {
455         private final ApplicationProperties applicationProperties;
456 
457         public DefaultApplicationNameSupplier(ApplicationProperties applicationProperties) {
458             this.applicationProperties = checkNotNull(applicationProperties);
459         }
460 
461         @Override
462         public String get() {
463             return format("%s-%s (%s)",
464                     applicationProperties.getDisplayName(),
465                     applicationProperties.getVersion(),
466                     applicationProperties.getBuildNumber());
467         }
468     }
469 
470     private static class EventConsumerFunction implements Function<Object, Void> {
471         private final EventPublisher eventPublisher;
472 
473         public EventConsumerFunction(EventPublisher eventPublisher) {
474             this.eventPublisher = eventPublisher;
475         }
476 
477         @Override
478         public Void apply(Object event) {
479             eventPublisher.publish(event);
480             return null;
481         }
482     }
483 
484     private static String[] split(final String s) {
485         if (TextUtils.isBlank(s)) {
486             return null;
487         }
488         return s.split(" *, *");
489     }
490 }