View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import com.amazonaws.services.sqs.AmazonSQS;
4   import com.amazonaws.services.sqs.model.InvalidMessageContentsException;
5   import com.amazonaws.services.sqs.model.SendMessageRequest;
6   import com.amazonaws.services.sqs.model.SendMessageResult;
7   import com.amazonaws.services.sqs.model.UnsupportedOperationException;
8   import com.atlassian.messagequeue.Message;
9   import com.atlassian.messagequeue.MessageInformationService;
10  import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
11  import com.atlassian.messagequeue.MessageRunnerService;
12  import com.atlassian.messagequeue.MessageRunnerServiceException;
13  import com.atlassian.messagequeue.TenantDataIdSupplier;
14  import com.atlassian.messagequeue.internal.core.DefaultThreadFactory;
15  import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
16  import com.atlassian.messagequeue.internal.core.MessageValidatorRegistryHelper;
17  import com.atlassian.messagequeue.internal.core.NestedMessageConsumer;
18  import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
19  import com.atlassian.tenant.api.TenantContextProvider;
20  import com.atlassian.tenant.impl.TenantIdSetter;
21  import org.slf4j.Logger;
22  import org.slf4j.LoggerFactory;
23  import org.slf4j.MDC;
24  
25  import javax.annotation.Nonnull;
26  import javax.annotation.PreDestroy;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  import static com.atlassian.messagequeue.internal.core.NestedMessageConsumer.MDC_MESSAGE_RUNNER_KEY;
39  import static java.util.Objects.requireNonNull;
40  
41  /**
42   * SQS message runner service that produces messages onto an SQS queue and manages a pool of threads that consume from said queue.
43   * <p>
44   * To properly initialise an instance of this service, a client must call {@link #initialiseMessageConsumers()} after
45   * constructing an instance (or else ensure the dependency injection framework will automatically invoke methods
46   * annotated with @PostConstruct).
47   * <p>
48   * To properly dispose of an instance of this service, a client must call {@link #shutdown()} (or else ensure the
49   * dependency injection framework will automatically invoke methods annotated with @PreDestroy).
50   */
51  public class SQSMessageRunnerService implements MessageRunnerService {
52      static final String SENT_TIMESTAMP = "SentTimestamp";
53      static final String APPROXIMATE_RECEIVE_COUNT = "ApproximateReceiveCount";
54      static final long AWAIT_TERMINATION_TIMEOUT_SECONDS = Long.getLong("amq.sqs.await.termination.timeout", TimeUnit.SECONDS.convert(1, TimeUnit.HOURS));
55      static final String MDC_MESSAGE_ID = "amq.messageId";
56  
57      private static final Logger log = LoggerFactory.getLogger(SQSMessageRunnerService.class);
58  
59      private static final int DEFAULT_RECEIVE_WAIT_TIME_SECONDS = Integer.getInteger("amq.sqs.receive.wait.time", 20);
60  
61      private final AmazonSQS amazonSQSClient;
62      private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
63      private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
64      private final int receiveWaitTimeSeconds;
65      private final Map<String, ExecutorService> queueNameToConsumerThreadPools = new HashMap<>();
66      private final AtomicBoolean initialised = new AtomicBoolean(false);
67      private final MessageInformationService messageInformationService;
68      private final NestedMessageSerializer nestedMessageSerializer;
69      private final NestedMessageConsumer nestedMessageConsumer;
70      private final SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager;
71      private final SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper;
72      private final Set<SQSConsumerQueueConfig> consumerQueueConfigs;
73  
74      private SQSMessageRunnerService(Builder builder) {
75          if (builder.messageValidatorRegistryHelper == null) {
76              builder.messageValidatorRegistryHelper = Collections::emptyList;
77          }
78  
79          if (builder.tenantDataIdSupplier == null) {
80              builder.tenantDataIdSupplier = () -> "";
81          }
82  
83          if (builder.receiveWaitTimeSeconds == null) {
84              builder.receiveWaitTimeSeconds = DEFAULT_RECEIVE_WAIT_TIME_SECONDS;
85          }
86  
87          if (builder.sqsMessageVisibilityTimeoutManager == null) {
88              builder.sqsMessageVisibilityTimeoutManager = new SQSMessageVisibilityTimeoutManager(
89                      requireNonNull(builder.amazonSQSClient));
90          }
91  
92          this.amazonSQSClient = requireNonNull(builder.amazonSQSClient);
93          this.messageRunnerRegistryHelper = requireNonNull(builder.messageRunnerRegistryHelper);
94          this.receiveWaitTimeSeconds = builder.receiveWaitTimeSeconds;
95          this.nestedMessageSerializer = requireNonNull(builder.nestedMessageSerializer);
96          this.nestedMessageConsumer = new NestedMessageConsumer(messageRunnerRegistryHelper, requireNonNull(builder.tenantIdSetter), requireNonNull(builder.messageValidatorRegistryHelper));
97          this.messageRunnerKeyToProducerMapper = requireNonNull(builder.messageRunnerKeyToProducerMapper);
98          this.consumerQueueConfigs = requireNonNull(builder.consumerQueueConfigs);
99  
100         // Create separate thread pools for configured queues
101         consumerQueueConfigs.forEach(queueConfig -> {
102             // Create ThreadPools for the queue
103             final ThreadPoolExecutor queueExecutor = new ThreadPoolExecutor(queueConfig.getCorePoolSize(), queueConfig.getMaxPoolSize(), 0L, TimeUnit.MILLISECONDS,
104                     new LinkedBlockingQueue<>(),
105                     new DefaultThreadFactory("sqs-consumer-thread-" + queueConfig.getQueueName() + "-%d", (t, throwable) -> {
106                         log.warn("SQS consumer thread '{}' died due to an exception.", t.getName(), throwable);
107                     }));
108             queueNameToConsumerThreadPools.put(queueConfig.getQueueName(), queueExecutor);
109         });
110 
111         if (builder.messageInformationService == null) {
112             builder.messageInformationService = new SQSMessageInformationService(messageRunnerKeyToProducerMapper,
113                     requireNonNull(builder.tenantContextProvider, "Either provide a TenantContextProvider, or a MessageInformationService"),
114                     requireNonNull(builder.nestedMessageSerializer, "Either provide a NestedMessageSerializer, or a MessageInformationService"),
115                     requireNonNull(builder.tenantDataIdSupplier));
116         }
117         this.messageInformationService = requireNonNull(builder.messageInformationService);
118 
119         this.sqsMessageVisibilityTimeoutManager = requireNonNull(builder.sqsMessageVisibilityTimeoutManager, "sqsMessageVisibilityTimeoutManager");
120     }
121 
122     public synchronized void initialiseMessageConsumers() {
123         if (initialised.get()) {
124             log.info("SQS consumers have already been initialised. Skipping it.");
125             return;
126         }
127 
128         log.info("Initialising SQS message consumers");
129 
130         consumerQueueConfigs.forEach(queueConfig -> {
131             log.info("Constructing queue consumer thread pool (queueName: {}, concurrentConsumers:  {}, queueUrl: {}, receiveWaitTimeSeconds: {})",
132                     queueConfig.getQueueName(), queueConfig.getCorePoolSize(), queueConfig.getQueueUrl(), receiveWaitTimeSeconds);
133 
134             // Spawn message consumer threads
135             final ExecutorService executorService = queueNameToConsumerThreadPools.get(queueConfig.getQueueName());
136             for (int i = 0; i < queueConfig.getCorePoolSize(); i++) {
137                 executorService.execute(new SQSMessageConsumer(queueConfig, receiveWaitTimeSeconds, shuttingDown,
138                         amazonSQSClient, nestedMessageSerializer, nestedMessageConsumer, sqsMessageVisibilityTimeoutManager));
139             }
140         });
141 
142         initialised.compareAndSet(false, true);
143     }
144 
145     @PreDestroy
146     public void shutdown() {
147         log.info("Shutting down {}", this.getClass().getSimpleName());
148 
149         shuttingDown.compareAndSet(false, true);
150 
151         // Tell all executors to shutdown
152         queueNameToConsumerThreadPools.forEach((queueName, executor) -> executor.shutdown());
153         // Wait for all executors to shutdown
154         queueNameToConsumerThreadPools.forEach((queueName, executor) -> {
155             try {
156                 if (!executor.awaitTermination(AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
157                     log.warn("Pool for queue {} did not terminate in {} seconds", queueName, AWAIT_TERMINATION_TIMEOUT_SECONDS);
158                 }
159             } catch (InterruptedException ie) {
160                 Thread.currentThread().interrupt();
161             }
162         });
163 
164         sqsMessageVisibilityTimeoutManager.shutdown();
165     }
166 
167     @Override
168     public void addMessage(@Nonnull Message message) {
169         if (!messageRunnerRegistryHelper.getMessageRunner(message.getRunnerKey()).isPresent()) {
170             throw new MessageRunnerNotRegisteredException(message.getRunnerKey());
171         }
172 
173         final SendMessageResult sendMessageResult;
174         final String queueUrl = messageInformationService.getQueueUrl(message.getRunnerKey());
175         try {
176             sendMessageResult = amazonSQSClient.sendMessage(new SendMessageRequest(queueUrl,
177                     messageInformationService.toPayload(message)));
178         } catch (InvalidMessageContentsException e) {
179             throw new MessageRunnerServiceException(e);
180         } catch (UnsupportedOperationException e) {
181             throw new java.lang.UnsupportedOperationException(e);
182         }
183 
184         if (log.isInfoEnabled()) {
185             MDC.put(MDC_MESSAGE_ID, sendMessageResult.getMessageId());
186             MDC.put(MDC_MESSAGE_RUNNER_KEY, message.getRunnerKey().toString());
187             try {
188                 log.info("Message produced to {} (messageId: {}, messageRunnerKey: {})",
189                         queueUrl, sendMessageResult.getMessageId(), message.getRunnerKey().toString());
190             } finally {
191                 MDC.remove(MDC_MESSAGE_ID);
192                 MDC.remove(MDC_MESSAGE_RUNNER_KEY);
193             }
194         }
195     }
196 
197     /**
198      * Create a new SQSMessageRunnerService builder
199      *
200      * @param messageRunnerKeyToProducerMapper mapping from {@link com.atlassian.messagequeue.MessageRunnerKey}s to {@link SQSProducerQueueConfig}s
201      * @param consumerQueueConfigs             all {@link SQSConsumerQueueConfig}s should be listened to.
202      * @since 2.1.0
203      */
204     public static Builder newBuilder(SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper, Set<SQSConsumerQueueConfig> consumerQueueConfigs) {
205         return new Builder(messageRunnerKeyToProducerMapper, consumerQueueConfigs);
206     }
207 
208     /**
209      * A builder to help make the SQSMessageRunnerService.
210      * <p>
211      * The {@link TenantDataIdSupplier} and {@link MessageValidatorRegistryHelper} are optional.
212      * You may choose to provide a {@link MessageInformationService} instead of a {@link TenantContextProvider}, and a {@link NestedMessageSerializer}.
213      * You may optionally provide a {@link SQSMessageVisibilityTimeoutManager}
214      */
215     public static class Builder {
216         private AmazonSQS amazonSQSClient;
217         private MessageRunnerRegistryHelper messageRunnerRegistryHelper;
218         private TenantIdSetter tenantIdSetter;
219         private MessageInformationService messageInformationService;
220         private NestedMessageSerializer nestedMessageSerializer;
221         private MessageValidatorRegistryHelper messageValidatorRegistryHelper;
222         private Integer receiveWaitTimeSeconds;
223         private SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager;
224         private TenantContextProvider<?> tenantContextProvider;
225         private TenantDataIdSupplier tenantDataIdSupplier;
226         private SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper;
227         private Set<SQSConsumerQueueConfig> consumerQueueConfigs;
228 
229         Builder(SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper, Set<SQSConsumerQueueConfig> consumerQueueConfigs) {
230             this.messageRunnerKeyToProducerMapper = requireNonNull(messageRunnerKeyToProducerMapper);
231             this.consumerQueueConfigs = new HashSet<>(requireNonNull(consumerQueueConfigs));
232         }
233 
234         public Builder withAmazonSQSClient(AmazonSQS amazonSQSClient) {
235             this.amazonSQSClient = amazonSQSClient;
236             return this;
237         }
238 
239         public Builder withMessageRunnerRegistryHelper(MessageRunnerRegistryHelper messageRunnerRegistryHelper) {
240             this.messageRunnerRegistryHelper = messageRunnerRegistryHelper;
241             return this;
242         }
243 
244         public Builder withTenantIdSetter(TenantIdSetter tenantIdSetter) {
245             this.tenantIdSetter = tenantIdSetter;
246             return this;
247         }
248 
249         public Builder withMessageInformationService(MessageInformationService messageInformationService) {
250             this.messageInformationService = messageInformationService;
251             return this;
252         }
253 
254         public Builder withNestedMessageSerializer(NestedMessageSerializer nestedMessageSerializer) {
255             this.nestedMessageSerializer = nestedMessageSerializer;
256             return this;
257         }
258 
259         public Builder withMessageValidatorRegistryHelper(MessageValidatorRegistryHelper messageValidatorRegistryHelper) {
260             this.messageValidatorRegistryHelper = messageValidatorRegistryHelper;
261             return this;
262         }
263 
264         /* For testing only */
265         Builder withReceiveWaitTimeSeconds(int receiveWaitTimeSeconds) {
266             this.receiveWaitTimeSeconds = receiveWaitTimeSeconds;
267             return this;
268         }
269 
270         public Builder withSqsMessageVisibilityTimeoutManager(SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager) {
271             this.sqsMessageVisibilityTimeoutManager = sqsMessageVisibilityTimeoutManager;
272             return this;
273         }
274 
275         public Builder withTenantContextProvider(TenantContextProvider<?> tenantContextProvider) {
276             this.tenantContextProvider = tenantContextProvider;
277             return this;
278         }
279 
280         public Builder withTenantDataIdSupplier(TenantDataIdSupplier tenantDataIdSupplier) {
281             this.tenantDataIdSupplier = tenantDataIdSupplier;
282             return this;
283         }
284 
285         public SQSMessageRunnerService build() {
286             return new SQSMessageRunnerService(this);
287         }
288     }
289 
290 
291 }