View Javadoc

1   package com.atlassian.messagequeue.internal.scheduler;
2   
3   import com.atlassian.messagequeue.Message;
4   import com.atlassian.messagequeue.MessageRunnerKey;
5   import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
6   import com.atlassian.messagequeue.MessageRunnerServiceException;
7   import com.atlassian.messagequeue.internal.core.DefaultMessageInformationService;
8   import com.atlassian.messagequeue.internal.core.JacksonNestedMessageSerializer;
9   import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
10  import com.atlassian.messagequeue.registry.MessageContext;
11  import com.atlassian.messagequeue.registry.MessageRunner;
12  import com.atlassian.scheduler.caesium.impl.CaesiumSchedulerService;
13  import com.atlassian.scheduler.caesium.impl.MemoryClusteredJobDao;
14  import com.atlassian.scheduler.caesium.spi.CaesiumSchedulerConfiguration;
15  import com.atlassian.scheduler.config.JobConfig;
16  import com.atlassian.scheduler.config.JobId;
17  import com.atlassian.scheduler.config.RunMode;
18  import com.atlassian.scheduler.config.Schedule;
19  import com.atlassian.scheduler.core.impl.MemoryRunDetailsDao;
20  import com.atlassian.scheduler.status.RunDetails;
21  import com.atlassian.scheduler.status.RunOutcome;
22  import com.atlassian.tenant.api.TenantContext;
23  import com.atlassian.tenant.api.TenantContextProvider;
24  import com.atlassian.tenant.impl.TenantIdSetter;
25  import com.atlassian.workcontext.api.WorkContextDoorway;
26  import com.google.common.util.concurrent.Uninterruptibles;
27  import org.hamcrest.core.Is;
28  import org.junit.After;
29  import org.junit.Before;
30  import org.junit.Rule;
31  import org.junit.Test;
32  import org.mockito.ArgumentCaptor;
33  import org.mockito.Captor;
34  import org.mockito.Mock;
35  import org.mockito.junit.MockitoJUnit;
36  import org.mockito.junit.MockitoRule;
37  
38  import java.util.Collections;
39  import java.util.List;
40  import java.util.Optional;
41  import java.util.concurrent.CountDownLatch;
42  import java.util.concurrent.ExecutorService;
43  import java.util.concurrent.Executors;
44  import java.util.concurrent.TimeUnit;
45  import java.util.function.Supplier;
46  import java.util.stream.Collectors;
47  import java.util.stream.IntStream;
48  
49  import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.AMQ_JOB_RUNNER_KEY;
50  import static com.atlassian.messagequeue.internal.scheduler.SchedulerMessageRunnerService.DELIVERY_COUNT_PARAMETER;
51  import static org.hamcrest.CoreMatchers.containsString;
52  import static org.hamcrest.CoreMatchers.is;
53  import static org.hamcrest.MatcherAssert.assertThat;
54  import static org.hamcrest.Matchers.containsInAnyOrder;
55  import static org.mockito.Matchers.any;
56  import static org.mockito.Mockito.after;
57  import static org.mockito.Mockito.doThrow;
58  import static org.mockito.Mockito.never;
59  import static org.mockito.Mockito.spy;
60  import static org.mockito.Mockito.timeout;
61  import static org.mockito.Mockito.verify;
62  import static org.mockito.Mockito.when;
63  
64  public class SchedulerMessageRunnerServiceTest {
65      private static final MessageRunnerKey TEST_MESSAGE_RUNNER_KEY = MessageRunnerKey.of("testMessageRunner");
66      private static final String TENANT_ID = "tenant-123";
67      private static final int MAX_DELIVERY_COUNT = 3;
68      private static final int VERIFY_TIMEOUT_MILLIS = 500;
69      private static final int DELIVERY_INTERVAL_MILLIS = 100;
70  
71      @Rule
72      public MockitoRule mockitoRule = MockitoJUnit.rule();
73      @Mock
74      private CaesiumSchedulerConfiguration config;
75      @Mock
76      private MessageRunner testMessageRunner;
77      @Mock
78      private MessageRunnerRegistryHelper messageRunnerRegistryHelper;
79      @Mock
80      private TenantIdSetter tenantIdSetter;
81      @Mock
82      private TenantContextProvider tenantContextProvider;
83      @Captor
84      private ArgumentCaptor<MessageContext> messageContextArgumentCaptor;
85  
86      private SchedulerMessageRunnerService schedulerMessageRunnerService;
87      private CaesiumSchedulerService caesiumSchedulerService;
88      private MemoryRunDetailsDao runDetailsDao;
89  
90      private static final WorkContextDoorway DOORWAY = new WorkContextDoorway();
91  
92      @Before
93      public void setUp() throws Exception {
94          when(config.useFineGrainedSchedules()).thenReturn(true);
95          runDetailsDao = new MemoryRunDetailsDao();
96          caesiumSchedulerService = spy(new CaesiumSchedulerService(config, runDetailsDao,
97                  new MemoryClusteredJobDao()) {
98              @Override
99              public void preJob() {
100                 super.preJob();
101                 DOORWAY.open();
102                 tenantIdSetter.setTenantId(TENANT_ID);
103             }
104 
105             @Override
106             public void postJob() {
107                 DOORWAY.close();
108                 super.postJob();
109             }
110         });
111 
112         when(messageRunnerRegistryHelper.getMessageRunner(TEST_MESSAGE_RUNNER_KEY)).thenReturn(Optional.of(testMessageRunner));
113         when(tenantContextProvider.getTenantContext()).thenReturn(new TenantContext.Builder()
114                 .tenantId(TENANT_ID)
115                 .jdbcUrl("jdbcUrl")
116                 .jdbcUsername("username")
117                 .jdbcPassword("password").build());
118 
119         final JacksonNestedMessageSerializer nestedMessageSerializer = new JacksonNestedMessageSerializer();
120         schedulerMessageRunnerService = new SchedulerMessageRunnerService(caesiumSchedulerService,
121                 messageRunnerRegistryHelper, new DefaultMessageInformationService("queueUrl", tenantContextProvider, nestedMessageSerializer), nestedMessageSerializer,
122                 DELIVERY_INTERVAL_MILLIS, MAX_DELIVERY_COUNT);
123 
124         schedulerMessageRunnerService.init();
125         caesiumSchedulerService.start();
126     }
127 
128     @After
129     public void tearDown() throws Exception {
130         caesiumSchedulerService.shutdown();
131         schedulerMessageRunnerService.shutdown();
132     }
133 
134     @Test
135     public void messageWithPayloadIsProcessedAtLeastOnce() throws Exception {
136         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
137 
138         verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
139         assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.of("payload")));
140     }
141 
142     @Test
143     public void messageWithNullPayloadIsProcessedAtLeastOnce() throws Exception {
144         final Message message = Message.create(TEST_MESSAGE_RUNNER_KEY, null);
145 
146         schedulerMessageRunnerService.addMessage(message);
147 
148         verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(1)).processMessage(messageContextArgumentCaptor.capture());
149         assertThat(messageContextArgumentCaptor.getValue().getPayload(), is(Optional.empty()));
150     }
151 
152     @Test
153     public void messageAutoAcknowledgedAndNotRedeliveredAfterSuccessfulProcessing() throws Exception {
154         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
155 
156         verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
157     }
158 
159     @Test(expected = MessageRunnerNotRegisteredException.class)
160     public void failFastIfNoMessageRunnerRegisteredForMessage() throws Exception {
161         final MessageRunnerKey fooMessageRunner = MessageRunnerKey.of("fooMessageRunner");
162         final Message message = Message.create(fooMessageRunner, "payload");
163         when(messageRunnerRegistryHelper.getMessageRunner(fooMessageRunner)).thenReturn(Optional.empty());
164 
165         schedulerMessageRunnerService.addMessage(message);
166     }
167 
168     @Test
169     public void problematicMessageDeliveredMaxDeliveryCountTimes() throws Exception {
170         doThrow(Exception.class).when(testMessageRunner).processMessage(any(MessageContext.class));
171 
172         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
173 
174         final int waitMillis = DELIVERY_INTERVAL_MILLIS * MAX_DELIVERY_COUNT + 100; // we expect all deliveries to have been made after waiting waitMillis
175         final int expectedDeliveryCount = MAX_DELIVERY_COUNT;
176         verify(testMessageRunner, after(waitMillis).times(expectedDeliveryCount)).processMessage(messageContextArgumentCaptor.capture());
177         final MessageContext messageContext = messageContextArgumentCaptor.getValue();
178         String messageId = messageContext.getMessageId().orElse(null);
179 
180         // can use run details like a dead letter queue (that we can query for messages that failed processing for all deliveries)
181         final RunDetails runDetails = runDetailsDao.getLastRunForJob(JobId.of(messageId));
182         assertThat(runDetails.getRunOutcome(), is(RunOutcome.ABORTED));
183         assertThat(runDetails.getMessage(), containsString(messageContext.getMessageId().orElse(null)));
184         assertThat(runDetails.getMessage(), containsString(messageContext.getPayload().orElse(null)));
185     }
186 
187     @Test
188     public void addMultipleMessagesSerially() throws Exception {
189         int numberOfMessages = 20;
190         Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
191 
192         range.get().forEach(i -> {
193             schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)));
194         });
195 
196         String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
197         verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
198         assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
199     }
200 
201     @Test
202     public void addMultipleMessagesConcurrently() throws Exception {
203         int numberOfMessages = 20;
204         Supplier<IntStream> range = () -> IntStream.range(0, numberOfMessages);
205 
206         ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
207         try {
208             range.get().forEach(i -> {
209                 pool.execute(() -> {
210                     schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, String.valueOf(i)));
211                 });
212             });
213 
214             String[] expectedPayloads = range.get().mapToObj(String::valueOf).toArray(String[]::new);
215             verify(testMessageRunner, timeout(VERIFY_TIMEOUT_MILLIS).times(numberOfMessages)).processMessage(messageContextArgumentCaptor.capture());
216             assertThat(toPayloadValues(messageContextArgumentCaptor.getAllValues()), containsInAnyOrder(expectedPayloads));
217         } finally {
218             pool.shutdown();
219             pool.awaitTermination(1, TimeUnit.SECONDS);
220         }
221     }
222 
223     @Test
224     public void jobThatRunsLongerThanJobIntervalAllowedToCompleteOnce() throws Exception {
225         final int sleepMillis = DELIVERY_INTERVAL_MILLIS * 3;
226         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
227         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
228             Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
229         }));
230         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
231 
232         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
233 
234         verify(messageRunner, after(sleepMillis * 2).times(1)).processMessage(any(MessageContext.class));
235     }
236 
237     @Test
238     public void problematicJobThatRunsLongerThanJobIntervalAllowedToFailMaxDeliveryCountTimes() throws Exception {
239         final int sleepFor = DELIVERY_INTERVAL_MILLIS * 3;
240         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
241         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
242             Uninterruptibles.sleepUninterruptibly(sleepFor, TimeUnit.MILLISECONDS);
243             throw new RuntimeException();
244         }));
245         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
246 
247         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
248 
249         verify(messageRunner, after(sleepFor * 2 * 3).times(MAX_DELIVERY_COUNT)).processMessage(any(MessageContext.class));
250     }
251 
252     @Test
253     public void messageAckedEarlyDeliveredExactlyOnce() throws Exception {
254         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("earlyAckMessageRunnerKey");
255         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(messageContext -> {
256             messageContext.acknowledge();
257             throw new RuntimeException();
258         }));
259         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
260 
261         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
262 
263         verify(messageRunner, after(DELIVERY_INTERVAL_MILLIS * 3).times(1)).processMessage(any(MessageContext.class));
264     }
265 
266     @Test
267     public void abortJobIfMissingRequiredParameter() throws Exception {
268         final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
269                 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
270                 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null)); // job config specifies no parameters
271 
272         final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
273 
274         verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId); // invalid jobs should be unscheduled
275     }
276 
277     @Test
278     public void abortJobIfDeliveryCountParameterNotAnInt() throws Exception {
279         final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
280                 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
281                 .withSchedule(Schedule.forInterval(DELIVERY_INTERVAL_MILLIS, null))
282                 .withParameters(Collections.singletonMap(DELIVERY_COUNT_PARAMETER, "1"));
283 
284         final JobId jobId = caesiumSchedulerService.scheduleJobWithGeneratedId(jobConfig);
285 
286         verify(caesiumSchedulerService, timeout(1000).times(1)).unscheduleJob(jobId); // invalid jobs should be unscheduled
287     }
288 
289     @Test
290     public void messageNotDeliveredAfterShutdown() throws Exception {
291         caesiumSchedulerService.standby();
292         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
293         schedulerMessageRunnerService.shutdown();
294         caesiumSchedulerService.start();
295 
296         verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(0)).processMessage(any(MessageContext.class));
297     }
298 
299     @Test
300     public void messageProducedDuringSchedulerStandbyDeliveredWhenSchedulerStarted() throws Exception {
301         caesiumSchedulerService.standby();
302         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
303         caesiumSchedulerService.start();
304 
305         verify(testMessageRunner, after(DELIVERY_INTERVAL_MILLIS * 2).times(1)).processMessage(any(MessageContext.class));
306     }
307 
308     @Test(expected = MessageRunnerServiceException.class)
309     public void messageProducedAfterSchedulerStopped() throws Exception {
310         schedulerMessageRunnerService.shutdown();
311         schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
312     }
313 
314     @Test
315     public void messageRunnerResponsiveToCancellation() throws Exception {
316         final CountDownLatch enterLatch = new CountDownLatch(1);
317         final CountDownLatch exitLatch = new CountDownLatch(1);
318         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("longRunningMessageRunnerKey");
319         final int sleepMillis = 100;
320         final MessageRunner messageRunner = messageContext -> {
321             enterLatch.countDown();
322             while (!messageContext.isCancellationRequested()) {
323                 Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
324             }
325             exitLatch.countDown();
326         };
327         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
328         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
329         assertThat("MessageRunner did not start. Cannot assert cancellation on a MessageRunner that did not start",
330                 enterLatch.await(1, TimeUnit.SECONDS), Is.is(true));
331 
332         caesiumSchedulerService.shutdown(); // shutdown scheduler and in doing so cancel any running jobs
333 
334         assertThat("MessageRunner did not respond to cancellation", exitLatch.await(sleepMillis * 2, TimeUnit.MILLISECONDS), is(true));
335     }
336 
337     @Test
338     public void cancelAutoAcknowledgement() throws Exception {
339         final MessageRunnerKey messageRunnerKey = MessageRunnerKey.of("cancelAckMessageRunnerKey");
340         final MessageRunner messageRunner = spy(new DelegatingMessageRunner(MessageContext::cancelAutoAcknowledgementOfMessage));
341         when(messageRunnerRegistryHelper.getMessageRunner(messageRunnerKey)).thenReturn(Optional.of(messageRunner));
342 
343         schedulerMessageRunnerService.addMessage(Message.create(messageRunnerKey, "payload"));
344 
345         verify(messageRunner, timeout(DELIVERY_INTERVAL_MILLIS + 50).times(1)).processMessage(messageContextArgumentCaptor.capture());
346         final JobId jobId = JobId.of(messageContextArgumentCaptor.getValue().getMessageId().get());
347         verify(caesiumSchedulerService, never()).unscheduleJob(jobId);
348     }
349 
350     @Test
351     public void messageNotProcessedIfWorkContextNotAvailable() throws Exception {
352         runDetailsDao = new MemoryRunDetailsDao();
353         CaesiumSchedulerService caesiumSchedulerService = new CaesiumSchedulerService(config, runDetailsDao,
354                 new MemoryClusteredJobDao());
355 
356         SchedulerMessageRunnerService schedulerMessageRunnerService = new SchedulerMessageRunnerService(caesiumSchedulerService,
357                 messageRunnerRegistryHelper, new DefaultMessageInformationService("queueUrl", tenantContextProvider,
358                 new JacksonNestedMessageSerializer()), new JacksonNestedMessageSerializer(),
359                 DELIVERY_INTERVAL_MILLIS, MAX_DELIVERY_COUNT);
360 
361         try {
362             schedulerMessageRunnerService.init();
363             caesiumSchedulerService.start();
364 
365             schedulerMessageRunnerService.addMessage(Message.create(TEST_MESSAGE_RUNNER_KEY, "payload"));
366 
367             verify(testMessageRunner, after(VERIFY_TIMEOUT_MILLIS).never()).processMessage(any(MessageContext.class));
368         } finally {
369             caesiumSchedulerService.shutdown();
370             schedulerMessageRunnerService.shutdown();
371         }
372     }
373 
374     private List<String> toPayloadValues(List<MessageContext> messageContexts) {
375         return messageContexts.stream().map(messageContext -> messageContext.getPayload().orElse(null)).collect(Collectors.toList());
376     }
377 
378     private class DelegatingMessageRunner implements MessageRunner {
379         private final MessageRunner delegate;
380 
381         public DelegatingMessageRunner(MessageRunner delegate) {
382             this.delegate = delegate;
383         }
384 
385         @Override
386         public void processMessage(MessageContext context) {
387             delegate.processMessage(context);
388         }
389     }
390 }