1 package com.atlassian.messagequeue.internal.sqs;
2
3 import com.amazonaws.services.sqs.AmazonSQS;
4 import com.amazonaws.services.sqs.model.InvalidIdFormatException;
5 import com.amazonaws.services.sqs.model.MessageNotInflightException;
6 import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
7 import com.atlassian.messagequeue.MessageAcknowledgementException;
8 import com.atlassian.messagequeue.MessageRunnerServiceException;
9 import com.atlassian.messagequeue.registry.MessageContext;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import javax.annotation.Nonnull;
14 import javax.annotation.Nullable;
15 import java.time.Duration;
16 import java.util.Optional;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import static java.util.Objects.requireNonNull;
22
23
24
25
26 class SQSMessageContext implements MessageContext {
27 static final long TWELVE_HOURS_AS_SECONDS = Duration.ofHours(12).getSeconds();
28
29 private static final Logger log = LoggerFactory.getLogger(SQSMessageContext.class);
30
31 private final AmazonSQS amazonSQSClient;
32 private final String messageId;
33 @Nullable
34 private final String payload;
35 private final String queueUrl;
36 private final String receiptHandle;
37 private final AtomicBoolean acknowledged = new AtomicBoolean(false);
38 private final AtomicBoolean shuttingDown;
39 private boolean autoAcknowledgeMessage = true;
40 private final Future<?> visibilityTimeoutExtensionFuture;
41
42 SQSMessageContext(@Nonnull String messageId, @Nullable String payload, AmazonSQS amazonSQSClient,
43 String queueUrl, String receiptHandle, AtomicBoolean shuttingDown, Future<?> visibilityTimeoutExtensionFuture) {
44 this.messageId = requireNonNull(messageId, "messageId");
45 this.payload = payload;
46 this.amazonSQSClient = requireNonNull(amazonSQSClient, "amazonSQSClient");
47 this.queueUrl = requireNonNull(queueUrl, "queueUrl");
48 this.receiptHandle = requireNonNull(receiptHandle, "receiptHandle");
49 this.shuttingDown = requireNonNull(shuttingDown, "shuttingDown");
50 this.visibilityTimeoutExtensionFuture = requireNonNull(visibilityTimeoutExtensionFuture, "visibilityTimeoutExtensionFuture");
51 }
52
53 @Override
54 public Optional<String> getMessageId() {
55 return Optional.of(messageId);
56 }
57
58 @Override
59 public Optional<String> getPayload() {
60 return Optional.ofNullable(payload);
61 }
62
63 @Override
64 public void acknowledge() {
65 if (acknowledged.compareAndSet(false, true)) {
66 try {
67 amazonSQSClient.deleteMessage(queueUrl, receiptHandle);
68
69 if (log.isInfoEnabled()) {
70 log.info("Message acknowledged (messageId: {}, receiptHandle: {})", messageId, receiptHandle);
71 }
72 } catch (ReceiptHandleIsInvalidException | InvalidIdFormatException e) {
73 throw new MessageAcknowledgementException("Error acknowledging message: receipt handle invalid " + receiptHandle, e);
74 }
75
76 if (!visibilityTimeoutExtensionFuture.isDone()) {
77 final boolean cancelled = visibilityTimeoutExtensionFuture.cancel(true);
78 log.info("Cancelled extension of visibility timeout (in response to message acknowledgement). Cancellation status: {}", cancelled);
79 }
80 }
81 }
82
83 @Override
84 public boolean isCancellationRequested() {
85 return shuttingDown.get();
86 }
87
88 synchronized void changeMessageProcessingTimeout(long timeout, TimeUnit timeUnit) {
89 final long timeoutAsSeconds = timeUnit.toSeconds(timeout);
90 if (timeoutAsSeconds > TWELVE_HOURS_AS_SECONDS) {
91 throw new IllegalArgumentException("Timeout too high. Ensure timeout is, in seconds, between 0 to 43200 (12 hours)");
92 }
93
94 try {
95 amazonSQSClient.changeMessageVisibility(queueUrl, receiptHandle, (int) timeoutAsSeconds);
96 } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
97 throw new MessageRunnerServiceException("Error changing message processing timeout: ", e);
98 }
99 }
100
101 @Override
102 public boolean shouldAutoAcknowledgeMessage() {
103 return autoAcknowledgeMessage;
104 }
105
106 @Override
107 public void cancelAutoAcknowledgementOfMessage() {
108 autoAcknowledgeMessage = false;
109 }
110 }