1 package com.atlassian.messagequeue.internal.scheduler;
2
3 import com.atlassian.messagequeue.Message;
4 import com.atlassian.messagequeue.MessageRunnerKey;
5 import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
6 import com.atlassian.messagequeue.MessageRunnerServiceException;
7 import com.atlassian.messagequeue.TenantDataIdSupplier;
8 import com.atlassian.messagequeue.internal.core.DefaultMessageInformationService;
9 import com.atlassian.messagequeue.internal.core.JacksonNestedMessageSerializer;
10 import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
11 import com.atlassian.messagequeue.registry.MessageContext;
12 import com.atlassian.messagequeue.registry.MessageRunner;
13 import com.atlassian.scheduler.caesium.impl.CaesiumSchedulerService;
14 import com.atlassian.scheduler.caesium.impl.MemoryClusteredJobDao;
15 import com.atlassian.scheduler.caesium.spi.CaesiumSchedulerConfiguration;
16 import com.atlassian.scheduler.config.JobConfig;
17 import com.atlassian.scheduler.config.JobId;
18 import com.atlassian.scheduler.config.RunMode;
19 import com.atlassian.scheduler.config.Schedule;
20 import com.atlassian.scheduler.core.impl.MemoryRunDetailsDao;
21 import com.atlassian.scheduler.status.RunDetails;
22 import com.atlassian.scheduler.status.RunOutcome;
23 import com.atlassian.tenant.api.TenantContext;
24 import com.atlassian.tenant.api.TenantContextProvider;
25 import com.atlassian.tenant.impl.TenantIdSetter;
26 import com.atlassian.workcontext.api.WorkContextDoorway;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import org.hamcrest.core.Is;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Rule;
32 import org.junit.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.mockito.Captor;
35 import org.mockito.Mock;
36 import org.mockito.junit.MockitoJUnit;
37 import org.mockito.junit.MockitoRule;
38
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.Optional;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.TimeUnit;
46 import java.util.function.Supplier;
47 import java.util.stream.Collectors;
48 import java.util.stream.IntStream;
49
50 import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.AMQ_JOB_RUNNER_KEY;
51 import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.DELIVERY_COUNT_PARAMETER;
52 import static org.hamcrest.CoreMatchers.containsString;
53 import static org.hamcrest.CoreMatchers.is;
54 import static org.hamcrest.MatcherAssert.assertThat;
55 import static org.hamcrest.Matchers.containsInAnyOrder;
56 import static org.mockito.Matchers.any;
57 import static org.mockito.Mockito.after;
58 import static org.mockito.Mockito.doThrow;
59 import static org.mockito.Mockito.never;
60 import static org.mockito.Mockito.spy;
61 import static org.mockito.Mockito.timeout;
62 import static org.mockito.Mockito.verify;
63 import static org.mockito.Mockito.when;
64
65 public class SchedulerMessageRunnerServiceTest {
66 private static final MessageRunnerKey TEST_MESSAGE_RUNNER_KEY = MessageRunnerKey.of("testMessageRunner");
67 private static final String TENANT_ID = "tenant-123";
68 private static final int MAX_DELIVERY_COUNT = 3;
69 private static final int VERIFY_TIMEOUT_MILLIS = 500;
70 private static final int DELIVERY_INTERVAL_MILLIS = 100;
71
72 @Rule
73 public MockitoRule mockitoRule = MockitoJUnit.rule();
74 @Mock
75 private CaesiumSchedulerConfiguration config;
76 @Mock
77 private MessageRunner testMessageRunner;
78 @Mock
79 private MessageRunnerRegistryHelper messageRunnerRegistryHelper;
80 @Mock
81 private TenantIdSetter tenantIdSetter;
82 @Mock
83 private TenantContextProvider tenantContextProvider;
84 @Captor
85 private ArgumentCaptor<MessageContext> messageContextArgumentCaptor;
86
87 private SchedulerMessageRunnerService schedulerMessageRunnerService;
88 private CaesiumSchedulerService caesiumSchedulerService;
89 private MemoryRunDetailsDao runDetailsDao;
90
91 private static final WorkContextDoorway DOORWAY = new WorkContextDoorway();
92 private final TenantDataIdSupplier tenantDataIdSetter = () -> "1";
93
94 @Before
95 public void setUp() throws Exception {
96 when(config.enableClusteredJobs()).thenReturn(true);
97 when(config.useFineGrainedSchedules()).thenReturn(true);
98 when(config.workerThreadCount()).thenReturn(1);
99
100 runDetailsDao = new MemoryRunDetailsDao();
101
102 when(tenantContextProvider.isTenanted()).thenReturn(true);
103 when(tenantContextProvider.getTenantContext()).thenReturn(new TenantContext.Builder()
104 .tenantId(TENANT_ID)
105 .jdbcUrl("jdbcUrl")
106 .jdbcUsername("username")
107 .jdbcPassword("password").build());
108
109 caesiumSchedulerService = spy(new CaesiumSchedulerService(config, runDetailsDao, new MemoryClusteredJobDao(), tenantContextProvider) {
110 @Override
111 public void preJob() {
112 super.preJob();
113 DOORWAY.open();
114 tenantIdSetter.setTenantId(TENANT_ID);
115 }
116
117 @Override
118 public void postJob() {
119 DOORWAY.close();
120 super.postJob();
121 }
122 });
123
124 when(messageRunnerRegistryHelper.getMessageRunner(TEST_MESSAGE_RUNNER_KEY)).thenReturn(Optional.of(testMessageRunner));
125
126 final JacksonNestedMessageSerializer nestedMessageSerializer = new JacksonNestedMessageSerializer();
127 schedulerMessageRunnerService = new SchedulerMessageRunnerService(
128 caesiumSchedulerService,
129 messageRunnerRegistryHelper,
130 new DefaultMessageInformationService("queueUrl",
131 tenantContextProvider,
132 nestedMessageSerializer,
133 tenantDataIdSetter),
134 nestedMessageSerializer,
135 DELIVERY_INTERVAL_MILLIS,
136 MAX_DELIVERY_COUNT);
137
138 schedulerMessageRunnerService.init();
139 caesiumSchedulerService.start();
140 }
141
142 @After
143 public void tearDown() throws Exception {
144 caesiumSchedulerService.shutdown();
145 schedulerMessageRunnerService.shutdown();
146 }
147
148 @Test
149 public void messageWithPayloadIsProcessedAtLeastOnce() throws Exception {
150 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
151
152 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
153 assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.of("payload")));
154 }
155
156 @Test
157 public void messageWithNullPayloadIsProcessedAtLeastOnce() throws Exception {
158 final Message message = Message.create(TEST_MESSAGE_RUNNER_KEY, null);
159
160 schedulerMessageRunnerService.addMessage(message);
161
162 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
163 assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.empty()));
164 }
165
166 @Test
167 public void messageAutoAcknowledgedAndNotRedeliveredAfterSuccessfulProcessing() throws Exception {
168 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
169
170 verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
171 }
172
173 @Test(expected = MessageRunnerNotRegisteredException.class)
174 public void failFastIfNoMessageRunnerRegisteredForMessage() throws Exception {
175 final MessageRunnerKey fooMessageRunner = MessageRunnerKey.of("fooMessageRunner");
176 final Message message = Message.create(fooMessageRunner, "payload");
177 when(messageRunnerRegistryHelper.getMessageRunner(fooMessageRunner)).thenReturn(Optional.empty());
178
179 schedulerMessageRunnerService.addMessage(message);
180 }
181
182 @Test
183 public void problematicMessageDeliveredMaxDeliveryCountTimes() throws Exception {
184 doThrow(Exception.class).when(testMessageRunner).processMessage(any(MessageContext.class));
185
186 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
187
188 final int waitMillis = DELIVERY_INTERVAL_MILLIS * MAX_DELIVERY_COUNT + 100;
189 verify(testMessageRunner, after(waitMillis).times(MAX_DELIVERY_COUNT)).processMessage(messageContextArgumentCaptor.capture());
190 final MessageContext messageContext = messageContextArgumentCaptor.getValue();
191 String messageId = messageContext.getMessageId().orElse(null);
192
193
194 final RunDetails runDetails = runDetailsDao.getLastRunForJob(JobId.of(messageId));
195 assertThat(runDetails.getRunOutcome(), is(RunOutcome.ABORTED));
196 assertThat(runDetails.getMessage(), containsString(messageContext.getMessageId().orElse(null)));
197 assertThat(runDetails.getMessage(), containsString(messageContext.getPayload().orElse(null)));
198 }
199
200 @Test
201 public void addMultipleMessagesSerially() throws Exception {
202 int numberOfMessages = 20;
203 Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
204
205 range.get().forEach(i ->
206 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)))
207 );
208
209 String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
210 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
211 assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
212 }
213
214 @Test
215 public void addMultipleMessagesConcurrently() throws Exception {
216 int numberOfMessages = 20;
217 Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
218
219 ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
220 try {
221 range.get().forEach(i ->
222 pool.execute(() ->
223 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)))
224 )
225 );
226
227 String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
228 verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
229 assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
230 } finally {
231 pool.shutdown();
232 pool.awaitTermination(1, TimeUnit.SECONDS);
233 }
234 }
235
236 @Test
237 public void jobThatRunsLongerThanJobIntervalAllowedToCompleteOnce() throws Exception {
238 final int sleepMillis = DELIVERY_INTERVAL_MILLIS * 3;
239 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
240 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext ->
241 Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS)
242 ));
243 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
244
245 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
246
247 verify(messageRunner, after(sleepMillis * 2).times(1)).processMessage(any(MessageContext.class));
248 }
249
250 @Test
251 public void problematicJobThatRunsLongerThanJobIntervalAllowedToFailMaxDeliveryCountTimes() throws Exception {
252 final int sleepFor = DELIVERY_INTERVAL_MILLIS * 3;
253 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
254 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
255 Uninterruptibles.sleepUninterruptibly(sleepFor, TimeUnit.MILLISECONDS);
256 throw new RuntimeException();
257 }));
258 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
259
260 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
261
262 verify(messageRunner, after(sleepFor * 2 * 3).times(MAX_DELIVERY_COUNT)).processMessage(any(MessageContext.class));
263 }
264
265 @Test
266 public void messageAckedEarlyDeliveredExactlyOnce() throws Exception {
267 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("earlyAckMessageRunnerKey");
268 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
269 messageContext.acknowledge();
270 throw new RuntimeException();
271 }));
272 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
273
274 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
275
276 verify(messageRunner, after(DELIVERY_INTERVAL_MILLIS * 3).times(1)).processMessage(any(MessageContext.class));
277 }
278
279 @Test
280 public void abortJobIfMissingRequiredParameter() throws Exception {
281 final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
282 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
283 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null));
284
285 final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
286
287 verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId);
288 }
289
290 @Test
291 public void abortJobIfDeliveryCountParameterNotAnInt() throws Exception {
292 final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
293 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
294 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null))
295 .withParameters(Collections.singletonMap(DELIVERY_COUNT_PARAMETER, "1"));
296
297 final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
298
299 verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId);
300 }
301
302 @Test
303 public void messageNotDeliveredAfterShutdown() throws Exception {
304 caesiumSchedulerService.standby();
305 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
306 schedulerMessageRunnerService.shutdown();
307 caesiumSchedulerService.start();
308
309 verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(0)).processMessage(any(MessageContext.class));
310 }
311
312 @Test
313 public void messageProducedDuringSchedulerStandbyDeliveredWhenSchedulerStarted() throws Exception {
314 caesiumSchedulerService.standby();
315 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
316 caesiumSchedulerService.start();
317
318 verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
319 }
320
321 @Test(expected = MessageRunnerServiceException.class)
322 public void messageProducedAfterSchedulerStopped() throws Exception {
323 schedulerMessageRunnerService.shutdown();
324 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
325 }
326
327 @Test
328 public void messageRunnerResponsiveToCancellation() throws Exception {
329 final CountDownLatch enterLatch = new CountDownLatch(1);
330 final CountDownLatch exitLatch = new CountDownLatch(1);
331 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
332 final int sleepMillis = 100;
333 final MessageRunner messageRunner = messageContext -> {
334 enterLatch.countDown();
335 while (!messageContext.isCancellationRequested()) {
336 Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
337 }
338 exitLatch.countDown();
339 };
340 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
341 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
342 assertThat("MessageRunner did not start. Cannot assert cancellation on a MessageRunner that did not start",
343 enterLatch.await(1, TimeUnit.SECONDS), Is.is(true));
344
345 caesiumSchedulerService.shutdown();
346
347 assertThat("MessageRunner did not respond to cancellation", exitLatch.await(sleepMillis * 2, TimeUnit.MILLISECONDS), is(true));
348 }
349
350 @Test
351 public void cancelAutoAcknowledgement() throws Exception {
352 final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("cancelAckMessageRunnerKey");
353 final MessageRunner messageRunner = spy(new DelegatingMessageRunner(MessageContext::cancelAutoAcknowledgementOfMessage));
354 when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
355
356 schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
357
358 verify(messageRunner, timeout(DELIVERY_INTERVAL_MILLIS + 50).times(1)).processMessage(messageContextArgumentCaptor.capture());
359 final JobId jobId = JobId.of(messageContextArgumentCaptor.getValue().getMessageId().get());
360 verify(caesiumSchedulerService, never()).unscheduleJob(jobId);
361 }
362
363 @Test
364 public void messageNotProcessedIfWorkContextNotAvailable() throws Exception {
365 runDetailsDao = new MemoryRunDetailsDao();
366 CaesiumSchedulerService caesiumSchedulerService = new CaesiumSchedulerService(config, runDetailsDao, new MemoryClusteredJobDao(), tenantContextProvider);
367
368 SchedulerMessageRunnerService schedulerMessageRunnerService = new SchedulerMessageRunnerService(caesiumSchedulerService,
369 messageRunnerRegistryHelper, new DefaultMessageInformationService(
370 "queueUrl",
371 tenantContextProvider,
372 new JacksonNestedMessageSerializer(),
373 tenantDataIdSetter),
374 new JacksonNestedMessageSerializer(),
375 DELIVERY_INTERVAL_MILLIS,
376 MAX_DELIVERY_COUNT);
377
378 try {
379 schedulerMessageRunnerService.init();
380 caesiumSchedulerService.start();
381
382 schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
383
384 verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS).never()).processMessage(any(MessageContext.class));
385 } finally {
386 caesiumSchedulerService.shutdown();
387 schedulerMessageRunnerService.shutdown();
388 }
389 }
390
391 private List<String> toPayloadValues(List<MessageContext> messageContexts) {
392 return messageContexts.stream().map(messageContext -> messageContext.getPayload().orElse(null)).collect(Collectors.toList());
393 }
394
395 private static class DelegatingMessageRunner implements MessageRunner {
396 private final MessageRunner delegate;
397
398 DelegatingMessageRunner(MessageRunner delegate) {
399 this.delegate = delegate;
400 }
401
402 @Override
403 public void processMessage(MessageContext context) {
404 delegate.processMessage(context);
405 }
406 }
407 }