1 package com.atlassian.messagequeue.internal.scheduler;
2
3 import com.atlassian.messagequeue.Message;
4 import com.atlassian.messagequeue.MessageInformationService;
5 import com.atlassian.messagequeue.MessageRunnerKey;
6 import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
7 import com.atlassian.messagequeue.MessageRunnerService;
8 import com.atlassian.messagequeue.MessageRunnerServiceException;
9 import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
10 import com.atlassian.messagequeue.internal.core.NestedMessage;
11 import com.atlassian.messagequeue.internal.core.NestedMessageConstants;
12 import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
13 import com.atlassian.messagequeue.registry.MessageContext;
14 import com.atlassian.messagequeue.registry.MessageRunner;
15 import com.atlassian.scheduler.JobRunner;
16 import com.atlassian.scheduler.JobRunnerRequest;
17 import com.atlassian.scheduler.JobRunnerResponse;
18 import com.atlassian.scheduler.SchedulerService;
19 import com.atlassian.scheduler.SchedulerServiceException;
20 import com.atlassian.scheduler.config.JobConfig;
21 import com.atlassian.scheduler.config.JobId;
22 import com.atlassian.scheduler.config.JobRunnerKey;
23 import com.atlassian.scheduler.config.RunMode;
24 import com.atlassian.scheduler.config.Schedule;
25 import com.atlassian.workcontext.api.WorkContextAvailable;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import javax.annotation.Nullable;
30 import javax.annotation.PostConstruct;
31 import javax.annotation.PreDestroy;
32
33 import java.io.Serializable;
34 import java.util.Date;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.Optional;
38 import java.util.concurrent.TimeUnit;
39
40 import static java.util.Objects.requireNonNull;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class SchedulerMessageRunnerService implements MessageRunnerService {
75 static final String DELIVERY_COUNT_PARAMETER = "deliveryCount";
76 static final JobRunnerKey AMQ_JOB_RUNNER_KEY = JobRunnerKey.of("amqJobRunner");
77
78 private static final Logger log = LoggerFactory.getLogger(SchedulerMessageRunnerService.class);
79 private static final long DEFAULT_DELIVERY_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(Integer.getInteger("amq.scheduler.delivery.interval.seconds", 30));
80 private static final int DEFAULT_MAX_DELIVERY_COUNT = Integer.getInteger("amq.scheduler.max.delivery.count", 3);
81 private static final String PAYLOAD_JOB_PARAMETER = "payload";
82
83 private final SchedulerService schedulerService;
84 private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
85 private final NestedMessageSerializer nestedMessageSerializer;
86 private final MessageInformationService messageInformationService;
87 private final long deliveryIntervalMillis;
88 private final int maxDeliveryCount;
89 private volatile boolean shutdown = false;
90
91
92
93
94
95
96
97
98 public SchedulerMessageRunnerService(SchedulerService schedulerService,
99 MessageRunnerRegistryHelper messageRunnerRegistryHelper,
100 MessageInformationService messageInformationService,
101 NestedMessageSerializer nestedMessageSerializer) {
102 this(schedulerService, messageRunnerRegistryHelper, messageInformationService, nestedMessageSerializer,
103 DEFAULT_DELIVERY_INTERVAL_MILLIS, DEFAULT_MAX_DELIVERY_COUNT);
104 }
105
106
107
108
109
110
111
112
113
114
115 public SchedulerMessageRunnerService(SchedulerService schedulerService,
116 MessageRunnerRegistryHelper messageRunnerRegistryHelper,
117 MessageInformationService messageInformationService, NestedMessageSerializer nestedMessageSerializer,
118 long deliveryIntervalMillis, int maxDeliveryCount) {
119 this.schedulerService = requireNonNull(schedulerService, "schedulerService");
120 this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper, "messageRunnerRegistryHelper");
121 this.nestedMessageSerializer = requireNonNull(nestedMessageSerializer, "nestedMessageSerializer");
122 this.messageInformationService = requireNonNull(messageInformationService, "messageInformationService");
123
124 if (deliveryIntervalMillis <= 0) {
125 throw new IllegalArgumentException("deliveryIntervalMillis must be greater than 0. Received: " + deliveryIntervalMillis);
126 }
127 this.deliveryIntervalMillis = deliveryIntervalMillis;
128
129 if (maxDeliveryCount <= 0) {
130 throw new IllegalArgumentException("maxDeliveryCount must be greater than 0. Received: " + maxDeliveryCount);
131 }
132 this.maxDeliveryCount = maxDeliveryCount;
133
134 log.info("Constructing {} (maxDeliveryCount: {}, deliveryIntervalMinutes: {})", this.getClass().getSimpleName(), maxDeliveryCount, TimeUnit.MILLISECONDS.toMinutes(deliveryIntervalMillis));
135 }
136
137
138
139
140 @PostConstruct
141 public void init() {
142 log.info("Registering AMQ JobRunner");
143 schedulerService.registerJobRunner(AMQ_JOB_RUNNER_KEY, new AmqJobRunner());
144 }
145
146 private class AmqJobRunner implements JobRunner {
147 @Nullable
148 @Override
149 public JobRunnerResponse runJob(JobRunnerRequest jobRunnerRequest) {
150 final JobRunnerResponse validateResponse = validate(jobRunnerRequest);
151 if (validateResponse != null) {
152 return validateResponse;
153 }
154
155 final JobRunnerResponse incrementDeliveryCountResponse = incrementDeliveryCount(jobRunnerRequest);
156 if (incrementDeliveryCountResponse != null) {
157 return incrementDeliveryCountResponse;
158 }
159
160 final String payload = (String) jobRunnerRequest.getJobConfig().getParameters().get(PAYLOAD_JOB_PARAMETER);
161 final NestedMessage nestedMessage = nestedMessageSerializer.deserialize(payload);
162
163 return processNestedMessage(nestedMessage, new SchedulerMessageContext(jobRunnerRequest, nestedMessage.getPayload(), schedulerService));
164 }
165
166 JobRunnerResponse processNestedMessage(NestedMessage nestedMessage, MessageContext messageContext) {
167 final String messageId = messageContext.getMessageId().orElse("");
168 final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
169 if (tenantId == null) {
170 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
171 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME, messageId);
172 log.error(errorMessage);
173 messageContext.acknowledge();
174
175 return JobRunnerResponse.aborted(errorMessage);
176 }
177
178 final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
179 if (messageRunnerKey == null) {
180 final String errorMessageFormat = "Message received that does not contain required '%s' attribute. Message will be acknowledged (messageID: %s)";
181 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
182 log.error(errorMessage);
183 messageContext.acknowledge();
184
185 return JobRunnerResponse.aborted(errorMessage);
186 }
187
188 final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
189 if (!messageRunner.isPresent()) {
190 final String errorMessageFormat = "No MessageRunner found to process message (messageID: %s, messageRunnerKey: %s)";
191 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
192 log.error(errorMessage);
193
194 return JobRunnerResponse.aborted(errorMessage);
195 }
196
197 if (!WorkContextAvailable.isWorkContextAvailable()) {
198 final String errorMessageFormat = "No WorkContext available to process message (messageID: %s, messageRunnerKey: %s)";
199 final String errorMessage = String.format(errorMessageFormat, NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
200 log.error(errorMessage);
201
202 return JobRunnerResponse.aborted(errorMessage);
203 }
204
205 try {
206 messageRunner.get().processMessage(messageContext);
207
208 if (messageContext.shouldAutoAcknowledgeMessage()) {
209 messageContext.acknowledge();
210 }
211 } catch (Exception e) {
212 log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
213 }
214
215 return JobRunnerResponse.success();
216 }
217
218 @Nullable
219 JobRunnerResponse validate(JobRunnerRequest jobRunnerRequest) {
220 final JobId jobId = jobRunnerRequest.getJobId();
221 final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
222 final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
223 final Serializable deliveryCount = jobParameters.get(DELIVERY_COUNT_PARAMETER);
224
225 if (!jobParameters.containsKey(DELIVERY_COUNT_PARAMETER)) {
226 final String format = "Job missing required parameter: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
227 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, "omitted"));
228 schedulerService.unscheduleJob(jobId);
229 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, jobId, payload));
230 } else if (!(deliveryCount instanceof Integer)) {
231 final String format = "Job %s parameter of the wrong type. Expecting int, found: %s. Job will be unscheduled (JobId: %s, JobPayload: %s)";
232 log.error(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, "omitted"));
233 schedulerService.unscheduleJob(jobId);
234 return JobRunnerResponse.aborted(String.format(format, DELIVERY_COUNT_PARAMETER, deliveryCount, jobId, payload));
235 }
236
237 return null;
238 }
239
240 @Nullable
241 JobRunnerResponse incrementDeliveryCount(JobRunnerRequest jobRunnerRequest) {
242 final JobId jobId = jobRunnerRequest.getJobId();
243 final Map<String, Serializable> jobParameters = jobRunnerRequest.getJobConfig().getParameters();
244 final String payload = (String) jobParameters.get(PAYLOAD_JOB_PARAMETER);
245
246 int deliveryCount = (int) jobParameters.get(DELIVERY_COUNT_PARAMETER);
247 if (++deliveryCount > maxDeliveryCount) {
248 final String format = "Max delivery count reached for job. Job will be unscheduled (JobId: %s, JobPayload: %s)";
249 log.warn(String.format(format, jobId, "omitted"));
250 schedulerService.unscheduleJob(jobId);
251 return JobRunnerResponse.aborted(String.format(format, jobId, payload));
252 } else {
253 final Map<String, Serializable> newJobParameters = new HashMap<>(jobParameters);
254 newJobParameters.put(DELIVERY_COUNT_PARAMETER, deliveryCount);
255 try {
256 schedulerService.scheduleJob(jobId, jobRunnerRequest.getJobConfig()
257 .withSchedule(Schedule.forInterval(deliveryIntervalMillis, getFirstRunTime(jobRunnerRequest.getStartTime())))
258 .withParameters(newJobParameters));
259 } catch (SchedulerServiceException e) {
260 final String format = "Error rescheduling job (JobId: %s, JobPayload: %s)";
261 log.error(String.format(format, jobId, "omitted"), e);
262 return JobRunnerResponse.failed(String.format(format, jobId, payload));
263 }
264 log.debug("JobId: {}, deliveryCount: {}", jobId, deliveryCount);
265 }
266
267 return null;
268 }
269 }
270
271 private Date getFirstRunTime(Date jobStartTime) {
272 return new Date(jobStartTime.getTime() + deliveryIntervalMillis);
273 }
274
275
276
277
278 @PreDestroy
279 public void shutdown() {
280 schedulerService.unregisterJobRunner(AMQ_JOB_RUNNER_KEY);
281 shutdown = true;
282 }
283
284 @Override
285 public void addMessage(Message message) {
286 if (shutdown) {
287 throw new MessageRunnerServiceException("message could not be added after shutdown (runnerKey: " + message.getRunnerKey() + ")");
288 }
289
290 if (!messageRunnerRegistryHelper.getMessageRunner(message.getRunnerKey()).isPresent()) {
291 throw new MessageRunnerNotRegisteredException(message.getRunnerKey());
292 }
293
294 final Map<String, Serializable> jobParameters = new HashMap<>();
295 jobParameters.put(PAYLOAD_JOB_PARAMETER, messageInformationService.toPayload(message));
296 jobParameters.put(DELIVERY_COUNT_PARAMETER, 0);
297
298 final JobConfig jobConfig = JobConfig.forJobRunnerKey(AMQ_JOB_RUNNER_KEY)
299 .withRunMode(RunMode.RUN_ONCE_PER_CLUSTER)
300 .withSchedule(Schedule.forInterval(deliveryIntervalMillis, null))
301 .withParameters(jobParameters);
302
303 try {
304 schedulerService.scheduleJobWithGeneratedId(jobConfig);
305 } catch (SchedulerServiceException e) {
306 log.error("Could not schedule job for message with key {}", message.getRunnerKey().toString(), e);
307 throw new MessageRunnerServiceException("Could not add message", e);
308 }
309 }
310 }