1 package com.atlassian.messagequeue.internal.sqs;
2
3 import com.amazonaws.services.sqs.AmazonSQS;
4 import com.amazonaws.services.sqs.model.MessageNotInflightException;
5 import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
6 import com.atlassian.messagequeue.internal.core.DefaultThreadFactory;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledThreadPoolExecutor;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.atomic.AtomicInteger;
14
15 import static com.atlassian.messagequeue.internal.sqs.SQSMessageRunnerService.AWAIT_TERMINATION_TIMEOUT_SECONDS;
16 import static java.util.Objects.requireNonNull;
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 class SQSMessageVisibilityTimeoutManager {
55 private static final Logger log = LoggerFactory.getLogger(SQSMessageVisibilityTimeoutManager.class);
56
57 private static final int VISIBILITY_TIMEOUT_EXTENSION_WORKERS = Integer.getInteger("amq.sqs.visibility.timeout.extension.workers", Runtime.getRuntime().availableProcessors());
58
59
60
61
62
63
64
65
66
67
68
69
70
71 private static final int VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS = Integer.getInteger("amq.sqs.visibility.timeout.extension.scheduling.buffer", 15);
72
73 final ScheduledThreadPoolExecutor visibilityTimeoutExtensionWorkerPool;
74
75 private final AmazonSQS amazonSQSClient;
76 private final int schedulingBufferSeconds;
77
78 SQSMessageVisibilityTimeoutManager(AmazonSQS amazonSQSClient) {
79 this(amazonSQSClient, VISIBILITY_TIMEOUT_EXTENSION_WORKERS, VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS);
80 }
81
82 SQSMessageVisibilityTimeoutManager(AmazonSQS amazonSQSClient, int visibilityTimeoutExtensionWorkers) {
83 this(amazonSQSClient, visibilityTimeoutExtensionWorkers, VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS);
84 }
85
86
87
88
89 SQSMessageVisibilityTimeoutManager(AmazonSQS amazonSQSClient,
90 int visibilityTimeoutExtensionWorkers,
91 int schedulingBufferSeconds) {
92 this.amazonSQSClient = amazonSQSClient;
93 if (visibilityTimeoutExtensionWorkers <= 0) {
94 throw new IllegalArgumentException("visibilityTimeoutExtensionWorkers must be greater than 0. Received: " + visibilityTimeoutExtensionWorkers);
95 }
96 this.visibilityTimeoutExtensionWorkerPool = new ScheduledThreadPoolExecutor(visibilityTimeoutExtensionWorkers,
97 new DefaultThreadFactory("sqs-visibility-timeout-extension-thread-%d", (t, throwable) -> {
98 log.warn("SQS visibility timeout extension thread '{}' died due to an exception.", t.getName(), throwable);
99 }));
100 visibilityTimeoutExtensionWorkerPool.setRemoveOnCancelPolicy(true);
101 this.schedulingBufferSeconds = schedulingBufferSeconds;
102 }
103
104
105
106
107
108
109
110
111 Future<?> scheduleVisibilityTimeoutExtension(SQSConsumerQueueConfig queueConfig, String receiptHandle) {
112 return visibilityTimeoutExtensionWorkerPool.scheduleAtFixedRate(new VisibilityTimeoutExtender(queueConfig, receiptHandle),
113 queueConfig.getVisibilityExtensionPeriod(),
114 queueConfig.getVisibilityExtensionPeriod(),
115 TimeUnit.SECONDS);
116 }
117
118 private class VisibilityTimeoutExtender implements Runnable {
119 private final AtomicInteger numberOfExtensions = new AtomicInteger(0);
120 private final SQSConsumerQueueConfig queueConfig;
121 private final String receiptHandle;
122
123 VisibilityTimeoutExtender(SQSConsumerQueueConfig queueConfig, String receiptHandle) {
124 this.queueConfig = requireNonNull(queueConfig, "queueConfig");
125 this.receiptHandle = requireNonNull(receiptHandle, "receiptHandle");
126 }
127
128 @Override
129 public void run() {
130 try {
131 amazonSQSClient.changeMessageVisibility(queueConfig.getQueueUrl(), receiptHandle,
132 queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds);
133 } catch (MessageNotInflightException e) {
134 log.warn("Message visibility timeout extension failed: Message is not in flight.", e);
135 throw e;
136 } catch (ReceiptHandleIsInvalidException e) {
137 log.warn("Message visibility timeout extension failed: Receipt handle {} is invalid.", receiptHandle, e);
138 throw e;
139 }
140
141 numberOfExtensions.incrementAndGet();
142
143 if (log.isInfoEnabled()) {
144 final int elapsedTimeSinceMessageReceiptSeconds = queueConfig.getVisibilityExtensionPeriod() + (numberOfExtensions.get() * queueConfig.getVisibilityExtensionPeriod());
145 final int totalVisibilityTimeoutSeconds =
146 elapsedTimeSinceMessageReceiptSeconds + queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds;
147
148 log.info("Message visibility timeout extended by {} seconds (elapsedTimeSinceMessageReceiptSeconds: {}, totalVisibilityTimeoutSeconds: {}, numberOfExtensions: {}, receiptHandle: {})",
149 queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds, elapsedTimeSinceMessageReceiptSeconds, totalVisibilityTimeoutSeconds, numberOfExtensions.get(), receiptHandle);
150 }
151 }
152 }
153
154
155
156
157 int getVisibilityTimeoutSeconds(SQSConsumerQueueConfig queueConfig) {
158 return queueConfig.getVisibilityExtensionPeriod() + schedulingBufferSeconds;
159 }
160
161 void shutdown() {
162 visibilityTimeoutExtensionWorkerPool.shutdown();
163 try {
164 if (!visibilityTimeoutExtensionWorkerPool.awaitTermination(AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
165 log.warn("Pool did not terminate in {} seconds", AWAIT_TERMINATION_TIMEOUT_SECONDS);
166 }
167 } catch (InterruptedException ie) {
168 Thread.currentThread().interrupt();
169 }
170 }
171 }