View Javadoc

1   package com.atlassian.messagequeue.internal.scheduler;
2   
3   import com.atlassian.messagequeue.Message;
4   import com.atlassian.messagequeue.MessageInformationService;
5   import com.atlassian.messagequeue.MessageRunnerKey;
6   import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
7   import com.atlassian.messagequeue.MessageRunnerService;
8   import com.atlassian.messagequeue.MessageRunnerServiceException;
9   import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
10  import com.atlassian.messagequeue.internal.core.NestedMessage;
11  import com.atlassian.messagequeue.internal.core.NestedMessageConstants;
12  import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
13  import com.atlassian.messagequeue.registry.MessageContext;
14  import com.atlassian.messagequeue.registry.MessageRunner;
15  import com.atlassian.scheduler.JobRunner;
16  import com.atlassian.scheduler.JobRunnerRequest;
17  import com.atlassian.scheduler.JobRunnerResponse;
18  import com.atlassian.scheduler.SchedulerService;
19  import com.atlassian.scheduler.SchedulerServiceException;
20  import com.atlassian.scheduler.config.JobConfig;
21  import com.atlassian.scheduler.config.JobId;
22  import com.atlassian.scheduler.config.JobRunnerKey;
23  import com.atlassian.scheduler.config.RunMode;
24  import com.atlassian.scheduler.config.Schedule;
25  import com.atlassian.workcontext.api.WorkContextAvailable;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import javax.annotation.Nullable;
30  import javax.annotation.PostConstruct;
31  import javax.annotation.PreDestroy;
32  
33  import java.io.Serializable;
34  import java.util.Date;
35  import java.util.HashMap;
36  import java.util.Map;
37  import java.util.Optional;
38  import java.util.UUID;
39  import java.util.concurrent.TimeUnit;
40  
41  import static java.util.Objects.requireNonNull;
42  
43  /**
44   * An implementation of {@code MessageRunnerService} that is backed by the {@link SchedulerService} that provides
45   * at-least-once delivery semantics.
46   *
47   * <p>When a {@code Message} is passed to {@link #addMessage(Message)}, a job is scheduled to run
48   * as-soon-as-possible to process this {@code Message}.
49   *
50   * <p>When the processing of a {@code Message} fails (i.e. {@link MessageRunner#processMessage(MessageContext)} throws an exception),
51   * the {@code Message} will be delivered up to a total of {@code maxDeliveryCount} times at intervals of {@code deliveryIntervalMillis}
52   * milliseconds until it is processed successfully. The values of these parameters can be specified when constructing an
53   * instance of this class.
54   *
55   * <p>The processing time of a {@code Message} can exceed {@code deliveryIntervalMillis}. When this happens, re-delivery
56   * of the {@code Message} is skipped. Another re-delivery will be attempted after {@code deliveryIntervalMillis} milliseconds.
57   * Therefore, only one {@code MessageRunner} will be processing a particular {@code Message} at any given time.
58   *
59   * <p>The processing time of a {@code Message} is not currently limited. However, long processing times are discouraged.
60   *
61   * <p>If a {@code Message} fails to be processed successfully after {@code maxDeliveryCount} attempts,
62   * a {@link com.atlassian.scheduler.status.RunDetails} instance is recorded to note this occurrence. A collection of
63   * {@link com.atlassian.scheduler.status.RunDetails} can be viewed as a dead letter queue.
64   *
65   * <p>When constructing an instance of this class with a {@link SchedulerService} implementation that supports
66   * persistence of jobs, all {@code Message}s passed to {@link #addMessage(Message)} will be persisted.
67   *
68   * <p>To properly initialise an instance of this service, a client must call {@link #init()} after
69   * constructing an instance (or else ensure the dependency injection framework will automatically invoke methods
70   * annotated with @PostConstruct).
71   *
72   * <p>To properly dispose of an instance of this service, a client must call {@link #shutdown()} (or else ensure the
73   * dependency injection framework will automatically invoke methods annotated with @PreDestroy).
74   */
75  public class SchedulerMessageRunnerService implements MessageRunnerService {
76      static final String DELIVERY_COUNT_PARAMETER = "deliveryCount";
77      static final JobRunnerKey AMQ_JOB_RUNNER_KEY = JobRunnerKey.of("amqJobRunner");
78  
79      private static final Logger log = LoggerFactory.getLogger(SchedulerMessageRunnerService.class);
80      private static final long DEFAULT_DELIVERY_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(Integer.getInteger("amq.scheduler.delivery.interval.seconds", 30));
81      private static final int DEFAULT_MAX_DELIVERY_COUNT = Integer.getInteger("amq.scheduler.max.delivery.count", 3);
82      private static final String PAYLOAD_JOB_PARAMETER = "payload";
83  
84      private final SchedulerService schedulerService;
85      private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
86      private final NestedMessageSerializer nestedMessageSerializer;
87      private final MessageInformationService messageInformationService;
88      private final long deliveryIntervalMillis;
89      private final int maxDeliveryCount;
90      private volatile boolean shutdown = false;
91  
92      /**
93       * Constructs a new instance of SchedulerMessageRunnerService.
94       * @param schedulerService the scheduler service
95       * @param messageRunnerRegistryHelper the message runner registry helper
96       * @param messageInformationService message information service
97       * @param nestedMessageSerializer the nested message serializer
98       */
99      public SchedulerMessageRunnerService(SchedulerService schedulerService,
100                                          MessageRunnerRegistryHelper messageRunnerRegistryHelper,
101                                          MessageInformationService messageInformationService,
102                                          NestedMessageSerializer nestedMessageSerializer) {
103         this(schedulerService, messageRunnerRegistryHelper, messageInformationService, nestedMessageSerializer,
104                 DEFAULT_DELIVERY_INTERVAL_MILLIS, DEFAULT_MAX_DELIVERY_COUNT);
105     }
106 
107     /**
108      * Constructs a new instance of SchedulerMessageRunnerService.
109      * @param schedulerService the scheduler service
110      * @param messageRunnerRegistryHelper the message runner registry helper
111      * @param messageInformationService message information service
112      * @param nestedMessageSerializer nested message serializer
113      * @param deliveryIntervalMillis the time interval in milliseconds between successive deliveries of a message (must be greater than 0)
114      * @param maxDeliveryCount the maximum number of times to deliver a message if message processing fails and the message is not acknowledged (must be greater than 0)
115      */
116     public SchedulerMessageRunnerService(SchedulerService schedulerService,
117                                          MessageRunnerRegistryHelper messageRunnerRegistryHelper,
118                                          MessageInformationService messageInformationService, NestedMessageSerializer nestedMessageSerializer,
119                                          long deliveryIntervalMillis, int maxDeliveryCount) {
120         this.schedulerService = requireNonNull(schedulerService, "schedulerService");
121         this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper, "messageRunnerRegistryHelper");
122         this.nestedMessageSerializer = requireNonNull(nestedMessageSerializer, "nestedMessageSerializer");
123         this.messageInformationService = requireNonNull(messageInformationService, "messageInformationService");
124 
125         if (deliveryIntervalMillis <= 0) {
126             throw new IllegalArgumentException("deliveryIntervalMillis must be greater than 0. Received: " + deliveryIntervalMillis);
127         }
128         this.deliveryIntervalMillis = deliveryIntervalMillis;
129 
130         if (maxDeliveryCount <= 0) {
131             throw new IllegalArgumentException("maxDeliveryCount must be greater than 0. Received: " + maxDeliveryCount);
132         }
133         this.maxDeliveryCount = maxDeliveryCount;
134 
135         log.info("Constructing {} (maxDeliveryCount: {}, deliveryIntervalMinutes: {})", this.getClass().getSimpleName(), maxDeliveryCount, TimeUnit.MILLISECONDS.toMinutes(deliveryIntervalMillis));
136     }
137 
138     /**
139      * Initialises this instance.
140      */
141     @PostConstruct
142     public void init() {
143         log.info("Registering AMQ JobRunner");
144         schedulerService.registerJobRunner(AMQ_JOB_RUNNER_KEY, new AmqJobRunner());
145     }
146 
147     private class AmqJobRunner implements JobRunner {
148         @Nullable
149         @Override
150         public JobRunnerResponse runJob(JobRunnerRequest jobRunnerRequest) {
151             final JobRunnerResponse validateResponse = validate(jobRunnerRequest);
152             if (validateResponse != null) {
153                 return validateResponse;
154             }
155 
156             final JobRunnerResponse incrementDeliveryCountResponse = incrementDeliveryCount(jobRunnerRequest);
157             if (incrementDeliveryCountResponse != null) {
158                 return incrementDeliveryCountResponse;
159             }
160 
161             final String payload = (String) jobRunnerRequest.getJobConfig().getParameters().get(PAYLOAD_JOB_PARAMETER);
162             final NestedMessage nestedMessage = nestedMessageSerializer.deserialize(payload);
163 
164             return processNestedMessage(nestedMessage, new SchedulerMessageContext(jobRunnerRequest, nestedMessage.getPayload(), schedulerService));
165         }
166 
167         JobRunnerResponse processNestedMessage(NestedMessage nestedMessage, MessageContext messageContext) {
168             final String messageId = messageContext.getMessageId().orElse("");
169             final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
170             if (tenantId == null) {
171                 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
172                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME, messageId);
173                 log.error(errorMessage);
174                 messageContext.acknowledge();
175 
176                 return JobRunnerResponse.aborted(errorMessage);
177             }
178 
179             final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
180             if (messageRunnerKey == null) {
181                 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
182                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
183                 log.error(errorMessage);
184                 messageContext.acknowledge();
185 
186                 return JobRunnerResponse.aborted(errorMessage);
187             }
188 
189             final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
190             if (!messageRunner.isPresent()) {
191                 final String errorMessageFormat = "No MessageRunner found to process message (messageID: %s, messageRunnerKey: %s)";
192                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
193                 log.error(errorMessage);
194 
195                 return JobRunnerResponse.aborted(errorMessage);
196             }
197 
198             if (!WorkContextAvailable.isWorkContextAvailable()) {
199                 final String errorMessageFormat = "No WorkContext available to process message (messageID: %s, messageRunnerKey: %s)";
200                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
201                 log.error(errorMessage);
202 
203                 return JobRunnerResponse.aborted(errorMessage);
204             }
205 
206             try {
207                 messageRunner.get().processMessage(messageContext);
208 
209                 if (messageContext.shouldAutoAcknowledgeMessage()) {
210                     messageContext.acknowledge();
211                 }
212             } catch (Exception e) {
213                 log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
214             }
215 
216             return JobRunnerResponse.success();
217         }
218 
219         @Nullable
220         JobRunnerResponse validate(JobRunnerRequest jobRunnerRequest) {
221             final JobId jobId = jobRunnerRequest.getJobId();
222             final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
223             final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
224             final Serializable deliveryCount = jobParameters.get(DELIVERY_COUNT_PARAMETER);
225 
226             if (!jobParameters.containsKey(DELIVERY_COUNT_PARAMETER)) {
227                 final String format = "Job missing required parameter: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
228                 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, "omitted"));
229                 schedulerService.unscheduleJob(jobId);
230                 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, payload));
231             } else if (!(deliveryCount instanceof Integer)) {
232                 final String format = "Job %s parameter of the wrong type. Expecting int, found: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
233                 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, "omitted"));
234                 schedulerService.unscheduleJob(jobId);
235                 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, payload));
236             }
237 
238             return null;
239         }
240 
241         @Nullable
242         JobRunnerResponse incrementDeliveryCount(JobRunnerRequest jobRunnerRequest) {
243             final JobId jobId = jobRunnerRequest.getJobId();
244             final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
245             final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
246 
247             int deliveryCount = (int) jobParameters.get(DELIVERY_COUNT_PARAMETER);
248             if (++deliveryCount > maxDeliveryCount) {
249                 final String format = "Max delivery count reached for job. Job will be unscheduled (JobId: %s, JobPayload: %s)";
250                 log.warn(String.format(format, jobId, "omitted"));
251                 schedulerService.unscheduleJob(jobId);
252                 return JobRunnerResponse.aborted(String.format(format, jobId, payload));
253             } else {
254                 final Map<String, Serializable> newJobParameters = new HashMap<>(jobParameters);
255                 newJobParameters.put(DELIVERY_COUNT_PARAMETER, deliveryCount);
256                 try {
257                     schedulerService.scheduleJob(jobId, jobRunnerRequest.getJobConfig()
258                             .withSchedule(Schedule.forInterval(deliveryIntervalMillis, getNextDeliveryTime(jobRunnerRequest.getStartTime())))
259                             .withParameters(newJobParameters));
260                 } catch (SchedulerServiceException e) {
261                     final String format = "Error rescheduling job (JobId: %s, JobPayload: %s)";
262                     log.error(String.format(format, jobId, "omitted"), e);
263                     return JobRunnerResponse.failed(String.format(format, jobId, payload));
264                 }
265                 log.debug("JobId: {}, deliveryCount: {}", jobId, deliveryCount);
266             }
267 
268             return null;
269         }
270     }
271 
272     private Date getNextDeliveryTime(Date thisDeliveryTime) {
273         return new Date(thisDeliveryTime.getTime() + deliveryIntervalMillis);
274     }
275 
276     /**
277      * Shuts down this instance.
278      */
279     @PreDestroy
280     public void shutdown() {
281         schedulerService.unregisterJobRunner(AMQ_JOB_RUNNER_KEY);
282         shutdown = true;
283     }
284 
285     @Override
286     public void addMessage(Message message) {
287         if (shutdown) {
288             throw new MessageRunnerServiceException("message could not be added after shutdown (runnerKey: " + message.getRunnerKey() + ")");
289         }
290 
291         if (!messageRunnerRegistryHelper.getMessageRunner(message.getRunnerKey()).isPresent()) {
292             throw new MessageRunnerNotRegisteredException(message.getRunnerKey());
293         }
294 
295         final Map<String, Serializable> jobParameters = new HashMap<>();
296         jobParameters.put(PAYLOAD_JOB_PARAMETER, messageInformationService.toPayload(message));
297         jobParameters.put(DELIVERY_COUNT_PARAMETER, 0);
298 
299         final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
300                 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
301                 .withSchedule(Schedule.forInterval(deliveryIntervalMillis, getFirstDeliveryTime()))
302                 .withParameters(jobParameters);
303 
304         try {
305             // Instead of scheduleJobWithGeneratedId, use scheduleJob with an easy to debug job ID
306             schedulerService.scheduleJob(getJobId(message), jobConfig);
307         } catch (SchedulerServiceException e) {
308             log.error("Could not schedule job for message with key {}", message.getRunnerKey().toString(), e);
309             throw new MessageRunnerServiceException("Could not add message", e);
310         }
311     }
312 
313     /**
314      * Returns message first delivery time.
315      *
316      * @return {@code null} to delivery as soon as possible; deeper in the past means sooner (higher priority)
317      */
318     protected Date getFirstDeliveryTime() {
319         return null;
320     }
321 
322     private JobId getJobId(Message message) {
323         return JobId.of("amq-" + message.getRunnerKey() + UUID.randomUUID());
324     }
325 }