View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import com.amazonaws.services.sqs.AmazonSQSClient;
4   import com.amazonaws.services.sqs.model.MessageNotInflightException;
5   import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
6   import org.slf4j.Logger;
7   import org.slf4j.LoggerFactory;
8   
9   import java.util.concurrent.Future;
10  import java.util.concurrent.ScheduledThreadPoolExecutor;
11  import java.util.concurrent.TimeUnit;
12  import java.util.concurrent.atomic.AtomicInteger;
13  
14  import static com.atlassian.messagequeue.internal.sqs.SQSMessageRunnerService.AWAIT_TERMINATION_TIMEOUT_SECONDS;
15  import static java.util.Objects.requireNonNull;
16  
17  /**
18   * Responsible for automatically extending the visibility timeout for the receipt of a message.
19   *
20   * <p>Algorithm:
21   * <pre>
22   *
23   *                       extend
24   *               +-------------------+ (sqs-visibility-timeout-extension-thread)
25   *
26   * +--receive----X-----+-------X-----+-------X-----+----> (sqs-consumer-thread)
27   *
28   *                             +-------------------+ (sqs-visibility-timeout-extension-thread)
29   *                                     extend
30   *
31   *
32   * |<-----t----->|<-----t----->|<-----t----->|<-----t----->|
33   *               |<-b->|       |<-b->|       |<-b->|
34   *
35   *
36   *
37   *    t = period (time between successive attempts to extend the visibility timeout)
38   *    b = buffer (time for executing the task that extends the visibility timeout)
39   * </pre>
40   *
41   * <ol>
42   *     <li>Receive message from SQS with a visibility timeout of {@code t + b} on the SQS consumer thread</li>
43   *     <li>Schedule a task to extend the visibility timeout after a delay of {@code t} (and then again at {@code t * 2} and {@code t * 3} and so on).
44   *     This task will execute concurrent to the SQS consumer thread</li>
45   *     <li>When executed, an enabled task will extend the visibility timeout by {@code t + b}. The value of {@code b} should be large enough to allow the
46   *     next extension of the visibility timeout to execute</li>
47   *     <li>Continue extending the visibility timeout periodically until the message is acknowledged, message processing terminates or there is an
48   *     error extending the visibility timeout (e.g. when extending beyond the limit allowed or for a message that has been deleted)</li>
49   * </ol>
50   *
51   * @see <a href="http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html">Visibility Timeouts</a>
52   */
53  class SQSMessageVisibilityTimeoutManager {
54      private static final Logger log = LoggerFactory.getLogger(SQSMessageVisibilityTimeoutManager.class);
55  
56      private static final int VISIBILITY_TIMEOUT_EXTENSION_WORKERS = Integer.getInteger("amq.sqs.visibility.timeout.extension.workers", Runtime.getRuntime().availableProcessors());
57  
58      /**
59       * Defines the amount of time (in seconds) to allow an enabled scheduled task to be executed (this includes the delay
60       * before the task is scheduled onto a thread for execution and the time for the task itself to execute to completion).
61       *
62       * <p>There are no real-time guarantees around when enabled scheduled tasks will begin execution (except that they won't execute
63       * before the scheduled time).
64       *
65       * <p>A scheduled task is <em>enabled</em> in the sense that the required delay before it is to be executed has elapsed
66       * and it is ready for execution.
67       *
68       * @see ScheduledThreadPoolExecutor
69       */
70      private static final int VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS = Integer.getInteger("amq.sqs.visibility.timeout.extension.scheduling.buffer", 15);
71  
72      /**
73       * the time interval in seconds between successive calls to SQS API to extend the visibility timeout for the receipt of a message
74       */
75      private static final int PERIOD_SECONDS = Integer.getInteger("amq.sqs.visibility.timeout.extension.period", 60);
76  
77      final ScheduledThreadPoolExecutor visibilityTimeoutExtensionWorkerPool;
78  
79      private final int periodSeconds;
80      private final int visibilityTimeoutSeconds;
81  
82      private final AmazonSQSClient amazonSQSClient;
83      private final String queueUrl;
84  
85      SQSMessageVisibilityTimeoutManager(AmazonSQSClient amazonSQSClient, String queueUrl) {
86          this(amazonSQSClient, queueUrl, PERIOD_SECONDS, VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS, VISIBILITY_TIMEOUT_EXTENSION_WORKERS);
87      }
88  
89      /**
90       * visible for testing
91       */
92      SQSMessageVisibilityTimeoutManager(AmazonSQSClient amazonSQSClient, String queueUrl,
93                                         int periodSeconds,
94                                         int schedulingBufferSeconds,
95                                         int visibilityTimeoutExtensionWorkers) {
96          this.queueUrl = queueUrl;
97          this.amazonSQSClient = amazonSQSClient;
98          this.periodSeconds = periodSeconds;
99          if (visibilityTimeoutExtensionWorkers <= 0) {
100             throw new IllegalArgumentException("visibilityTimeoutExtensionWorkers must be greater than 0. Received: " + visibilityTimeoutExtensionWorkers);
101         }
102         this.visibilityTimeoutExtensionWorkerPool = new ScheduledThreadPoolExecutor(visibilityTimeoutExtensionWorkers,
103                 new DefaultThreadFactory("sqs-visibility-timeout-extension-thread-%d", (t, throwable) -> {
104                     log.warn("SQS visibility timeout extension thread '{}' died due to an exception.", t.getName(), throwable);
105                 }));
106         visibilityTimeoutExtensionWorkerPool.setRemoveOnCancelPolicy(true); // we are not inspecting the task queue and desire to remove tasks as soon as they are cancelled (not after the required delay elapses)
107 
108         if (schedulingBufferSeconds <= 0) {
109             throw new IllegalArgumentException("schedulingBufferSeconds must be greater than 0. Received: " + schedulingBufferSeconds);
110         }
111         this.visibilityTimeoutSeconds = periodSeconds + schedulingBufferSeconds;
112 
113         log.info("Constructing {} (visibilityTimeoutSeconds: {}, periodSeconds: {}, schedulingBufferSeconds: {}, visibilityTimeoutExtensionWorkers: {})",
114                 this.getClass().getSimpleName(), visibilityTimeoutSeconds, periodSeconds, schedulingBufferSeconds, visibilityTimeoutExtensionWorkers);
115 
116     }
117 
118     /**
119      * Schedules an extension of the visibility timeout for a particular receipt of a message (identifed by the specified {@code receiptHandle}).
120      *
121      * @param receiptHandle identifies a particular receipt of a message
122      *
123      * @return a {@code Future} that represents the action to extend the visibility timeout (you can cancel or query the status of this action via this reference)
124      */
125     Future<?> scheduleVisibilityTimeoutExtension(String receiptHandle) {
126         return visibilityTimeoutExtensionWorkerPool.scheduleAtFixedRate(new VisibilityTimeoutExtender(receiptHandle),
127                 periodSeconds,
128                 periodSeconds,
129                 TimeUnit.SECONDS);
130     }
131 
132     private class VisibilityTimeoutExtender implements Runnable {
133         private final AtomicInteger numberOfExtensions = new AtomicInteger(0);
134         private final String receiptHandle;
135 
136         VisibilityTimeoutExtender(String receiptHandle) {
137             this.receiptHandle = requireNonNull(receiptHandle, "receiptHandle");
138         }
139 
140         @Override
141         public void run() {
142             try {
143                 amazonSQSClient.changeMessageVisibility(queueUrl, receiptHandle, visibilityTimeoutSeconds);
144             } catch (MessageNotInflightException e) {
145                 log.warn("Message visibility timeout extension failed: Message is not in flight.", e);
146                 throw e;
147             } catch (ReceiptHandleIsInvalidException e) {
148                 log.warn("Message visibility timeout extension failed: Receipt handle {} is invalid.", receiptHandle, e);
149                 throw e;
150             }
151 
152             numberOfExtensions.incrementAndGet();
153 
154             if (log.isInfoEnabled()) {
155                 final int elapsedTimeSinceMessageReceiptSeconds = periodSeconds + (numberOfExtensions.get() * periodSeconds);
156                 final int totalVisibilityTimeoutSeconds =
157                         elapsedTimeSinceMessageReceiptSeconds + visibilityTimeoutSeconds;
158 
159                 log.info("Message visibility timeout extended by {} seconds (elapsedTimeSinceMessageReceiptSeconds: {}, totalVisibilityTimeoutSeconds: {}, numberOfExtensions: {}, receiptHandle: {})",
160                         visibilityTimeoutSeconds, elapsedTimeSinceMessageReceiptSeconds, totalVisibilityTimeoutSeconds, numberOfExtensions.get(), receiptHandle);
161             }
162         }
163     }
164 
165     /**
166      * @return the visibility timeout in seconds that is used for (a) receiving a message (b) extending the visibility timeout of a received message
167      */
168     int getVisibilityTimeoutSeconds() {
169         return visibilityTimeoutSeconds;
170     }
171 
172     void shutdown() {
173         visibilityTimeoutExtensionWorkerPool.shutdown();
174         try {
175             if (!visibilityTimeoutExtensionWorkerPool.awaitTermination(AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
176                 log.warn("Pool did not terminate in {} seconds", AWAIT_TERMINATION_TIMEOUT_SECONDS);
177             }
178         } catch (InterruptedException ie) {
179             Thread.currentThread().interrupt();
180         }
181     }
182 }