1 package com.atlassian.messagequeue.internal.scheduler;
2
3 import com.atlassian.messagequeue.Message;
4 import com.atlassian.messagequeue.MessageInformationService;
5 import com.atlassian.messagequeue.MessageRunnerKey;
6 import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
7 import com.atlassian.messagequeue.MessageRunnerService;
8 import com.atlassian.messagequeue.MessageRunnerServiceException;
9 import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
10 import com.atlassian.messagequeue.internal.core.NestedMessage;
11 import com.atlassian.messagequeue.internal.core.NestedMessageConstants;
12 import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
13 import com.atlassian.messagequeue.registry.MessageContext;
14 import com.atlassian.messagequeue.registry.MessageRunner;
15 import com.atlassian.scheduler.JobRunner;
16 import com.atlassian.scheduler.JobRunnerRequest;
17 import com.atlassian.scheduler.JobRunnerResponse;
18 import com.atlassian.scheduler.SchedulerService;
19 import com.atlassian.scheduler.SchedulerServiceException;
20 import com.atlassian.scheduler.config.JobConfig;
21 import com.atlassian.scheduler.config.JobId;
22 import com.atlassian.scheduler.config.JobRunnerKey;
23 import com.atlassian.scheduler.config.RunMode;
24 import com.atlassian.scheduler.config.Schedule;
25 import com.atlassian.workcontext.api.WorkContextAvailable;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import javax.annotation.Nullable;
30 import javax.annotation.PostConstruct;
31 import javax.annotation.PreDestroy;
32
33 import java.io.Serializable;
34 import java.util.Date;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.Optional;
38 import java.util.UUID;
39 import java.util.concurrent.TimeUnit;
40
41 import static java.util.Objects.requireNonNull;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public class SchedulerMessageRunnerService implements MessageRunnerService {
76 static final String DELIVERY_COUNT_PARAMETER = "deliveryCount";
77 static final JobRunnerKey AMQ_JOB_RUNNER_KEY = JobRunnerKey.of("amqJobRunner");
78
79 private static final Logger log = LoggerFactory.getLogger(SchedulerMessageRunnerService.class);
80 private static final long DEFAULT_DELIVERY_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(Integer.getInteger("amq.scheduler.delivery.interval.seconds", 30));
81 private static final int DEFAULT_MAX_DELIVERY_COUNT = Integer.getInteger("amq.scheduler.max.delivery.count", 3);
82 private static final String PAYLOAD_JOB_PARAMETER = "payload";
83
84 private final SchedulerService schedulerService;
85 private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
86 private final NestedMessageSerializer nestedMessageSerializer;
87 private final MessageInformationService messageInformationService;
88 private final long deliveryIntervalMillis;
89 private final int maxDeliveryCount;
90 private volatile boolean shutdown = false;
91
92
93
94
95
96
97
98
99 public SchedulerMessageRunnerService(SchedulerService schedulerService,
100 MessageRunnerRegistryHelper messageRunnerRegistryHelper,
101 MessageInformationService messageInformationService,
102 NestedMessageSerializer nestedMessageSerializer) {
103 this(schedulerService, messageRunnerRegistryHelper, messageInformationService, nestedMessageSerializer,
104 DEFAULT_DELIVERY_INTERVAL_MILLIS, DEFAULT_MAX_DELIVERY_COUNT);
105 }
106
107
108
109
110
111
112
113
114
115
116 public SchedulerMessageRunnerService(SchedulerService schedulerService,
117 MessageRunnerRegistryHelper messageRunnerRegistryHelper,
118 MessageInformationService messageInformationService, NestedMessageSerializer nestedMessageSerializer,
119 long deliveryIntervalMillis, int maxDeliveryCount) {
120 this.schedulerService = requireNonNull(schedulerService, "schedulerService");
121 this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper, "messageRunnerRegistryHelper");
122 this.nestedMessageSerializer = requireNonNull(nestedMessageSerializer, "nestedMessageSerializer");
123 this.messageInformationService = requireNonNull(messageInformationService, "messageInformationService");
124
125 if (deliveryIntervalMillis <= 0) {
126 throw new IllegalArgumentException("deliveryIntervalMillis must be greater than 0. Received: " + deliveryIntervalMillis);
127 }
128 this.deliveryIntervalMillis = deliveryIntervalMillis;
129
130 if (maxDeliveryCount <= 0) {
131 throw new IllegalArgumentException("maxDeliveryCount must be greater than 0. Received: " + maxDeliveryCount);
132 }
133 this.maxDeliveryCount = maxDeliveryCount;
134
135 log.info("Constructing {} (maxDeliveryCount: {}, deliveryIntervalMinutes: {})", this.getClass().getSimpleName(), maxDeliveryCount, TimeUnit.MILLISECONDS.toMinutes(deliveryIntervalMillis));
136 }
137
138
139
140
141 @PostConstruct
142 public void init() {
143 log.info("Registering AMQ JobRunner");
144 schedulerService.registerJobRunner(AMQ_JOB_RUNNER_KEY, new AmqJobRunner());
145 }
146
147 private class AmqJobRunner implements JobRunner {
148 @Nullable
149 @Override
150 public JobRunnerResponse runJob(JobRunnerRequest jobRunnerRequest) {
151 final JobRunnerResponse validateResponse = validate(jobRunnerRequest);
152 if (validateResponse != null) {
153 return validateResponse;
154 }
155
156 final JobRunnerResponse incrementDeliveryCountResponse = incrementDeliveryCount(jobRunnerRequest);
157 if (incrementDeliveryCountResponse != null) {
158 return incrementDeliveryCountResponse;
159 }
160
161 final String payload = (String) jobRunnerRequest.getJobConfig().getParameters().get(PAYLOAD_JOB_PARAMETER);
162 final NestedMessage nestedMessage = nestedMessageSerializer.deserialize(payload);
163
164 return processNestedMessage(nestedMessage, new SchedulerMessageContext(jobRunnerRequest, nestedMessage.getPayload(), schedulerService));
165 }
166
167 JobRunnerResponse processNestedMessage(NestedMessage nestedMessage, MessageContext messageContext) {
168 final String messageId = messageContext.getMessageId().orElse("");
169 final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
170 if (tenantId == null) {
171 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
172 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME, messageId);
173 log.error(errorMessage);
174 messageContext.acknowledge();
175
176 return JobRunnerResponse.aborted(errorMessage);
177 }
178
179 final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
180 if (messageRunnerKey == null) {
181 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
182 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
183 log.error(errorMessage);
184 messageContext.acknowledge();
185
186 return JobRunnerResponse.aborted(errorMessage);
187 }
188
189 final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
190 if (!messageRunner.isPresent()) {
191 final String errorMessageFormat = "No MessageRunner found to process message (messageID: %s, messageRunnerKey: %s)";
192 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
193 log.error(errorMessage);
194
195 return JobRunnerResponse.aborted(errorMessage);
196 }
197
198 if (!WorkContextAvailable.isWorkContextAvailable()) {
199 final String errorMessageFormat = "No WorkContext available to process message (messageID: %s, messageRunnerKey: %s)";
200 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
201 log.error(errorMessage);
202
203 return JobRunnerResponse.aborted(errorMessage);
204 }
205
206 try {
207 messageRunner.get().processMessage(messageContext);
208
209 if (messageContext.shouldAutoAcknowledgeMessage()) {
210 messageContext.acknowledge();
211 }
212 } catch (Exception e) {
213 log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
214 }
215
216 return JobRunnerResponse.success();
217 }
218
219 @Nullable
220 JobRunnerResponse validate(JobRunnerRequest jobRunnerRequest) {
221 final JobId jobId = jobRunnerRequest.getJobId();
222 final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
223 final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
224 final Serializable deliveryCount = jobParameters.get(DELIVERY_COUNT_PARAMETER);
225
226 if (!jobParameters.containsKey(DELIVERY_COUNT_PARAMETER)) {
227 final String format = "Job missing required parameter: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
228 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, "omitted"));
229 schedulerService.unscheduleJob(jobId);
230 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, payload));
231 } else if (!(deliveryCount instanceof Integer)) {
232 final String format = "Job %s parameter of the wrong type. Expecting int, found: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
233 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, "omitted"));
234 schedulerService.unscheduleJob(jobId);
235 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, payload));
236 }
237
238 return null;
239 }
240
241 @Nullable
242 JobRunnerResponse incrementDeliveryCount(JobRunnerRequest jobRunnerRequest) {
243 final JobId jobId = jobRunnerRequest.getJobId();
244 final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
245 final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
246
247 int deliveryCount = (int) jobParameters.get(DELIVERY_COUNT_PARAMETER);
248 if (++deliveryCount > maxDeliveryCount) {
249 final String format = "Max delivery count reached for job. Job will be unscheduled (JobId: %s, JobPayload: %s)";
250 log.warn(String.format(format, jobId, "omitted"));
251 schedulerService.unscheduleJob(jobId);
252 return JobRunnerResponse.aborted(String.format(format, jobId, payload));
253 } else {
254 final Map<String, Serializable> newJobParameters = new HashMap<>(jobParameters);
255 newJobParameters.put(DELIVERY_COUNT_PARAMETER, deliveryCount);
256 try {
257 schedulerService.scheduleJob(jobId, jobRunnerRequest.getJobConfig()
258 .withSchedule(Schedule.forInterval(deliveryIntervalMillis, getNextDeliveryTime(jobRunnerRequest.getStartTime())))
259 .withParameters(newJobParameters));
260 } catch (SchedulerServiceException e) {
261 final String format = "Error rescheduling job (JobId: %s, JobPayload: %s)";
262 log.error(String.format(format, jobId, "omitted"), e);
263 return JobRunnerResponse.failed(String.format(format, jobId, payload));
264 }
265 log.debug("JobId: {}, deliveryCount: {}", jobId, deliveryCount);
266 }
267
268 return null;
269 }
270 }
271
272 private Date getNextDeliveryTime(Date thisDeliveryTime) {
273 return new Date(thisDeliveryTime.getTime() + deliveryIntervalMillis);
274 }
275
276
277
278
279 @PreDestroy
280 public void shutdown() {
281 schedulerService.unregisterJobRunner(AMQ_JOB_RUNNER_KEY);
282 shutdown = true;
283 }
284
285 @Override
286 public void addMessage(Message message) {
287 if (shutdown) {
288 throw new MessageRunnerServiceException("message could not be added after shutdown (runnerKey: " + message.getRunnerKey() + ")");
289 }
290
291 if (!messageRunnerRegistryHelper.getMessageRunner(message.getRunnerKey()).isPresent()) {
292 throw new MessageRunnerNotRegisteredException(message.getRunnerKey());
293 }
294
295 final Map<String, Serializable> jobParameters = new HashMap<>();
296 jobParameters.put(PAYLOAD_JOB_PARAMETER, messageInformationService.toPayload(message));
297 jobParameters.put(DELIVERY_COUNT_PARAMETER, 0);
298
299 final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
300 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
301 .withSchedule(Schedule.forInterval(deliveryIntervalMillis, getFirstDeliveryTime()))
302 .withParameters(jobParameters);
303
304 try {
305
306 schedulerService.scheduleJob(getJobId(message), jobConfig);
307 } catch (SchedulerServiceException e) {
308 log.error("Could not schedule job for message with key {}", message.getRunnerKey().toString(), e);
309 throw new MessageRunnerServiceException("Could not add message", e);
310 }
311 }
312
313
314
315
316
317
318 protected Date getFirstDeliveryTime() {
319 return null;
320 }
321
322 private JobId getJobId(Message message) {
323 return JobId.of("amq-" + message.getRunnerKey() + UUID.randomUUID());
324 }
325 }