1 package com.atlassian.messagequeue.internal.scheduler;
2
3 import com.atlassian.messagequeue.Message;
4 import com.atlassian.messagequeue.MessageRunnerKey;
5 import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
6 import com.atlassian.messagequeue.MessageRunnerServiceException;
7 import com.atlassian.messagequeue.internal.core.DefaultMessageInformationService;
8 import com.atlassian.messagequeue.internal.core.JacksonNestedMessageSerializer;
9 import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
10 import com.atlassian.messagequeue.registry.MessageContext;
11 import com.atlassian.messagequeue.registry.MessageRunner;
12 import com.atlassian.scheduler.caesium.impl.CaesiumSchedulerService;
13 import com.atlassian.scheduler.caesium.impl.MemoryClusteredJobDao;
14 import com.atlassian.scheduler.caesium.spi.CaesiumSchedulerConfiguration;
15 import com.atlassian.scheduler.config.JobConfig;
16 import com.atlassian.scheduler.config.JobId;
17 import com.atlassian.scheduler.config.RunMode;
18 import com.atlassian.scheduler.config.Schedule;
19 import com.atlassian.scheduler.core.impl.MemoryRunDetailsDao;
20 import com.atlassian.scheduler.status.RunDetails;
21 import com.atlassian.scheduler.status.RunOutcome;
22 import com.atlassian.tenant.api.TenantContext;
23 import com.atlassian.tenant.api.TenantContextProvider;
24 import com.atlassian.tenant.impl.TenantIdSetter;
25 import com.atlassian.workcontext.api.WorkContextDoorway;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import org.hamcrest.core.Is;
28 import org.junit.After;
29 import org.junit.Before;
30 import org.junit.Rule;
31 import org.junit.Test;
32 import org.mockito.ArgumentCaptor;
33 import org.mockito.Captor;
34 import org.mockito.Mock;
35 import org.mockito.junit.MockitoJUnit;
36 import org.mockito.junit.MockitoRule;
37
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.Optional;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.TimeUnit;
45 import java.util.function.Supplier;
46 import java.util.stream.Collectors;
47 import java.util.stream.IntStream;
48
49 import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.AMQ_JOB_RUNNER_KEY;
50 import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.DELIVERY_COUNT_PARAMETER;
51 import static org.hamcrest.CoreMatchers.containsString;
52 import static org.hamcrest.CoreMatchers.is;
53 import static org.hamcrest.MatcherAssert.assertThat;
54 import static org.hamcrest.Matchers.containsInAnyOrder;
55 import static org.mockito.Matchers.any;
56 import static org.mockito.Mockito.after;
57 import static org.mockito.Mockito.doThrow;
58 import static org.mockito.Mockito.never;
59 import static org.mockito.Mockito.spy;
60 import static org.mockito.Mockito.timeout;
61 import static org.mockito.Mockito.verify;
62 import static org.mockito.Mockito.when;
63
64 public class SchedulerMessageRunnerServiceTest {
65 private static final MessageRunnerKey TEST_MESSAGE_RUNNER_KEY = MessageRunnerKey.of("testMessageRunner");
66 private static final String TENANT_ID = "tenant-123";
67 private static final int MAX_DELIVERY_COUNT = 3;
68 private static final int VERIFY_TIMEOUT_MILLIS = 500;
69 private static final int DELIVERY_INTERVAL_MILLIS = 100;
70
71 @Rule
72 public MockitoRule mockitoRule = MockitoJUnit.rule();
73 @Mock
74 private CaesiumSchedulerConfiguration config;
75 @Mock
76 private MessageRunner testMessageRunner;
77 @Mock
78 private MessageRunnerRegistryHelper messageRunnerRegistryHelper;
79 @Mock
80 private TenantIdSetter tenantIdSetter;
81 @Mock
82 private TenantContextProvider tenantContextProvider;
83 @Captor
84 private ArgumentCaptor<MessageContext> messageContextArgumentCaptor;
85
86 private SchedulerMessageRunnerService schedulerMessageRunnerService;
87 private CaesiumSchedulerService caesiumSchedulerService;
88 private MemoryRunDetailsDao runDetailsDao;
89
90 private static final WorkContextDoorway DOORWAY = new WorkContextDoorway();
91
92 @Before
93 public void setUp() throws Exception {
94 when(config.useFineGrainedSchedules()).thenReturn(true);
95 runDetailsDao = new MemoryRunDetailsDao();
96 caesiumSchedulerService = spy(new CaesiumSchedulerService(config, runDetailsDao,
97 new MemoryClusteredJobDao()) {
98 @Override
99 public void preJob() {
100 super.preJob();
101 DOORWAY.open();
102 tenantIdSetter.setTenantId(TENANT_ID);
103 }
104
105 @Override
106 public void postJob() {
107 DOORWAY.close();
108 super.postJob();
109 }
110 });
111
112 when(messageRunnerRegistryHelper.getMessageRunner(TEST_MESSAGE_RUNNER_KEY)).thenReturn(Optional.of(testMessageRunner));
113 when(tenantContextProvider.getTenantContext()).thenReturn(new TenantContext.Builder()
114 .tenantId(TENANT_ID)
115 .jdbcUrl("jdbcUrl")
116 .jdbcUsername("username")
117 .jdbcPassword("password").build());
118
119 final JacksonNestedMessageSerializer nestedMessageSerializer = new JacksonNestedMessageSerializer();
120 schedulerMessageRunnerService = new SchedulerMessageRunnerService(caesiumSchedulerService,
121 messageRunnerRegistryHelper, new DefaultMessageInformationService("queueUrl", tenantContextProvider, nestedMessageSerializer), nestedMessageSerializer,
122 DELIVERY_INTERVAL_MILLIS, MAX_DELIVERY_COUNT);
123
124 schedulerMessageRunnerService.init();
125 caesiumSchedulerService.start();
126 }
127
128 @After
129 public void tearDown() throws Exception {
130 caesiumSchedulerService.shutdown();
131 schedulerMessageRunnerService.shutdown();
132 }
133
134 @Test
135 public void messageWithPayloadIsProcessedAtLeastOnce() throws Exception {
136 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
137
138 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
139 assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.of("payload")));
140 }
141
142 @Test
143 public void messageWithNullPayloadIsProcessedAtLeastOnce() throws Exception {
144 final Message message = Message.create(TEST_MESSAGE_RUNNER_KEY, null);
145
146 schedulerMessageRunnerService.addMessage(message);
147
148 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
149 assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.empty()));
150 }
151
152 @Test
153 public void messageAutoAcknowledgedAndNotRedeliveredAfterSuccessfulProcessing() throws Exception {
154 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
155
156 verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
157 }
158
159 @Test(expected = MessageRunnerNotRegisteredException.class)
160 public void failFastIfNoMessageRunnerRegisteredForMessage() throws Exception {
161 final MessageRunnerKey fooMessageRunner = MessageRunnerKey.of("fooMessageRunner");
162 final Message message = Message.create(fooMessageRunner, "payload");
163 when(messageRunnerRegistryHelper.getMessageRunner(fooMessageRunner)).thenReturn(Optional.empty());
164
165 schedulerMessageRunnerService.addMessage(message);
166 }
167
168 @Test
169 public void problematicMessageDeliveredMaxDeliveryCountTimes() throws Exception {
170 doThrow(Exception.class).when(testMessageRunner).processMessage(any(MessageContext.class));
171
172 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
173
174 final int waitMillis = DELIVERY_INTERVAL_MILLIS * MAX_DELIVERY_COUNT + 100;
175 final int expectedDeliveryCount = MAX_DELIVERY_COUNT;
176 verify(testMessageRunner, after(waitMillis).times(expectedDeliveryCount)).processMessage(messageContextArgumentCaptor.capture());
177 final MessageContext messageContext = messageContextArgumentCaptor.getValue();
178 String messageId = messageContext.getMessageId().orElse(null);
179
180
181 final RunDetails runDetails = runDetailsDao.getLastRunForJob(JobId.of(messageId));
182 assertThat(runDetails.getRunOutcome(), is(RunOutcome.ABORTED));
183 assertThat(runDetails.getMessage(), containsString(messageContext.getMessageId().orElse(null)));
184 assertThat(runDetails.getMessage(), containsString(messageContext.getPayload().orElse(null)));
185 }
186
187 @Test
188 public void addMultipleMessagesSerially() throws Exception {
189 int numberOfMessages = 20;
190 Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
191
192 range.get().forEach(i -> {
193 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)));
194 });
195
196 String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
197 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
198 assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
199 }
200
201 @Test
202 public void addMultipleMessagesConcurrently() throws Exception {
203 int numberOfMessages = 20;
204 Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
205
206 ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
207 try {
208 range.get().forEach(i -> {
209 pool.execute(() -> {
210 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)));
211 });
212 });
213
214 String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
215 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
216 assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
217 } finally {
218 pool.shutdown();
219 pool.awaitTermination(1, TimeUnit.SECONDS);
220 }
221 }
222
223 @Test
224 public void jobThatRunsLongerThanJobIntervalAllowedToCompleteOnce() throws Exception {
225 final int sleepMillis = DELIVERY_INTERVAL_MILLIS * 3;
226 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
227 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
228 Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
229 }));
230 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
231
232 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
233
234 verify(messageRunner, after(sleepMillis * 2).times(1)).processMessage(any(MessageContext.class));
235 }
236
237 @Test
238 public void problematicJobThatRunsLongerThanJobIntervalAllowedToFailMaxDeliveryCountTimes() throws Exception {
239 final int sleepFor = DELIVERY_INTERVAL_MILLIS * 3;
240 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
241 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
242 Uninterruptibles.sleepUninterruptibly(sleepFor, TimeUnit.MILLISECONDS);
243 throw new RuntimeException();
244 }));
245 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
246
247 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
248
249 verify(messageRunner, after(sleepFor * 2 * 3).times(MAX_DELIVERY_COUNT)).processMessage(any(MessageContext.class));
250 }
251
252 @Test
253 public void messageAckedEarlyDeliveredExactlyOnce() throws Exception {
254 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("earlyAckMessageRunnerKey");
255 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
256 messageContext.acknowledge();
257 throw new RuntimeException();
258 }));
259 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
260
261 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
262
263 verify(messageRunner, after(DELIVERY_INTERVAL_MILLIS * 3).times(1)).processMessage(any(MessageContext.class));
264 }
265
266 @Test
267 public void abortJobIfMissingRequiredParameter() throws Exception {
268 final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
269 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
270 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null));
271
272 final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
273
274 verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId);
275 }
276
277 @Test
278 public void abortJobIfDeliveryCountParameterNotAnInt() throws Exception {
279 final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
280 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
281 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null))
282 .withParameters(Collections.singletonMap(DELIVERY_COUNT_PARAMETER, "1"));
283
284 final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
285
286 verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId);
287 }
288
289 @Test
290 public void messageNotDeliveredAfterShutdown() throws Exception {
291 caesiumSchedulerService.standby();
292 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
293 schedulerMessageRunnerService.shutdown();
294 caesiumSchedulerService.start();
295
296 verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(0)).processMessage(any(MessageContext.class));
297 }
298
299 @Test
300 public void messageProducedDuringSchedulerStandbyDeliveredWhenSchedulerStarted() throws Exception {
301 caesiumSchedulerService.standby();
302 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
303 caesiumSchedulerService.start();
304
305 verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
306 }
307
308 @Test(expected = MessageRunnerServiceException.class)
309 public void messageProducedAfterSchedulerStopped() throws Exception {
310 schedulerMessageRunnerService.shutdown();
311 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
312 }
313
314 @Test
315 public void messageRunnerResponsiveToCancellation() throws Exception {
316 final CountDownLatch enterLatch = new CountDownLatch(1);
317 final CountDownLatch exitLatch = new CountDownLatch(1);
318 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
319 final int sleepMillis = 100;
320 final MessageRunner messageRunner = messageContext -> {
321 enterLatch.countDown();
322 while (!messageContext.isCancellationRequested()) {
323 Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
324 }
325 exitLatch.countDown();
326 };
327 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
328 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
329 assertThat("MessageRunner did not start. Cannot assert cancellation on a MessageRunner that did not start",
330 enterLatch.await(1, TimeUnit.SECONDS), Is.is(true));
331
332 caesiumSchedulerService.shutdown();
333
334 assertThat("MessageRunner did not respond to cancellation", exitLatch.await(sleepMillis * 2, TimeUnit.MILLISECONDS), is(true));
335 }
336
337 @Test
338 public void cancelAutoAcknowledgement() throws Exception {
339 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("cancelAckMessageRunnerKey");
340 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(MessageContext::cancelAutoAcknowledgementOfMessage));
341 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
342
343 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
344
345 verify(messageRunner, timeout(DELIVERY_INTERVAL_MILLIS + 50).times(1)).processMessage(messageContextArgumentCaptor.capture());
346 final JobId jobId = JobId.of(messageContextArgumentCaptor.getValue().getMessageId().get());
347 verify(caesiumSchedulerService, never()).unscheduleJob(jobId);
348 }
349
350 @Test
351 public void messageNotProcessedIfWorkContextNotAvailable() throws Exception {
352 runDetailsDao = new MemoryRunDetailsDao();
353 CaesiumSchedulerService caesiumSchedulerService = new CaesiumSchedulerService(config, runDetailsDao,
354 new MemoryClusteredJobDao());
355
356 SchedulerMessageRunnerService schedulerMessageRunnerService = new SchedulerMessageRunnerService(caesiumSchedulerService,
357 messageRunnerRegistryHelper, new DefaultMessageInformationService("queueUrl", tenantContextProvider,
358 new JacksonNestedMessageSerializer()), new JacksonNestedMessageSerializer(),
359 DELIVERY_INTERVAL_MILLIS, MAX_DELIVERY_COUNT);
360
361 try {
362 schedulerMessageRunnerService.init();
363 caesiumSchedulerService.start();
364
365 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
366
367 verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS).never()).processMessage(any(MessageContext.class));
368 } finally {
369 caesiumSchedulerService.shutdown();
370 schedulerMessageRunnerService.shutdown();
371 }
372 }
373
374 private List<String> toPayloadValues(List<MessageContext> messageContexts) {
375 return messageContexts.stream().map(messageContext -> messageContext.getPayload().orElse(null)).collect(Collectors.toList());
376 }
377
378 private class DelegatingMessageRunner implements MessageRunner {
379 private final MessageRunner delegate;
380
381 public DelegatingMessageRunner(MessageRunner delegate) {
382 this.delegate = delegate;
383 }
384
385 @Override
386 public void processMessage(MessageContext context) {
387 delegate.processMessage(context);
388 }
389 }
390 }