1 package com.atlassian.messagequeue.internal.sqs;
2
3 import com.amazonaws.services.sqs.AmazonSQS;
4 import com.amazonaws.services.sqs.model.InvalidMessageContentsException;
5 import com.amazonaws.services.sqs.model.SendMessageRequest;
6 import com.amazonaws.services.sqs.model.SendMessageResult;
7 import com.amazonaws.services.sqs.model.UnsupportedOperationException;
8 import com.atlassian.messagequeue.Message;
9 import com.atlassian.messagequeue.MessageInformationService;
10 import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
11 import com.atlassian.messagequeue.MessageRunnerService;
12 import com.atlassian.messagequeue.MessageRunnerServiceException;
13 import com.atlassian.messagequeue.TenantDataIdSupplier;
14 import com.atlassian.messagequeue.internal.core.DefaultThreadFactory;
15 import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
16 import com.atlassian.messagequeue.internal.core.MessageValidatorRegistryHelper;
17 import com.atlassian.messagequeue.internal.core.NestedMessageConsumer;
18 import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
19 import com.atlassian.tenant.api.TenantContextProvider;
20 import com.atlassian.tenant.impl.TenantIdSetter;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 import org.slf4j.MDC;
24
25 import javax.annotation.Nonnull;
26 import javax.annotation.PreDestroy;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.LinkedBlockingQueue;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38 import static com.atlassian.messagequeue.internal.core.NestedMessageConsumer.MDC_MESSAGE_RUNNER_KEY;
39 import static java.util.Objects.requireNonNull;
40
41
42
43
44
45
46
47
48
49
50
51 public class SQSMessageRunnerService implements MessageRunnerService {
/** Attribute name {@code "SentTimestamp"}. Not referenced in this class; presumably the SQS system attribute read by the consumer side - confirm. */
static final String SENT_TIMESTAMP = "SentTimestamp";
/** Attribute name {@code "ApproximateReceiveCount"}. Not referenced in this class; presumably the SQS system attribute read by the consumer side - confirm. */
static final String APPROXIMATE_RECEIVE_COUNT = "ApproximateReceiveCount";
/**
 * Seconds that {@link #shutdown()} waits for each consumer thread pool to terminate. Read once from the
 * {@code amq.sqs.await.termination.timeout} system property; defaults to one hour expressed in seconds.
 */
static final long AWAIT_TERMINATION_TIMEOUT_SECONDS = Long.getLong("amq.sqs.await.termination.timeout", TimeUnit.SECONDS.convert(1, TimeUnit.HOURS));
/** MDC key under which {@link #addMessage(Message)} exposes the id of the message it has just sent while logging. */
static final String MDC_MESSAGE_ID = "amq.messageId";

private static final Logger log = LoggerFactory.getLogger(SQSMessageRunnerService.class);

/**
 * Receive wait time (seconds) used when the builder does not specify one. Read once from the
 * {@code amq.sqs.receive.wait.time} system property; defaults to 20.
 */
private static final int DEFAULT_RECEIVE_WAIT_TIME_SECONDS = Integer.getInteger("amq.sqs.receive.wait.time", 20);

private final AmazonSQS amazonSQSClient;
private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
// Flipped to true by shutdown(); the same instance is handed to every SQSMessageConsumer.
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
private final int receiveWaitTimeSeconds;
// One executor per configured consumer queue name; populated in the constructor.
private final Map<String, ExecutorService> queueNameToConsumerThreadPools = new HashMap<>();
// Set once initialiseMessageConsumers() has submitted the consumers, so repeated calls are skipped.
private final AtomicBoolean initialised = new AtomicBoolean(false);
private final MessageInformationService messageInformationService;
private final NestedMessageSerializer nestedMessageSerializer;
private final NestedMessageConsumer nestedMessageConsumer;
private final SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager;
private final SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper;
private final Set<SQSConsumerQueueConfig> consumerQueueConfigs;
73
/**
 * Creates the service from {@code builder}, substituting defaults for every optional setting.
 *
 * <p>Defaults are resolved into local variables and the builder itself is never written to. A builder that is
 * used for more than one {@link Builder#build()} call therefore yields services with independent default
 * collaborators, and later {@code with...} calls on the builder are honoured by subsequent builds.
 *
 * <p>Optional settings and their defaults:
 * <ul>
 *   <li>message validator registry helper: one that supplies an empty list;</li>
 *   <li>tenant data id supplier: one that supplies the empty string;</li>
 *   <li>receive wait time: {@code DEFAULT_RECEIVE_WAIT_TIME_SECONDS};</li>
 *   <li>message information service: a new {@code SQSMessageInformationService}, which additionally requires a
 *       {@code TenantContextProvider};</li>
 *   <li>visibility timeout manager: a new {@code SQSMessageVisibilityTimeoutManager} bound to the SQS client.</li>
 * </ul>
 *
 * <p>One thread pool per consumer queue is created here, but no consumer is submitted until
 * {@link #initialiseMessageConsumers()} is called.
 *
 * @param builder source of collaborators and settings
 * @throws NullPointerException if the SQS client, message runner registry helper, nested message serializer,
 *                              tenant id setter, key-to-producer mapper or consumer queue configs are missing,
 *                              or if neither a message information service nor a tenant context provider
 *                              was supplied
 */
private SQSMessageRunnerService(Builder builder) {
    this.amazonSQSClient = requireNonNull(builder.amazonSQSClient, "amazonSQSClient");
    this.messageRunnerRegistryHelper = requireNonNull(builder.messageRunnerRegistryHelper, "messageRunnerRegistryHelper");
    this.receiveWaitTimeSeconds = builder.receiveWaitTimeSeconds != null
            ? builder.receiveWaitTimeSeconds
            : DEFAULT_RECEIVE_WAIT_TIME_SECONDS;
    this.nestedMessageSerializer = requireNonNull(builder.nestedMessageSerializer, "nestedMessageSerializer");

    MessageValidatorRegistryHelper validatorRegistryHelper = builder.messageValidatorRegistryHelper;
    if (validatorRegistryHelper == null) {
        validatorRegistryHelper = Collections::emptyList;
    }
    this.nestedMessageConsumer = new NestedMessageConsumer(messageRunnerRegistryHelper,
            requireNonNull(builder.tenantIdSetter, "tenantIdSetter"), validatorRegistryHelper);

    this.messageRunnerKeyToProducerMapper = requireNonNull(builder.messageRunnerKeyToProducerMapper, "messageRunnerKeyToProducerMapper");
    this.consumerQueueConfigs = requireNonNull(builder.consumerQueueConfigs, "consumerQueueConfigs");

    for (SQSConsumerQueueConfig queueConfig : consumerQueueConfigs) {
        // NOTE: the work queue is unbounded, so per the ThreadPoolExecutor contract the pool never grows
        // beyond corePoolSize; maxPoolSize is passed through but has no effect on the thread count.
        final ThreadPoolExecutor queueExecutor = new ThreadPoolExecutor(queueConfig.getCorePoolSize(), queueConfig.getMaxPoolSize(), 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new DefaultThreadFactory("sqs-consumer-thread-" + queueConfig.getQueueName() + "-%d", (t, throwable) -> {
                    log.warn("SQS consumer thread '{}' died due to an exception.", t.getName(), throwable);
                }));
        queueNameToConsumerThreadPools.put(queueConfig.getQueueName(), queueExecutor);
    }

    MessageInformationService informationService = builder.messageInformationService;
    if (informationService == null) {
        TenantDataIdSupplier tenantDataIdSupplier = builder.tenantDataIdSupplier;
        if (tenantDataIdSupplier == null) {
            tenantDataIdSupplier = () -> "";
        }
        informationService = new SQSMessageInformationService(messageRunnerKeyToProducerMapper,
                requireNonNull(builder.tenantContextProvider, "Either provide a TenantContextProvider, or a MessageInformationService"),
                requireNonNull(builder.nestedMessageSerializer, "Either provide a NestedMessageSerializer, or a MessageInformationService"),
                tenantDataIdSupplier);
    }
    this.messageInformationService = informationService;

    // Created last: the manager exposes shutdown(), so it presumably owns resources. Building it only after
    // every required collaborator has been validated avoids leaving an orphaned manager behind when
    // construction fails.
    SQSMessageVisibilityTimeoutManager visibilityTimeoutManager = builder.sqsMessageVisibilityTimeoutManager;
    if (visibilityTimeoutManager == null) {
        visibilityTimeoutManager = new SQSMessageVisibilityTimeoutManager(amazonSQSClient);
    }
    this.sqsMessageVisibilityTimeoutManager = visibilityTimeoutManager;
}
121
122 public synchronized void initialiseMessageConsumers() {
123 if (initialised.get()) {
124 log.info("SQS consumers have already been initialised. Skipping it.");
125 return;
126 }
127
128 log.info("Initialising SQS message consumers");
129
130 consumerQueueConfigs.forEach(queueConfig -> {
131 log.info("Constructing queue consumer thread pool (queueName: {}, concurrentConsumers: {}, queueUrl: {}, receiveWaitTimeSeconds: {})",
132 queueConfig.getQueueName(), queueConfig.getCorePoolSize(), queueConfig.getQueueUrl(), receiveWaitTimeSeconds);
133
134
135 final ExecutorService executorService = queueNameToConsumerThreadPools.get(queueConfig.getQueueName());
136 for (int i = 0; i < queueConfig.getCorePoolSize(); i++) {
137 executorService.execute(new SQSMessageConsumer(queueConfig, receiveWaitTimeSeconds, shuttingDown,
138 amazonSQSClient, nestedMessageSerializer, nestedMessageConsumer, sqsMessageVisibilityTimeoutManager));
139 }
140 });
141
142 initialised.compareAndSet(false, true);
143 }
144
145 @PreDestroy
146 public void shutdown() {
147 log.info("Shutting down {}", this.getClass().getSimpleName());
148
149 shuttingDown.compareAndSet(false, true);
150
151
152 queueNameToConsumerThreadPools.forEach((queueName, executor) -> executor.shutdown());
153
154 queueNameToConsumerThreadPools.forEach((queueName, executor) -> {
155 try {
156 if (!executor.awaitTermination(AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
157 log.warn("Pool for queue {} did not terminate in {} seconds", queueName, AWAIT_TERMINATION_TIMEOUT_SECONDS);
158 }
159 } catch (InterruptedException ie) {
160 Thread.currentThread().interrupt();
161 }
162 });
163
164 sqsMessageVisibilityTimeoutManager.shutdown();
165 }
166
167 @Override
168 public void addMessage(@Nonnull Message message) {
169 if (!messageRunnerRegistryHelper.getMessageRunner(message.getRunnerKey()).isPresent()) {
170 throw new MessageRunnerNotRegisteredException(message.getRunnerKey());
171 }
172
173 final SendMessageResult sendMessageResult;
174 final String queueUrl = messageInformationService.getQueueUrl(message.getRunnerKey());
175 try {
176 sendMessageResult = amazonSQSClient.sendMessage(new SendMessageRequest(queueUrl,
177 messageInformationService.toPayload(message)));
178 } catch (InvalidMessageContentsException e) {
179 throw new MessageRunnerServiceException(e);
180 } catch (UnsupportedOperationException e) {
181 throw new java.lang.UnsupportedOperationException(e);
182 }
183
184 if (log.isInfoEnabled()) {
185 MDC.put(MDC_MESSAGE_ID, sendMessageResult.getMessageId());
186 MDC.put(MDC_MESSAGE_RUNNER_KEY, message.getRunnerKey().toString());
187 try {
188 log.info("Message produced to {} (messageId: {}, messageRunnerKey: {})",
189 queueUrl, sendMessageResult.getMessageId(), message.getRunnerKey().toString());
190 } finally {
191 MDC.remove(MDC_MESSAGE_ID);
192 MDC.remove(MDC_MESSAGE_RUNNER_KEY);
193 }
194 }
195 }
196
197
198
199
200
201
202
203
204 public static Builder newBuilder(SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper, Set<SQSConsumerQueueConfig> consumerQueueConfigs) {
205 return new Builder(messageRunnerKeyToProducerMapper, consumerQueueConfigs);
206 }
207
208
209
210
211
212
213
214
215 public static class Builder {
216 private AmazonSQS amazonSQSClient;
217 private MessageRunnerRegistryHelper messageRunnerRegistryHelper;
218 private TenantIdSetter tenantIdSetter;
219 private MessageInformationService messageInformationService;
220 private NestedMessageSerializer nestedMessageSerializer;
221 private MessageValidatorRegistryHelper messageValidatorRegistryHelper;
222 private Integer receiveWaitTimeSeconds;
223 private SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager;
224 private TenantContextProvider<?> tenantContextProvider;
225 private TenantDataIdSupplier tenantDataIdSupplier;
226 private SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper;
227 private Set<SQSConsumerQueueConfig> consumerQueueConfigs;
228
229 Builder(SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper, Set<SQSConsumerQueueConfig> consumerQueueConfigs) {
230 this.messageRunnerKeyToProducerMapper = requireNonNull(messageRunnerKeyToProducerMapper);
231 this.consumerQueueConfigs = new HashSet<>(requireNonNull(consumerQueueConfigs));
232 }
233
234 public Builder withAmazonSQSClient(AmazonSQS amazonSQSClient) {
235 this.amazonSQSClient = amazonSQSClient;
236 return this;
237 }
238
239 public Builder withMessageRunnerRegistryHelper(MessageRunnerRegistryHelper messageRunnerRegistryHelper) {
240 this.messageRunnerRegistryHelper = messageRunnerRegistryHelper;
241 return this;
242 }
243
244 public Builder withTenantIdSetter(TenantIdSetter tenantIdSetter) {
245 this.tenantIdSetter = tenantIdSetter;
246 return this;
247 }
248
249 public Builder withMessageInformationService(MessageInformationService messageInformationService) {
250 this.messageInformationService = messageInformationService;
251 return this;
252 }
253
254 public Builder withNestedMessageSerializer(NestedMessageSerializer nestedMessageSerializer) {
255 this.nestedMessageSerializer = nestedMessageSerializer;
256 return this;
257 }
258
259 public Builder withMessageValidatorRegistryHelper(MessageValidatorRegistryHelper messageValidatorRegistryHelper) {
260 this.messageValidatorRegistryHelper = messageValidatorRegistryHelper;
261 return this;
262 }
263
264
265 Builder withReceiveWaitTimeSeconds(int receiveWaitTimeSeconds) {
266 this.receiveWaitTimeSeconds = receiveWaitTimeSeconds;
267 return this;
268 }
269
270 public Builder withSqsMessageVisibilityTimeoutManager(SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager) {
271 this.sqsMessageVisibilityTimeoutManager = sqsMessageVisibilityTimeoutManager;
272 return this;
273 }
274
275 public Builder withTenantContextProvider(TenantContextProvider<?> tenantContextProvider) {
276 this.tenantContextProvider = tenantContextProvider;
277 return this;
278 }
279
280 public Builder withTenantDataIdSupplier(TenantDataIdSupplier tenantDataIdSupplier) {
281 this.tenantDataIdSupplier = tenantDataIdSupplier;
282 return this;
283 }
284
285 public SQSMessageRunnerService build() {
286 return new SQSMessageRunnerService(this);
287 }
288 }
289
290
291 }