1 package com.atlassian.messagequeue.internal.sqs;
2
3 import com.amazonaws.services.sqs.AmazonSQSClient;
4 import com.amazonaws.services.sqs.model.MessageNotInflightException;
5 import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import java.util.concurrent.Future;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.TimeUnit;
12 import java.util.concurrent.atomic.AtomicInteger;
13
14 import static com.atlassian.messagequeue.internal.sqs.SQSMessageRunnerService.AWAIT_TERMINATION_TIMEOUT_SECONDS;
15 import static java.util.Objects.requireNonNull;
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 class SQSMessageVisibilityTimeoutManager {
54 private static final Logger log = LoggerFactory.getLogger(SQSMessageVisibilityTimeoutManager.class);
55
56 private static final int VISIBILITY_TIMEOUT_EXTENSION_WORKERS = Integer.getInteger("amq.sqs.visibility.timeout.extension.workers", Runtime.getRuntime().availableProcessors());
57
58
59
60
61
62
63
64
65
66
67
68
69
70 private static final int VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS = Integer.getInteger("amq.sqs.visibility.timeout.extension.scheduling.buffer", 15);
71
72
73
74
75 private static final int PERIOD_SECONDS = Integer.getInteger("amq.sqs.visibility.timeout.extension.period", 60);
76
77 final ScheduledThreadPoolExecutor visibilityTimeoutExtensionWorkerPool;
78
79 private final int periodSeconds;
80 private final int visibilityTimeoutSeconds;
81
82 private final AmazonSQSClient amazonSQSClient;
83 private final String queueUrl;
84
85 SQSMessageVisibilityTimeoutManager(AmazonSQSClient amazonSQSClient, String queueUrl) {
86 this(amazonSQSClient, queueUrl, PERIOD_SECONDS, VISIBILITY_TIMEOUT_EXTENSION_SCHEDULING_BUFFER_SECONDS, VISIBILITY_TIMEOUT_EXTENSION_WORKERS);
87 }
88
89
90
91
92 SQSMessageVisibilityTimeoutManager(AmazonSQSClient amazonSQSClient, String queueUrl,
93 int periodSeconds,
94 int schedulingBufferSeconds,
95 int visibilityTimeoutExtensionWorkers) {
96 this.queueUrl = queueUrl;
97 this.amazonSQSClient = amazonSQSClient;
98 this.periodSeconds = periodSeconds;
99 if (visibilityTimeoutExtensionWorkers <= 0) {
100 throw new IllegalArgumentException("visibilityTimeoutExtensionWorkers must be greater than 0. Received: " + visibilityTimeoutExtensionWorkers);
101 }
102 this.visibilityTimeoutExtensionWorkerPool = new ScheduledThreadPoolExecutor(visibilityTimeoutExtensionWorkers,
103 new DefaultThreadFactory("sqs-visibility-timeout-extension-thread-%d", (t, throwable) -> {
104 log.warn("SQS visibility timeout extension thread '{}' died due to an exception.", t.getName(), throwable);
105 }));
106 visibilityTimeoutExtensionWorkerPool.setRemoveOnCancelPolicy(true);
107
108 if (schedulingBufferSeconds <= 0) {
109 throw new IllegalArgumentException("schedulingBufferSeconds must be greater than 0. Received: " + schedulingBufferSeconds);
110 }
111 this.visibilityTimeoutSeconds = periodSeconds + schedulingBufferSeconds;
112
113 log.info("Constructing {} (visibilityTimeoutSeconds: {}, periodSeconds: {}, schedulingBufferSeconds: {}, visibilityTimeoutExtensionWorkers: {})",
114 this.getClass().getSimpleName(), visibilityTimeoutSeconds, periodSeconds, schedulingBufferSeconds, visibilityTimeoutExtensionWorkers);
115
116 }
117
118
119
120
121
122
123
124
125 Future<?> scheduleVisibilityTimeoutExtension(String receiptHandle) {
126 return visibilityTimeoutExtensionWorkerPool.scheduleAtFixedRate(new VisibilityTimeoutExtender(receiptHandle),
127 periodSeconds,
128 periodSeconds,
129 TimeUnit.SECONDS);
130 }
131
132 private class VisibilityTimeoutExtender implements Runnable {
133 private final AtomicInteger numberOfExtensions = new AtomicInteger(0);
134 private final String receiptHandle;
135
136 VisibilityTimeoutExtender(String receiptHandle) {
137 this.receiptHandle = requireNonNull(receiptHandle, "receiptHandle");
138 }
139
140 @Override
141 public void run() {
142 try {
143 amazonSQSClient.changeMessageVisibility(queueUrl, receiptHandle, visibilityTimeoutSeconds);
144 } catch (MessageNotInflightException e) {
145 log.warn("Message visibility timeout extension failed: Message is not in flight.", e);
146 throw e;
147 } catch (ReceiptHandleIsInvalidException e) {
148 log.warn("Message visibility timeout extension failed: Receipt handle {} is invalid.", receiptHandle, e);
149 throw e;
150 }
151
152 numberOfExtensions.incrementAndGet();
153
154 if (log.isInfoEnabled()) {
155 final int elapsedTimeSinceMessageReceiptSeconds = periodSeconds + (numberOfExtensions.get() * periodSeconds);
156 final int totalVisibilityTimeoutSeconds =
157 elapsedTimeSinceMessageReceiptSeconds + visibilityTimeoutSeconds;
158
159 log.info("Message visibility timeout extended by {} seconds (elapsedTimeSinceMessageReceiptSeconds: {}, totalVisibilityTimeoutSeconds: {}, numberOfExtensions: {}, receiptHandle: {})",
160 visibilityTimeoutSeconds, elapsedTimeSinceMessageReceiptSeconds, totalVisibilityTimeoutSeconds, numberOfExtensions.get(), receiptHandle);
161 }
162 }
163 }
164
165
166
167
168 int getVisibilityTimeoutSeconds() {
169 return visibilityTimeoutSeconds;
170 }
171
172 void shutdown() {
173 visibilityTimeoutExtensionWorkerPool.shutdown();
174 try {
175 if (!visibilityTimeoutExtensionWorkerPool.awaitTermination(AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
176 log.warn("Pool did not terminate in {} seconds", AWAIT_TERMINATION_TIMEOUT_SECONDS);
177 }
178 } catch (InterruptedException ie) {
179 Thread.currentThread().interrupt();
180 }
181 }
182 }