View Javadoc

1   package com.atlassian.httpclient.apache.httpcomponents;
2   
3   import com.atlassian.event.api.EventPublisher;
4   import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorage;
5   import com.atlassian.httpclient.apache.httpcomponents.cache.FlushableHttpCacheStorageImpl;
6   import com.atlassian.httpclient.apache.httpcomponents.cache.LoggingHttpCacheStorage;
7   import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyConfigFactory;
8   import com.atlassian.httpclient.apache.httpcomponents.proxy.ProxyCredentialsProvider;
9   import com.atlassian.httpclient.api.*;
10  import com.atlassian.httpclient.api.factory.HttpClientOptions;
11  import com.atlassian.httpclient.base.AbstractHttpClient;
12  import com.atlassian.httpclient.base.event.HttpRequestCompletedEvent;
13  import com.atlassian.httpclient.base.event.HttpRequestFailedEvent;
14  import com.atlassian.sal.api.ApplicationProperties;
15  import com.atlassian.sal.api.executor.ThreadLocalContextManager;
16  import com.atlassian.util.concurrent.ThreadFactories;
17  import com.google.common.base.*;
18  import com.google.common.primitives.Ints;
19  import org.apache.http.Header;
20  import org.apache.http.HttpEntity;
21  import org.apache.http.HttpResponse;
22  import org.apache.http.StatusLine;
23  import org.apache.http.client.config.CookieSpecs;
24  import org.apache.http.client.config.RequestConfig;
25  import org.apache.http.client.methods.*;
26  import org.apache.http.config.Registry;
27  import org.apache.http.config.RegistryBuilder;
28  import org.apache.http.conn.ssl.SSLContextBuilder;
29  import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
30  import org.apache.http.conn.ssl.X509HostnameVerifier;
31  import org.apache.http.impl.client.ProxyAuthenticationStrategy;
32  import org.apache.http.impl.client.cache.CacheConfig;
33  import org.apache.http.impl.client.cache.CachingHttpAsyncClient;
34  import org.apache.http.impl.conn.DefaultSchemePortResolver;
35  import org.apache.http.impl.conn.SystemDefaultRoutePlanner;
36  import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
37  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
38  import org.apache.http.impl.nio.client.HttpAsyncClients;
39  import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
40  import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
41  import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
42  import org.apache.http.impl.nio.reactor.IOReactorConfig;
43  import org.apache.http.nio.conn.NoopIOSessionStrategy;
44  import org.apache.http.nio.conn.SchemeIOSessionStrategy;
45  import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
46  import org.apache.http.nio.reactor.IOReactorException;
47  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
48  import org.apache.http.protocol.BasicHttpContext;
49  import org.apache.http.util.TextUtils;
50  import org.slf4j.Logger;
51  import org.slf4j.LoggerFactory;
52  import org.springframework.beans.factory.DisposableBean;
53  
54  import javax.net.ssl.SSLContext;
55  import javax.net.ssl.SSLSession;
56  import javax.net.ssl.SSLSocket;
57  import java.io.IOException;
58  import java.security.GeneralSecurityException;
59  import java.security.KeyManagementException;
60  import java.security.KeyStoreException;
61  import java.security.NoSuchAlgorithmException;
62  import java.security.cert.X509Certificate;
63  import java.util.List;
64  import java.util.Map;
65  import java.util.concurrent.ExecutorService;
66  import java.util.concurrent.TimeUnit;
67  import java.util.regex.Pattern;
68  
69  import static com.atlassian.util.concurrent.Promises.rejected;
70  import static com.google.common.base.Preconditions.checkNotNull;
71  import static java.lang.String.format;
72  
73  public final class ApacheAsyncHttpClient<C> extends AbstractHttpClient implements HttpClient, DisposableBean {
74      private final Logger log = LoggerFactory.getLogger(this.getClass());
75  
76      private static final Supplier<String> httpClientVersion = Suppliers.memoize(
77              () -> MavenUtils.getVersion("com.atlassian.httpclient", "atlassian-httpclient-api"));
78  
79      private final Function<Object, Void> eventConsumer;
80      private final Supplier<String> applicationName;
81      private final ThreadLocalContextManager<C> threadLocalContextManager;
82      private final ExecutorService callbackExecutor;
83      private final HttpClientOptions httpClientOptions;
84  
85      private final CachingHttpAsyncClient httpClient;
86      private final CloseableHttpAsyncClient nonCachingHttpClient;
87      private final FlushableHttpCacheStorage httpCacheStorage;
88  
89      public ApacheAsyncHttpClient(EventPublisher eventConsumer, ApplicationProperties applicationProperties,
90                                   ThreadLocalContextManager<C> threadLocalContextManager) {
91          this(eventConsumer, applicationProperties, threadLocalContextManager, new HttpClientOptions());
92      }
93  
94      public ApacheAsyncHttpClient(EventPublisher eventConsumer,
95                                   ApplicationProperties applicationProperties,
96                                   ThreadLocalContextManager<C> threadLocalContextManager,
97                                   HttpClientOptions options) {
98          this(new DefaultApplicationNameSupplier(applicationProperties),
99                  new EventConsumerFunction(eventConsumer),
100                 threadLocalContextManager,
101                 options);
102     }
103 
104     public ApacheAsyncHttpClient(String applicationName) {
105         this(applicationName, new HttpClientOptions());
106     }
107 
108     public ApacheAsyncHttpClient(String applicationName, final HttpClientOptions options) {
109         this(Suppliers.ofInstance(applicationName), Functions.constant(null), new NoOpThreadLocalContextManager<>(), options);
110     }
111 
112     public ApacheAsyncHttpClient(final Supplier<String> applicationName,
113                                  final Function<Object, Void> eventConsumer,
114                                  final ThreadLocalContextManager<C> threadLocalContextManager,
115                                  final HttpClientOptions options) {
116         this.eventConsumer = checkNotNull(eventConsumer);
117         this.applicationName = checkNotNull(applicationName);
118         this.threadLocalContextManager = checkNotNull(threadLocalContextManager);
119         this.httpClientOptions = checkNotNull(options);
120 
121         try {
122             final IOReactorConfig reactorConfig = IOReactorConfig.custom()
123                     .setIoThreadCount(options.getIoThreadCount())
124                     .setSelectInterval(options.getIoSelectInterval())
125                     .setInterestOpQueued(true)
126                     .build();
127 
128             final DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(reactorConfig);
129             ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
130                 @Override
131                 public boolean handle(final IOException e) {
132                     log.error("IO exception in reactor ", e);
133                     return false;
134                 }
135 
136                 @Override
137                 public boolean handle(final RuntimeException e) {
138                     log.error("Fatal runtime error", e);
139                     return false;
140                 }
141             });
142 
143             List<String> bannedAddresses = options.getBlacklistedAddresses();
144             HostResolver resolver;
145             if (bannedAddresses.isEmpty()) {
146                 resolver = DefaultHostResolver.INSTANCE;
147             } else {
148                 resolver = new BannedHostResolver(bannedAddresses);
149             }
150 
151             final PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(
152                     ioReactor,
153                     ManagedNHttpClientConnectionFactory.INSTANCE,
154                     getRegistry(options),
155                     DefaultSchemePortResolver.INSTANCE,
156                     resolver::resolve,
157                     options.getConnectionPoolTimeToLive(),
158                     TimeUnit.MILLISECONDS) {
159                 @SuppressWarnings("MethodDoesntCallSuperMethod")
160                 @Override
161                 protected void finalize() {
162                     // prevent the PoolingClientAsyncConnectionManager from logging - this causes exceptions due to
163                     // the ClassLoader probably having been removed when the plugin shuts down.  Added a
164                     // PluginEventListener to make sure the shutdown method is called while the plugin classloader
165                     // is still active.
166                 }
167             };
168 
169             final RequestConfig requestConfig = RequestConfig.custom()
170                     .setConnectTimeout((int) options.getConnectionTimeout())
171                     .setConnectionRequestTimeout((int) options.getLeaseTimeout())
172                     .setCookieSpec(options.getIgnoreCookies() ? CookieSpecs.IGNORE_COOKIES : CookieSpecs.DEFAULT)
173                     .setSocketTimeout((int) options.getSocketTimeout())
174                     .build();
175 
176             connectionManager.setDefaultMaxPerRoute(options.getMaxConnectionsPerHost());
177             connectionManager.setMaxTotal(options.getMaxTotalConnections());
178 
179             final HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom()
180                     .setThreadFactory(ThreadFactories.namedThreadFactory(options.getThreadPrefix() + "-io", ThreadFactories.Type.DAEMON))
181                     .setDefaultIOReactorConfig(reactorConfig)
182                     .setConnectionManager(connectionManager)
183                     .setRedirectStrategy(new RedirectStrategy())
184                     .setUserAgent(getUserAgent(options))
185                     .setDefaultRequestConfig(requestConfig);
186 
187             // set up a route planner if there is proxy configuration
188             ProxyConfigFactory.getProxyConfig(options).forEach(proxyConfig -> {
189                 // don't be fooled by its name. If SystemDefaultRoutePlanner is passed a proxy selector it will use that
190                 // instead of creating the default one that reads system properties
191                 clientBuilder.setRoutePlanner(new SystemDefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE, proxyConfig.toProxySelector()));
192 
193                 ProxyCredentialsProvider.build(options).forEach(credsProvider -> {
194                     clientBuilder.setProxyAuthenticationStrategy(ProxyAuthenticationStrategy.INSTANCE);
195                     clientBuilder.setDefaultCredentialsProvider(credsProvider);
196                 });
197             });
198 
199             this.nonCachingHttpClient = new BoundedHttpAsyncClient(clientBuilder.build(),
200                     Ints.saturatedCast(options.getMaxEntitySize()));
201 
202             final CacheConfig cacheConfig = CacheConfig.custom()
203                     .setMaxCacheEntries(options.getMaxCacheEntries())
204                     .setSharedCache(false)
205                     .setNeverCacheHTTP10ResponsesWithQueryString(false)
206                     .setMaxObjectSize(options.getMaxCacheObjectSize())
207                     .build();
208 
209             this.httpCacheStorage = new LoggingHttpCacheStorage(new FlushableHttpCacheStorageImpl(cacheConfig));
210             this.httpClient = new CachingHttpAsyncClient(nonCachingHttpClient, httpCacheStorage, cacheConfig);
211             this.callbackExecutor = options.getCallbackExecutor();
212 
213             nonCachingHttpClient.start();
214         } catch (IOReactorException e) {
215             throw new RuntimeException("Reactor " + options.getThreadPrefix() + "not set up correctly", e);
216         }
217     }
218 
219     private Registry<SchemeIOSessionStrategy> getRegistry(final HttpClientOptions options) {
220         try {
221             final TrustSelfSignedStrategy strategy = options.trustSelfSignedCertificates() ?
222                     new TrustSelfSignedStrategy() : null;
223 
224             final SSLContext sslContext = new SSLContextBuilder()
225                     .useTLS()
226                     .loadTrustMaterial(null, strategy)
227                     .build();
228 
229             final SSLIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(
230                     sslContext,
231                     split(System.getProperty("https.protocols")),
232                     split(System.getProperty("https.cipherSuites")),
233                     options.trustSelfSignedCertificates() ?
234                             getSelfSignedVerifier() : SSLIOSessionStrategy.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
235 
236             return RegistryBuilder.<SchemeIOSessionStrategy>create()
237                     .register("http", NoopIOSessionStrategy.INSTANCE)
238                     .register("https", sslioSessionStrategy)
239                     .build();
240         } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
241             return getFallbackRegistry(e);
242         }
243     }
244 
245     private X509HostnameVerifier getSelfSignedVerifier() {
246         return new X509HostnameVerifier() {
247             @Override
248             public void verify(final String host, final SSLSocket ssl) {
249                 log.debug("Verification for certificates from {0} disabled", host);
250             }
251 
252             @Override
253             public void verify(final String host, final X509Certificate cert) {
254                 log.debug("Verification for certificates from {0} disabled", host);
255             }
256 
257             @Override
258             public void verify(final String host, final String[] cns, final String[] subjectAlts) {
259                 log.debug("Verification for certificates from {0} disabled", host);
260             }
261 
262             @Override
263             public boolean verify(final String host, final SSLSession sslSession) {
264                 log.debug("Verification for certificates from {0} disabled", host);
265                 return true;
266             }
267         };
268     }
269 
270     private Registry<SchemeIOSessionStrategy> getFallbackRegistry(final GeneralSecurityException e) {
271         log.error("Error when creating scheme session strategy registry", e);
272         return RegistryBuilder.<SchemeIOSessionStrategy>create()
273                 .register("http", NoopIOSessionStrategy.INSTANCE)
274                 .register("https", SSLIOSessionStrategy.getDefaultStrategy())
275                 .build();
276     }
277 
278     private String getUserAgent(HttpClientOptions options) {
279         return format("Atlassian HttpClient %s / %s / %s",
280                 httpClientVersion.get(),
281                 applicationName.get(),
282                 options.getUserAgent());
283     }
284 
285     @Override
286     public final ResponsePromise execute(final Request request) {
287         try {
288             return doExecute(request);
289         } catch (Throwable t) {
290             return ResponsePromises.toResponsePromise(rejected(t, Response.class));
291         }
292     }
293 
294     private ResponsePromise doExecute(final Request request) {
295         httpClientOptions.getRequestPreparer().apply(request);
296 
297         final long start = System.currentTimeMillis();
298         final HttpRequestBase op;
299         final String uri = request.getUri().toString();
300         final Request.Method method = request.getMethod();
301         switch (method) {
302             case GET:
303                 op = new HttpGet(uri);
304                 break;
305             case POST:
306                 op = new HttpPost(uri);
307                 break;
308             case PUT:
309                 op = new HttpPut(uri);
310                 break;
311             case DELETE:
312                 op = new HttpDelete(uri);
313                 break;
314             case OPTIONS:
315                 op = new HttpOptions(uri);
316                 break;
317             case HEAD:
318                 op = new HttpHead(uri);
319                 break;
320             case TRACE:
321                 op = new HttpTrace(uri);
322                 break;
323             default:
324                 throw new UnsupportedOperationException(method.toString());
325         }
326         if (request.hasEntity()) {
327             new RequestEntityEffect(request).apply(op);
328         }
329 
330         for (Map.Entry<String, String> entry : request.getHeaders().entrySet()) {
331             op.setHeader(entry.getKey(), entry.getValue());
332         }
333 
334         final PromiseHttpAsyncClient asyncClient = getPromiseHttpAsyncClient(request);
335         return ResponsePromises.toResponsePromise(asyncClient.execute(op, new BasicHttpContext()).fold(
336                 ex -> {
337                     final long requestDuration = System.currentTimeMillis() - start;
338                     Throwable exception = maybeTranslate(ex);
339                     publishEvent(request, requestDuration, exception);
340                     throw Throwables.propagate(exception);
341                 },
342                 httpResponse -> {
343                     final long requestDuration = System.currentTimeMillis() - start;
344                     publishEvent(request, requestDuration, httpResponse.getStatusLine().getStatusCode());
345                     try {
346                         return translate(httpResponse);
347                     } catch (IOException e) {
348                         throw Throwables.propagate(e);
349                     }
350                 }
351         ));
352     }
353 
354     private void publishEvent(Request request, long requestDuration, int statusCode) {
355         if (HttpStatus.OK.code <= statusCode && statusCode < HttpStatus.MULTIPLE_CHOICES.code) {
356             eventConsumer.apply(new HttpRequestCompletedEvent(
357                     request.getUri().toString(),
358                     request.getMethod().name(),
359                     statusCode,
360                     requestDuration,
361                     request.getAttributes()));
362         } else {
363             eventConsumer.apply(new HttpRequestFailedEvent(
364                     request.getUri().toString(),
365                     request.getMethod().name(),
366                     statusCode,
367                     requestDuration,
368                     request.getAttributes()));
369         }
370     }
371 
372     private void publishEvent(Request request, long requestDuration, Throwable ex) {
373         eventConsumer.apply(new HttpRequestFailedEvent(
374                 request.getUri().toString(),
375                 request.getMethod().name(),
376                 ex.toString(),
377                 requestDuration,
378                 request.getAttributes()));
379     }
380 
381     private PromiseHttpAsyncClient getPromiseHttpAsyncClient(Request request) {
382         return new SettableFuturePromiseHttpPromiseAsyncClient<>(
383                 request.isCacheDisabled() ? nonCachingHttpClient : httpClient,
384                 threadLocalContextManager, callbackExecutor);
385     }
386 
387     private Throwable maybeTranslate(Throwable ex) {
388         if (ex instanceof EntityTooLargeException) {
389             EntityTooLargeException tooLarge = (EntityTooLargeException) ex;
390             try {
391                 // don't include the cause to ensure that the HttpResponse is released
392                 return new ResponseTooLargeException(translate(tooLarge.getResponse()), ex.getMessage());
393             } catch (IOException e) {
394                 // could not translate, just return the original exception
395             }
396         }
397         return ex;
398     }
399 
400     private Response translate(HttpResponse httpResponse) throws IOException {
401         StatusLine status = httpResponse.getStatusLine();
402         Response.Builder responseBuilder = DefaultResponse.builder()
403                 .setMaxEntitySize(httpClientOptions.getMaxEntitySize())
404                 .setStatusCode(status.getStatusCode())
405                 .setStatusText(status.getReasonPhrase());
406 
407         Header[] httpHeaders = httpResponse.getAllHeaders();
408         for (Header httpHeader : httpHeaders) {
409             responseBuilder.setHeader(httpHeader.getName(), httpHeader.getValue());
410         }
411         final HttpEntity entity = httpResponse.getEntity();
412         if (entity != null) {
413             responseBuilder.setEntityStream(entity.getContent());
414         }
415         return responseBuilder.build();
416     }
417 
418     @Override
419     public void destroy() throws Exception {
420         callbackExecutor.shutdown();
421         nonCachingHttpClient.close();
422     }
423 
424     @Override
425     public void flushCacheByUriPattern(Pattern urlPattern) {
426         httpCacheStorage.flushByUriPattern(urlPattern);
427     }
428 
429     private static final class NoOpThreadLocalContextManager<C> implements ThreadLocalContextManager<C> {
430         @Override
431         public C getThreadLocalContext() {
432             return null;
433         }
434 
435         @Override
436         public void setThreadLocalContext(C context) {
437         }
438 
439         @Override
440         public void clearThreadLocalContext() {
441         }
442     }
443 
444     private static final class DefaultApplicationNameSupplier implements Supplier<String> {
445         private final ApplicationProperties applicationProperties;
446 
447         public DefaultApplicationNameSupplier(ApplicationProperties applicationProperties) {
448             this.applicationProperties = checkNotNull(applicationProperties);
449         }
450 
451         @Override
452         public String get() {
453             return format("%s-%s (%s)",
454                     applicationProperties.getDisplayName(),
455                     applicationProperties.getVersion(),
456                     applicationProperties.getBuildNumber());
457         }
458     }
459 
460     private static class EventConsumerFunction implements Function<Object, Void> {
461         private final EventPublisher eventPublisher;
462 
463         public EventConsumerFunction(EventPublisher eventPublisher) {
464             this.eventPublisher = eventPublisher;
465         }
466 
467         @Override
468         public Void apply(Object event) {
469             eventPublisher.publish(event);
470             return null;
471         }
472     }
473 
474     private static String[] split(final String s) {
475         if (TextUtils.isBlank(s)) {
476             return null;
477         }
478         return s.split(" *, *");
479     }
480 }