View Javadoc

1   package com.atlassian.messagequeue.internal.scheduler;
2   
3   import com.atlassian.messagequeue.Message;
4   import com.atlassian.messagequeue.MessageRunnerKey;
5   import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
6   import com.atlassian.messagequeue.MessageRunnerServiceException;
7   import com.atlassian.messagequeue.TenantDataIdSupplier;
8   import com.atlassian.messagequeue.internal.core.DefaultMessageInformationService;
9   import com.atlassian.messagequeue.internal.core.JacksonNestedMessageSerializer;
10  import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
11  import com.atlassian.messagequeue.registry.MessageContext;
12  import com.atlassian.messagequeue.registry.MessageRunner;
13  import com.atlassian.scheduler.caesium.impl.CaesiumSchedulerService;
14  import com.atlassian.scheduler.caesium.impl.MemoryClusteredJobDao;
15  import com.atlassian.scheduler.caesium.spi.CaesiumSchedulerConfiguration;
16  import com.atlassian.scheduler.config.JobConfig;
17  import com.atlassian.scheduler.config.JobId;
18  import com.atlassian.scheduler.config.RunMode;
19  import com.atlassian.scheduler.config.Schedule;
20  import com.atlassian.scheduler.core.impl.MemoryRunDetailsDao;
21  import com.atlassian.scheduler.status.RunDetails;
22  import com.atlassian.scheduler.status.RunOutcome;
23  import com.atlassian.tenant.api.TenantContext;
24  import com.atlassian.tenant.api.TenantContextProvider;
25  import com.atlassian.tenant.impl.TenantIdSetter;
26  import com.atlassian.workcontext.api.WorkContextDoorway;
27  import com.google.common.util.concurrent.Uninterruptibles;
28  import org.hamcrest.core.Is;
29  import org.junit.After;
30  import org.junit.Before;
31  import org.junit.Rule;
32  import org.junit.Test;
33  import org.mockito.ArgumentCaptor;
34  import org.mockito.Captor;
35  import org.mockito.Mock;
36  import org.mockito.junit.MockitoJUnit;
37  import org.mockito.junit.MockitoRule;
38  
39  import java.util.Collections;
40  import java.util.List;
41  import java.util.Optional;
42  import java.util.concurrent.CountDownLatch;
43  import java.util.concurrent.ExecutorService;
44  import java.util.concurrent.Executors;
45  import java.util.concurrent.TimeUnit;
46  import java.util.function.Supplier;
47  import java.util.stream.Collectors;
48  import java.util.stream.IntStream;
49  
50  import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.AMQ_JOB_RUNNER_KEY;
51  import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.DELIVERY_COUNT_PARAMETER;
52  import static org.hamcrest.CoreMatchers.containsString;
53  import static org.hamcrest.CoreMatchers.is;
54  import static org.hamcrest.MatcherAssert.assertThat;
55  import static org.hamcrest.Matchers.containsInAnyOrder;
56  import static org.mockito.Matchers.any;
57  import static org.mockito.Mockito.after;
58  import static org.mockito.Mockito.doThrow;
59  import static org.mockito.Mockito.never;
60  import static org.mockito.Mockito.spy;
61  import static org.mockito.Mockito.timeout;
62  import static org.mockito.Mockito.verify;
63  import static org.mockito.Mockito.when;
64  
65  public class SchedulerMessageRunnerServiceTest {
66      private static final MessageRunnerKey TEST_MESSAGE_RUNNER_KEY = MessageRunnerKey.of("testMessageRunner");
67      private static final String TENANT_ID = "tenant-123";
68      private static final int MAX_DELIVERY_COUNT = 3;
69      private static final int VERIFY_TIMEOUT_MILLIS = 500;
70      private static final int DELIVERY_INTERVAL_MILLIS = 100;
71  
72      @Rule
73      public MockitoRule mockitoRule = MockitoJUnit.rule();
74      @Mock
75      private CaesiumSchedulerConfiguration config;
76      @Mock
77      private MessageRunner testMessageRunner;
78      @Mock
79      private MessageRunnerRegistryHelper messageRunnerRegistryHelper;
80      @Mock
81      private TenantIdSetter tenantIdSetter;
82      @Mock
83      private TenantContextProvider tenantContextProvider;
84      @Captor
85      private ArgumentCaptor<MessageContext> messageContextArgumentCaptor;
86  
87      private SchedulerMessageRunnerService schedulerMessageRunnerService;
88      private CaesiumSchedulerService caesiumSchedulerService;
89      private MemoryRunDetailsDao runDetailsDao;
90  
91      private static final WorkContextDoorway DOORWAY = new WorkContextDoorway();
92      private final TenantDataIdSupplier tenantDataIdSetter = () -> "1";
93  
94      @Before
95      public void setUp() throws Exception {
96          when(config.enableClusteredJobs()).thenReturn(true);
97          when(config.useFineGrainedSchedules()).thenReturn(true);
98          when(config.workerThreadCount()).thenReturn(1);
99  
100         runDetailsDao = new MemoryRunDetailsDao();
101 
102         when(tenantContextProvider.isTenanted()).thenReturn(true);
103         when(tenantContextProvider.getTenantContext()).thenReturn(new TenantContext.Builder()
104                 .tenantId(TENANT_ID)
105                 .jdbcUrl("jdbcUrl")
106                 .jdbcUsername("username")
107                 .jdbcPassword("password").build());
108 
109         caesiumSchedulerService = spy(new CaesiumSchedulerService(config, runDetailsDao, new MemoryClusteredJobDao(), tenantContextProvider) {
110             @Override
111             public void preJob() {
112                 super.preJob();
113                 DOORWAY.open();
114                 tenantIdSetter.setTenantId(TENANT_ID);
115             }
116 
117             @Override
118             public void postJob() {
119                 DOORWAY.close();
120                 super.postJob();
121             }
122         });
123 
124         when(messageRunnerRegistryHelper.getMessageRunner(TEST_MESSAGE_RUNNER_KEY)).thenReturn(Optional.of(testMessageRunner));
125 
126         final JacksonNestedMessageSerializer nestedMessageSerializer = new JacksonNestedMessageSerializer();
127         schedulerMessageRunnerService = new SchedulerMessageRunnerService(
128                 caesiumSchedulerService,
129                 messageRunnerRegistryHelper,
130                 new DefaultMessageInformationService("queueUrl",
131                         tenantContextProvider,
132                         nestedMessageSerializer,
133                         tenantDataIdSetter),
134                 nestedMessageSerializer,
135                 DELIVERY_INTERVAL_MILLIS,
136                 MAX_DELIVERY_COUNT);
137 
138         schedulerMessageRunnerService.init();
139         caesiumSchedulerService.start();
140     }
141 
142     @After
143     public void tearDown() throws Exception {
144         caesiumSchedulerService.shutdown();
145         schedulerMessageRunnerService.shutdown();
146     }
147 
148     @Test
149     public void messageWithPayloadIsProcessedAtLeastOnce() throws Exception {
150         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
151 
152         verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
153         assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.of("payload")));
154     }
155 
156     @Test
157     public void messageWithNullPayloadIsProcessedAtLeastOnce() throws Exception {
158         final Message message = Message.create(TEST_MESSAGE_RUNNER_KEY, null);
159 
160         schedulerMessageRunnerService.addMessage(message);
161 
162         verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
163         assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.empty()));
164     }
165 
166     @Test
167     public void messageAutoAcknowledgedAndNotRedeliveredAfterSuccessfulProcessing() throws Exception {
168         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
169 
170         verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
171     }
172 
173     @Test(expected = MessageRunnerNotRegisteredException.class)
174     public void failFastIfNoMessageRunnerRegisteredForMessage() throws Exception {
175         final MessageRunnerKey fooMessageRunner = MessageRunnerKey.of("fooMessageRunner");
176         final Message message = Message.create(fooMessageRunner, "payload");
177         when(messageRunnerRegistryHelper.getMessageRunner(fooMessageRunner)).thenReturn(Optional.empty());
178 
179         schedulerMessageRunnerService.addMessage(message);
180     }
181 
182     @Test
183     public void problematicMessageDeliveredMaxDeliveryCountTimes() throws Exception {
184         doThrow(Exception.class).when(testMessageRunner).processMessage(any(MessageContext.class));
185 
186         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
187 
188         final int waitMillis = DELIVERY_INTERVAL_MILLIS * MAX_DELIVERY_COUNT + 100; // we expect all deliveries to have been made after waiting waitMillis
189         verify(testMessageRunner, after(waitMillis).times(MAX_DELIVERY_COUNT)).processMessage(messageContextArgumentCaptor.capture());
190         final MessageContext messageContext = messageContextArgumentCaptor.getValue();
191         String messageId = messageContext.getMessageId().orElse(null);
192 
193         // can use run details like a dead letter queue (that we can query for messages that failed processing for all deliveries)
194         final RunDetails runDetails = runDetailsDao.getLastRunForJob(JobId.of(messageId));
195         assertThat(runDetails.getRunOutcome(), is(RunOutcome.ABORTED));
196         assertThat(runDetails.getMessage(), containsString(messageContext.getMessageId().orElse(null)));
197         assertThat(runDetails.getMessage(), containsString(messageContext.getPayload().orElse(null)));
198     }
199 
200     @Test
201     public void addMultipleMessagesSerially() throws Exception {
202         int numberOfMessages = 20;
203         Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
204 
205         range.get().forEach(i ->
206             schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)))
207         );
208 
209         String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
210         verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
211         assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
212     }
213 
214     @Test
215     public void addMultipleMessagesConcurrently() throws Exception {
216         int numberOfMessages = 20;
217         Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
218 
219         ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
220         try {
221             range.get().forEach(i ->
222                 pool.execute(() ->
223                     schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)))
224                 )
225             );
226 
227             String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
228             verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
229             assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
230         } finally {
231             pool.shutdown();
232             pool.awaitTermination(1, TimeUnit.SECONDS);
233         }
234     }
235 
236     @Test
237     public void jobThatRunsLongerThanJobIntervalAllowedToCompleteOnce() throws Exception {
238         final int sleepMillis = DELIVERY_INTERVAL_MILLIS * 3;
239         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
240         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext ->
241             Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS)
242         ));
243         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
244 
245         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
246 
247         verify(messageRunner, after(sleepMillis * 2).times(1)).processMessage(any(MessageContext.class));
248     }
249 
250     @Test
251     public void problematicJobThatRunsLongerThanJobIntervalAllowedToFailMaxDeliveryCountTimes() throws Exception {
252         final int sleepFor = DELIVERY_INTERVAL_MILLIS * 3;
253         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
254         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
255             Uninterruptibles.sleepUninterruptibly(sleepFor, TimeUnit.MILLISECONDS);
256             throw new RuntimeException();
257         }));
258         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
259 
260         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
261 
262         verify(messageRunner, after(sleepFor * 2 * 3).times(MAX_DELIVERY_COUNT)).processMessage(any(MessageContext.class));
263     }
264 
265     @Test
266     public void messageAckedEarlyDeliveredExactlyOnce() throws Exception {
267         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("earlyAckMessageRunnerKey");
268         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
269             messageContext.acknowledge();
270             throw new RuntimeException();
271         }));
272         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
273 
274         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
275 
276         verify(messageRunner, after(DELIVERY_INTERVAL_MILLIS * 3).times(1)).processMessage(any(MessageContext.class));
277     }
278 
279     @Test
280     public void abortJobIfMissingRequiredParameter() throws Exception {
281         final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
282                 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
283                 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null)); // job config specifies no parameters
284 
285         final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
286 
287         verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId); // invalid jobs should be unscheduled
288     }
289 
290     @Test
291     public void abortJobIfDeliveryCountParameterNotAnInt() throws Exception {
292         final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
293                 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
294                 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null))
295                 .withParameters(Collections.singletonMap(DELIVERY_COUNT_PARAMETER, "1"));
296 
297         final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
298 
299         verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId); // invalid jobs should be unscheduled
300     }
301 
302     @Test
303     public void messageNotDeliveredAfterShutdown() throws Exception {
304         caesiumSchedulerService.standby();
305         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
306         schedulerMessageRunnerService.shutdown();
307         caesiumSchedulerService.start();
308 
309         verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(0)).processMessage(any(MessageContext.class));
310     }
311 
312     @Test
313     public void messageProducedDuringSchedulerStandbyDeliveredWhenSchedulerStarted() throws Exception {
314         caesiumSchedulerService.standby();
315         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
316         caesiumSchedulerService.start();
317 
318         verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
319     }
320 
321     @Test(expected = MessageRunnerServiceException.class)
322     public void messageProducedAfterSchedulerStopped() throws Exception {
323         schedulerMessageRunnerService.shutdown();
324         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
325     }
326 
327     @Test
328     public void messageRunnerResponsiveToCancellation() throws Exception {
329         final CountDownLatch enterLatch = new CountDownLatch(1);
330         final CountDownLatch exitLatch = new CountDownLatch(1);
331         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
332         final int sleepMillis = 100;
333         final MessageRunner messageRunner = messageContext -> {
334             enterLatch.countDown();
335             while (!messageContext.isCancellationRequested()) {
336                 Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
337             }
338             exitLatch.countDown();
339         };
340         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
341         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
342         assertThat("MessageRunner did not start. Cannot assert cancellation on a MessageRunner that did not start",
343                 enterLatch.await(1, TimeUnit.SECONDS), Is.is(true));
344 
345         caesiumSchedulerService.shutdown(); // shutdown scheduler and in doing so cancel any running jobs
346 
347         assertThat("MessageRunner did not respond to cancellation", exitLatch.await(sleepMillis * 2, TimeUnit.MILLISECONDS), is(true));
348     }
349 
350     @Test
351     public void cancelAutoAcknowledgement() throws Exception {
352         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("cancelAckMessageRunnerKey");
353         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(MessageContext::cancelAutoAcknowledgementOfMessage));
354         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
355 
356         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
357 
358         verify(messageRunner, timeout(DELIVERY_INTERVAL_MILLIS + 50).times(1)).processMessage(messageContextArgumentCaptor.capture());
359         final JobId jobId = JobId.of(messageContextArgumentCaptor.getValue().getMessageId().get());
360         verify(caesiumSchedulerService, never()).unscheduleJob(jobId);
361     }
362 
363     @Test
364     public void messageNotProcessedIfWorkContextNotAvailable() throws Exception {
365         runDetailsDao = new MemoryRunDetailsDao();
366         CaesiumSchedulerService caesiumSchedulerService = new CaesiumSchedulerService(config, runDetailsDao, new MemoryClusteredJobDao(), tenantContextProvider);
367 
368         SchedulerMessageRunnerService schedulerMessageRunnerService = new SchedulerMessageRunnerService(caesiumSchedulerService,
369                 messageRunnerRegistryHelper, new DefaultMessageInformationService(
370                 "queueUrl",
371                 tenantContextProvider,
372                 new JacksonNestedMessageSerializer(),
373                 tenantDataIdSetter),
374                 new JacksonNestedMessageSerializer(),
375                 DELIVERY_INTERVAL_MILLIS,
376                 MAX_DELIVERY_COUNT);
377 
378         try {
379             schedulerMessageRunnerService.init();
380             caesiumSchedulerService.start();
381 
382             schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
383 
384             verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS).never()).processMessage(any(MessageContext.class));
385         } finally {
386             caesiumSchedulerService.shutdown();
387             schedulerMessageRunnerService.shutdown();
388         }
389     }
390 
391     private List<String> toPayloadValues(List<MessageContext> messageContexts) {
392         return messageContexts.stream().map(messageContext -> messageContext.getPayload().orElse(null)).collect(Collectors.toList());
393     }
394 
395     private static class DelegatingMessageRunner implements MessageRunner {
396         private final MessageRunner delegate;
397 
398         DelegatingMessageRunner(MessageRunner delegate) {
399             this.delegate = delegate;
400         }
401 
402         @Override
403         public void processMessage(MessageContext context) {
404             delegate.processMessage(context);
405         }
406     }
407 }