View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import com.amazonaws.services.sqs.AmazonSQS;
4   import com.amazonaws.services.sqs.model.InvalidIdFormatException;
5   import com.amazonaws.services.sqs.model.MessageNotInflightException;
6   import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
7   import com.atlassian.messagequeue.MessageAcknowledgementException;
8   import com.atlassian.messagequeue.MessageRunnerServiceException;
9   import com.atlassian.messagequeue.registry.MessageContext;
10  import org.slf4j.Logger;
11  import org.slf4j.LoggerFactory;
12  
13  import javax.annotation.Nonnull;
14  import javax.annotation.Nullable;
15  import java.time.Duration;
16  import java.util.Optional;
17  import java.util.concurrent.Future;
18  import java.util.concurrent.TimeUnit;
19  import java.util.concurrent.atomic.AtomicBoolean;
20  
21  import static java.util.Objects.requireNonNull;
22  
23  /**
24   * SQS message context.
25   */
26  class SQSMessageContext implements MessageContext {
27      static final long TWELVE_HOURS_AS_SECONDS = Duration.ofHours(12).getSeconds();
28  
29      private static final Logger log = LoggerFactory.getLogger(SQSMessageContext.class);
30  
31      private final AmazonSQS amazonSQSClient;
32      private final String messageId;
33      @Nullable
34      private final String payload;
35      private final String queueUrl;
36      private final String receiptHandle;
37      private final AtomicBoolean acknowledged = new AtomicBoolean(false);
38      private final AtomicBoolean shuttingDown;
39      private boolean autoAcknowledgeMessage = true;
40      private final Future<?> visibilityTimeoutExtensionFuture;
41  
42      SQSMessageContext(@Nonnull String messageId, @Nullable String payload, AmazonSQS amazonSQSClient,
43                               String queueUrl, String receiptHandle, AtomicBoolean shuttingDown, Future<?> visibilityTimeoutExtensionFuture) {
44          this.messageId = requireNonNull(messageId, "messageId");
45          this.payload = payload;
46          this.amazonSQSClient = requireNonNull(amazonSQSClient, "amazonSQSClient");
47          this.queueUrl = requireNonNull(queueUrl, "queueUrl");
48          this.receiptHandle = requireNonNull(receiptHandle, "receiptHandle");
49          this.shuttingDown = requireNonNull(shuttingDown, "shuttingDown");
50          this.visibilityTimeoutExtensionFuture = requireNonNull(visibilityTimeoutExtensionFuture, "visibilityTimeoutExtensionFuture");
51      }
52  
53      @Override
54      public Optional<String> getMessageId() {
55          return Optional.of(messageId);
56      }
57  
58      @Override
59      public Optional<String> getPayload() {
60          return Optional.ofNullable(payload);
61      }
62  
63      @Override
64      public void acknowledge() {
65          if (acknowledged.compareAndSet(false, true)) {
66              try {
67                  amazonSQSClient.deleteMessage(queueUrl, receiptHandle);
68  
69                  if (log.isInfoEnabled()) {
70                      log.info("Message acknowledged (messageId: {}, receiptHandle: {})", messageId, receiptHandle);
71                  }
72              } catch (ReceiptHandleIsInvalidException | InvalidIdFormatException e) {
73                  throw new MessageAcknowledgementException("Error acknowledging message: receipt handle invalid " + receiptHandle, e);
74              }
75  
76              if (!visibilityTimeoutExtensionFuture.isDone()) {
77                  final boolean cancelled = visibilityTimeoutExtensionFuture.cancel(true);
78                  log.info("Cancelled extension of visibility timeout (in response to message acknowledgement). Cancellation status: {}", cancelled);
79              }
80          }
81      }
82  
83      @Override
84      public boolean isCancellationRequested() {
85          return shuttingDown.get();
86      }
87  
88      synchronized void changeMessageProcessingTimeout(long timeout, TimeUnit timeUnit) {
89          final long timeoutAsSeconds = timeUnit.toSeconds(timeout);
90          if (timeoutAsSeconds > TWELVE_HOURS_AS_SECONDS) {
91              throw new IllegalArgumentException("Timeout too high. Ensure timeout is, in seconds, between 0 to 43200 (12 hours)");
92          }
93  
94          try {
95              amazonSQSClient.changeMessageVisibility(queueUrl, receiptHandle, (int) timeoutAsSeconds); // safe cast since legal values must be less than 43200
96          } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
97              throw new MessageRunnerServiceException("Error changing message processing timeout: ", e);
98          }
99      }
100 
101     @Override
102     public boolean shouldAutoAcknowledgeMessage() {
103         return autoAcknowledgeMessage;
104     }
105 
106     @Override
107     public void cancelAutoAcknowledgementOfMessage() {
108         autoAcknowledgeMessage = false;
109     }
110 }