View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import com.amazonaws.services.sqs.AmazonSQS;
4   import com.amazonaws.services.sqs.model.MessageNotInflightException;
5   import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
6   import com.atlassian.messagequeue.internal.core.DefaultThreadFactory;
7   import org.slf4j.Logger;
8   import org.slf4j.LoggerFactory;
9   
10  import java.util.concurrent.Future;
11  import java.util.concurrent.ScheduledThreadPoolExecutor;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.atomic.AtomicInteger;
14  
15  import static com.atlassian.messagequeue.internal.sqs.SQSMessageRunnerService.AWAIT_TERMINATION_TIMEOUT_SECONDS;
16  import static java.util.Objects.requireNonNull;
17  
18  /**
19   * Responsible for automatically extending the visibility timeout for the receipt of a message.
20   *
21   * <p>Algorithm:
22   * <pre>
23   *
24   *                       extend
25   *               +-------------------+ (sqs-visibility-timeout-extension-thread)
26   *
27   * +--receive----X-----+-------X-----+-------X-----+----> (sqs-consumer-thread)
28   *
29   *                             +-------------------+ (sqs-visibility-timeout-extension-thread)
30   *                                     extend
31   *
32   *
33   * |<-----t----->|<-----t----->|<-----t----->|<-----t----->|
34   *               |<-b->|       |<-b->|       |<-b->|
35   *
36   *
37   *
38   *    t = period (time between successive attempts to extend the visibility timeout)
39   *    b = buffer (time for executing the task that extends the visibility timeout)
40   * </pre>
41   *
42   * <ol>
43   *     <li>Receive message from SQS with a visibility timeout of {@code t + b} on the SQS consumer thread</li>
44   *     <li>Schedule a task to extend the visibility timeout after a delay of {@code t} (and then again at {@code t * 2} and {@code t * 3} and so on).
45   *     This task will execute concurrent to the SQS consumer thread</li>
46   *     <li>When executed, an enabled task will extend the visibility timeout by {@code t + b}. The value of {@code b} should be large enough to allow the
47   *     next extension of the visibility timeout to execute</li>
48   *     <li>Continue extending the visibility timeout periodically until the message is acknowledged, message processing terminates or there is an
49   *     error extending the visibility timeout (e.g. when extending beyond the limit allowed or for a message that has been deleted)</li>
50   * </ol>
51   *
52   * @see <a href="http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html">Visibility Timeouts</a>
53   */
54  class SQSMessageVisibilityTimeoutManager {
55      private static final Logger log = LoggerFactory.getLogger(SQSMessageVisibilityTimeoutManager.class);
56  
57      private static final int VISIBILITY_TIMEOUT_EXTENSION_WORKERS = Integer.getInteger("amq.sqs.visibility.timeout.extension.workers", Runtime.getRuntime().availableProcessors());
58  
59      /**
60       * Defines the amount of time (in seconds) to allow an enabled scheduled task to be executed (this includes the delay
61       * before the task is scheduled onto a thread for execution and the time for the task itself to execute to completion).
62       *
63       * <p>There are no real-time guarantees around when enabled scheduled tasks will begin execution (except that they won't execute
64       * before the scheduled time).
65       *
66       * <p>A scheduled task is <em>enabled</em> in the sense that the required delay before it is to be executed has elapsed
67       * and it is ready for execution.
68       *
69       * @see ScheduledThreadPoolExecutor
70       */
71      private static final int VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS = Integer.getInteger("amq.sqs.visibility.timeout.extension.scheduling.buffer", 15);
72  
73      final ScheduledThreadPoolExecutor visibilityTimeoutExtensionWorkerPool;
74  
75      private final AmazonSQS amazonSQSClient;
76      private final int schedulingBufferSeconds;
77  
78      SQSMessageVisibilityTimeoutManager(AmazonSQS amazonSQSClient) {
79          this(amazonSQSClient, VISIBILITY_TIMEOUT_EXTENSION_WORKERS, VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS);
80      }
81  
82      SQSMessageVisibilityTimeoutManager(AmazonSQS amazonSQSClient, int visibilityTimeoutExtensionWorkers) {
83          this(amazonSQSClient, visibilityTimeoutExtensionWorkers, VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS);
84      }
85  
86      /**
87       * visible for testing
88       */
89      SQSMessageVisibilityTimeoutManager(AmazonSQS amazonSQSClient,
90                                         int visibilityTimeoutExtensionWorkers,
91                                         int schedulingBufferSeconds) {
92          this.amazonSQSClient = amazonSQSClient;
93          if (visibilityTimeoutExtensionWorkers <= 0) {
94              throw new IllegalArgumentException("visibilityTimeoutExtensionWorkers must be greater than 0. Received: " + visibilityTimeoutExtensionWorkers);
95          }
96          this.visibilityTimeoutExtensionWorkerPool = new ScheduledThreadPoolExecutor(visibilityTimeoutExtensionWorkers,
97                  new DefaultThreadFactory("sqs-visibility-timeout-extension-thread-%d", (t, throwable) -> {
98                      log.warn("SQS visibility timeout extension thread '{}' died due to an exception.", t.getName(), throwable);
99                  }));
100         visibilityTimeoutExtensionWorkerPool.setRemoveOnCancelPolicy(true); // we are not inspecting the task queue and desire to remove tasks as soon as they are cancelled (not after the required delay elapses)
101         this.schedulingBufferSeconds = schedulingBufferSeconds;
102     }
103 
104     /**
105      * Schedules an extension of the visibility timeout for a particular receipt of a message (identifed by the specified {@code receiptHandle}).
106      *
107      * @param receiptHandle identifies a particular receipt of a message
108      *
109      * @return a {@code Future} that represents the action to extend the visibility timeout (you can cancel or query the status of this action via this reference)
110      */
111     Future<?> scheduleVisibilityTimeoutExtension(SQSConsumerQueueConfig queueConfig, String receiptHandle) {
112         return visibilityTimeoutExtensionWorkerPool.scheduleAtFixedRate(new VisibilityTimeoutExtender(queueConfig, receiptHandle),
113                 queueConfig.getVisibilityExtensionPeriod(),
114                 queueConfig.getVisibilityExtensionPeriod(),
115                 TimeUnit.SECONDS);
116     }
117 
118     private class VisibilityTimeoutExtender implements Runnable {
119         private final AtomicInteger numberOfExtensions = new AtomicInteger(0);
120         private final SQSConsumerQueueConfig queueConfig;
121         private final String receiptHandle;
122 
123         VisibilityTimeoutExtender(SQSConsumerQueueConfig queueConfig, String receiptHandle) {
124             this.queueConfig = requireNonNull(queueConfig, "queueConfig");
125             this.receiptHandle = requireNonNull(receiptHandle, "receiptHandle");
126         }
127 
128         @Override
129         public void run() {
130             try {
131                 amazonSQSClient.changeMessageVisibility(queueConfig.getQueueUrl(), receiptHandle,
132                         queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds);
133             } catch (MessageNotInflightException e) {
134                 log.warn("Message visibility timeout extension failed: Message is not in flight.", e);
135                 throw e;
136             } catch (ReceiptHandleIsInvalidException e) {
137                 log.warn("Message visibility timeout extension failed: Receipt handle {} is invalid.", receiptHandle, e);
138                 throw e;
139             }
140 
141             numberOfExtensions.incrementAndGet();
142 
143             if (log.isInfoEnabled()) {
144                 final int elapsedTimeSinceMessageReceiptSeconds = queueConfig.getVisibilityExtensionPeriod() + (numberOfExtensions.get() * queueConfig.getVisibilityExtensionPeriod());
145                 final int totalVisibilityTimeoutSeconds =
146                         elapsedTimeSinceMessageReceiptSeconds + queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds;
147 
148                 log.info("Message visibility timeout extended by {} seconds (elapsedTimeSinceMessageReceiptSeconds: {}, totalVisibilityTimeoutSeconds: {}, numberOfExtensions: {}, receiptHandle: {})",
149                         queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds, elapsedTimeSinceMessageReceiptSeconds, totalVisibilityTimeoutSeconds, numberOfExtensions.get(), receiptHandle);
150             }
151         }
152     }
153 
154     /**
155      * @return the visibility timeout in seconds that is used for (a) receiving a message (b) extending the visibility timeout of a received message
156      */
157     int getVisibilityTimeoutSeconds(SQSConsumerQueueConfig queueConfig) {
158         return queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds;
159     }
160 
161     void shutdown() {
162         visibilityTimeoutExtensionWorkerPool.shutdown();
163         try {
164             if (!visibilityTimeoutExtensionWorkerPool.awaitTermination(AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
165                 log.warn("Pool did not terminate in {} seconds", AWAIT_TERMINATION_TIMEOUT_SECONDS);
166             }
167         } catch (InterruptedException ie) {
168             Thread.currentThread().interrupt();
169         }
170     }
171 }