View Javadoc

1   package com.atlassian.messagequeue.internal.scheduler;
2   
3   import com.atlassian.messagequeue.Message;
4   import com.atlassian.messagequeue.MessageInformationService;
5   import com.atlassian.messagequeue.MessageRunnerKey;
6   import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
7   import com.atlassian.messagequeue.MessageRunnerService;
8   import com.atlassian.messagequeue.MessageRunnerServiceException;
9   import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
10  import com.atlassian.messagequeue.internal.core.NestedMessage;
11  import com.atlassian.messagequeue.internal.core.NestedMessageConstants;
12  import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
13  import com.atlassian.messagequeue.registry.MessageContext;
14  import com.atlassian.messagequeue.registry.MessageRunner;
15  import com.atlassian.scheduler.JobRunner;
16  import com.atlassian.scheduler.JobRunnerRequest;
17  import com.atlassian.scheduler.JobRunnerResponse;
18  import com.atlassian.scheduler.SchedulerService;
19  import com.atlassian.scheduler.SchedulerServiceException;
20  import com.atlassian.scheduler.config.JobConfig;
21  import com.atlassian.scheduler.config.JobId;
22  import com.atlassian.scheduler.config.JobRunnerKey;
23  import com.atlassian.scheduler.config.RunMode;
24  import com.atlassian.scheduler.config.Schedule;
25  import com.atlassian.workcontext.api.WorkContextAvailable;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import javax.annotation.Nullable;
30  import javax.annotation.PostConstruct;
31  import javax.annotation.PreDestroy;
32  
33  import java.io.Serializable;
34  import java.util.Date;
35  import java.util.HashMap;
36  import java.util.Map;
37  import java.util.Optional;
38  import java.util.concurrent.TimeUnit;
39  
40  import static java.util.Objects.requireNonNull;
41  
42  /**
43   * An implementation of {@code MessageRunnerService} that is backed by the {@link SchedulerService} that provides
44   * at-least-once delivery semantics.
45   *
46   * <p>When a {@code Message} is passed to {@link #addMessage(Message)}, a job is scheduled to run
47   * as-soon-as-possible to process this {@code Message}.
48   *
49   * <p>When the processing of a {@code Message} fails (i.e. {@link MessageRunner#processMessage(MessageContext)} throws an exception),
50   * the {@code Message} will be delivered up to a total of {@code maxDeliveryCount} times at intervals of {@code deliveryIntervalMillis}
51   * milliseconds until it is processed successfully. The values of these parameters can be specified when constructing an
52   * instance of this class.
53   *
54   * <p>The processing time of a {@code Message} can exceed {@code deliveryIntervalMillis}. When this happens, re-delivery
55   * of the {@code Message} is skipped. Another re-delivery will be attempted after {@code deliveryIntervalMillis} milliseconds.
56   * Therefore, only one {@code MessageRunner} will be processing a particular {@code Message} at any given time.
57   *
58   * <p>The processing time of a {@code Message} is not currently limited. However long processing times are discouraged.
59   *
60   * <p>If a {@code Message} fails to be processed successfully after {@code maxDeliveryCount} attempts,
61   * a {@link com.atlassian.scheduler.status.RunDetails} instance is recorded to note this occurence. A collection of
62   * {@link com.atlassian.scheduler.status.RunDetails} can be viewed as a dead letter queue.
63   *
64   * <p>When constructing an instance of this class with a {@link SchedulerService} implementation that supports
65   * persistence of jobs, all {@code Message}s passed to {@link #addMessage(Message)} will be persisted.
66   *
67   * <p>To properly initialise an instance of this service, a client must call {@link #init()} after
68   * constructing an instance (or else ensure the dependency injection framework will automatically invoke methods
69   * annotated with @PostConstruct).
70   *
71   * <p>To properly dispose of an instance of this service, a client must call {@link #shutdown()} (or else ensure the
72   * dependency injection framework will automatically invoke methods annotated with @PreDestroy).
73   */
74  public class SchedulerMessageRunnerService implements MessageRunnerService {
75      static final String DELIVERY_COUNT_PARAMETER = "deliveryCount";
76      static final JobRunnerKey AMQ_JOB_RUNNER_KEY = JobRunnerKey.of("amqJobRunner");
77  
78      private static final Logger log = LoggerFactory.getLogger(SchedulerMessageRunnerService.class);
79      private static final long DEFAULT_DELIVERY_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(Integer.getInteger("amq.scheduler.delivery.interval.seconds", 30));
80      private static final int DEFAULT_MAX_DELIVERY_COUNT = Integer.getInteger("amq.scheduler.max.delivery.count", 3);
81      private static final String PAYLOAD_JOB_PARAMETER = "payload";
82  
83      private final SchedulerService schedulerService;
84      private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
85      private final NestedMessageSerializer nestedMessageSerializer;
86      private final MessageInformationService messageInformationService;
87      private final long deliveryIntervalMillis;
88      private final int maxDeliveryCount;
89      private volatile boolean shutdown = false;
90  
91      /**
92       * Constructs a new instance of SchedulerMessageRunnerService.
93       * @param schedulerService the scheduler service
94       * @param messageRunnerRegistryHelper the message runner registry helper
95       * @param messageInformationService message information service
96       * @param nestedMessageSerializer the nested message serializer
97       */
98      public SchedulerMessageRunnerService(SchedulerService schedulerService,
99                                           MessageRunnerRegistryHelper messageRunnerRegistryHelper,
100                                          MessageInformationService messageInformationService,
101                                          NestedMessageSerializer nestedMessageSerializer) {
102         this(schedulerService, messageRunnerRegistryHelper, messageInformationService, nestedMessageSerializer,
103                 DEFAULT_DELIVERY_INTERVAL_MILLIS, DEFAULT_MAX_DELIVERY_COUNT);
104     }
105 
106     /**
107      * Constructs a new instance of SchedulerMessageRunnerService.
108      * @param schedulerService the scheduler service
109      * @param messageRunnerRegistryHelper the message runner registry helper
110      * @param messageInformationService message information service
111      * @param nestedMessageSerializer nested message serializer
112      * @param deliveryIntervalMillis the time interval in milliseconds between successive deliveries of a message (must be greater than 0)
113      * @param maxDeliveryCount the maximum number of times to deliver a message if message processing fails and the message is not acknowledged (must be greater than 0)
114      */
115     public SchedulerMessageRunnerService(SchedulerService schedulerService,
116                                          MessageRunnerRegistryHelper messageRunnerRegistryHelper,
117                                          MessageInformationService messageInformationService, NestedMessageSerializer nestedMessageSerializer,
118                                          long deliveryIntervalMillis, int maxDeliveryCount) {
119         this.schedulerService = requireNonNull(schedulerService, "schedulerService");
120         this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper, "messageRunnerRegistryHelper");
121         this.nestedMessageSerializer = requireNonNull(nestedMessageSerializer, "nestedMessageSerializer");
122         this.messageInformationService = requireNonNull(messageInformationService, "messageInformationService");
123 
124         if (deliveryIntervalMillis <= 0) {
125             throw new IllegalArgumentException("deliveryIntervalMillis must be greater than 0. Received: " + deliveryIntervalMillis);
126         }
127         this.deliveryIntervalMillis = deliveryIntervalMillis;
128 
129         if (maxDeliveryCount <= 0) {
130             throw new IllegalArgumentException("maxDeliveryCount must be greater than 0. Received: " + maxDeliveryCount);
131         }
132         this.maxDeliveryCount = maxDeliveryCount;
133 
134         log.info("Constructing {} (maxDeliveryCount: {}, deliveryIntervalMinutes: {})", this.getClass().getSimpleName(), maxDeliveryCount, TimeUnit.MILLISECONDS.toMinutes(deliveryIntervalMillis));
135     }
136 
137     /**
138      * Initialises this instance.
139      */
140     @PostConstruct
141     public void init() {
142         log.info("Registering AMQ JobRunner");
143         schedulerService.registerJobRunner(AMQ_JOB_RUNNER_KEY, new AmqJobRunner());
144     }
145 
146     private class AmqJobRunner implements JobRunner {
147         @Nullable
148         @Override
149         public JobRunnerResponse runJob(JobRunnerRequest jobRunnerRequest) {
150             final JobRunnerResponse validateResponse = validate(jobRunnerRequest);
151             if (validateResponse != null) {
152                 return validateResponse;
153             }
154 
155             final JobRunnerResponse incrementDeliveryCountResponse = incrementDeliveryCount(jobRunnerRequest);
156             if (incrementDeliveryCountResponse != null) {
157                 return incrementDeliveryCountResponse;
158             }
159 
160             final String payload = (String) jobRunnerRequest.getJobConfig().getParameters().get(PAYLOAD_JOB_PARAMETER);
161             final NestedMessage nestedMessage = nestedMessageSerializer.deserialize(payload);
162 
163             return processNestedMessage(nestedMessage, new SchedulerMessageContext(jobRunnerRequest, nestedMessage.getPayload(), schedulerService));
164         }
165 
166         JobRunnerResponse processNestedMessage(NestedMessage nestedMessage, MessageContext messageContext) {
167             final String messageId = messageContext.getMessageId().orElse("");
168             final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
169             if (tenantId == null) {
170                 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
171                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME, messageId);
172                 log.error(errorMessage);
173                 messageContext.acknowledge();
174 
175                 return JobRunnerResponse.aborted(errorMessage);
176             }
177 
178             final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
179             if (messageRunnerKey == null) {
180                 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
181                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
182                 log.error(errorMessage);
183                 messageContext.acknowledge();
184 
185                 return JobRunnerResponse.aborted(errorMessage);
186             }
187 
188             final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
189             if (!messageRunner.isPresent()) {
190                 final String errorMessageFormat = "No MessageRunner found to process message (messageID: %s, messageRunnerKey: %s)";
191                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
192                 log.error(errorMessage);
193 
194                 return JobRunnerResponse.aborted(errorMessage);
195             }
196 
197             if (!WorkContextAvailable.isWorkContextAvailable()) {
198                 final String errorMessageFormat = "No WorkContext available to process message (messageID: %s, messageRunnerKey: %s)";
199                 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
200                 log.error(errorMessage);
201 
202                 return JobRunnerResponse.aborted(errorMessage);
203             }
204 
205             try {
206                 messageRunner.get().processMessage(messageContext);
207 
208                 if (messageContext.shouldAutoAcknowledgeMessage()) {
209                     messageContext.acknowledge();
210                 }
211             } catch (Exception e) {
212                 log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
213             }
214 
215             return JobRunnerResponse.success();
216         }
217 
218         @Nullable
219         JobRunnerResponse validate(JobRunnerRequest jobRunnerRequest) {
220             final JobId jobId = jobRunnerRequest.getJobId();
221             final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
222             final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
223             final Serializable deliveryCount = jobParameters.get(DELIVERY_COUNT_PARAMETER);
224 
225             if (!jobParameters.containsKey(DELIVERY_COUNT_PARAMETER)) {
226                 final String format = "Job missing required parameter: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
227                 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, "omitted"));
228                 schedulerService.unscheduleJob(jobId);
229                 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, payload));
230             } else if (!(deliveryCount instanceof Integer)) {
231                 final String format = "Job %s parameter of the wrong type. Expecting int, found: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
232                 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, "omitted"));
233                 schedulerService.unscheduleJob(jobId);
234                 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, payload));
235             }
236 
237             return null;
238         }
239 
240         @Nullable
241         JobRunnerResponse incrementDeliveryCount(JobRunnerRequest jobRunnerRequest) {
242             final JobId jobId = jobRunnerRequest.getJobId();
243             final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
244             final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
245 
246             int deliveryCount = (int) jobParameters.get(DELIVERY_COUNT_PARAMETER);
247             if (++deliveryCount > maxDeliveryCount) {
248                 final String format = "Max delivery count reached for job. Job will be unscheduled (JobId: %s, JobPayload: %s)";
249                 log.warn(String.format(format, jobId, "omitted"));
250                 schedulerService.unscheduleJob(jobId);
251                 return JobRunnerResponse.aborted(String.format(format, jobId, payload));
252             } else {
253                 final Map<String, Serializable> newJobParameters = new HashMap<>(jobParameters);
254                 newJobParameters.put(DELIVERY_COUNT_PARAMETER, deliveryCount);
255                 try {
256                     schedulerService.scheduleJob(jobId, jobRunnerRequest.getJobConfig()
257                             .withSchedule(Schedule.forInterval(deliveryIntervalMillis, getFirstRunTime(jobRunnerRequest.getStartTime())))
258                             .withParameters(newJobParameters));
259                 } catch (SchedulerServiceException e) {
260                     final String format = "Error rescheduling job (JobId: %s, JobPayload: %s)";
261                     log.error(String.format(format, jobId, "omitted"), e);
262                     return JobRunnerResponse.failed(String.format(format, jobId, payload));
263                 }
264                 log.debug("JobId: {}, deliveryCount: {}", jobId, deliveryCount);
265             }
266 
267             return null;
268         }
269     }
270 
271     private Date getFirstRunTime(Date jobStartTime) {
272         return new Date(jobStartTime.getTime() + deliveryIntervalMillis);
273     }
274 
275     /**
276      * Shuts down this instance.
277      */
278     @PreDestroy
279     public void shutdown() {
280         schedulerService.unregisterJobRunner(AMQ_JOB_RUNNER_KEY);
281         shutdown = true;
282     }
283 
284     @Override
285     public void addMessage(Message message) {
286         if (shutdown) {
287             throw new MessageRunnerServiceException("message could not be added after shutdown (runnerKey: " + message.getRunnerKey() + ")");
288         }
289 
290         if (!messageRunnerRegistryHelper.getMessageRunner(message.getRunnerKey()).isPresent()) {
291             throw new MessageRunnerNotRegisteredException(message.getRunnerKey());
292         }
293 
294         final Map<String, Serializable> jobParameters = new HashMap<>();
295         jobParameters.put(PAYLOAD_JOB_PARAMETER, messageInformationService.toPayload(message));
296         jobParameters.put(DELIVERY_COUNT_PARAMETER, 0);
297 
298         final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
299                 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
300                 .withSchedule(Schedule.forInterval(deliveryIntervalMillis, null))
301                 .withParameters(jobParameters);
302 
303         try {
304             schedulerService.scheduleJobWithGeneratedId(jobConfig);
305         } catch (SchedulerServiceException e) {
306             log.error("Could not schedule job for message with key {}", message.getRunnerKey().toString(), e);
307             throw new MessageRunnerServiceException("Could not add message", e);
308         }
309     }
310 }