1 package com.atlassian.scheduler.caesium.impl;
2
3 import com.atlassian.annotations.Internal;
4 import com.atlassian.scheduler.JobRunner;
5 import com.atlassian.scheduler.JobRunnerRequest;
6 import com.atlassian.scheduler.JobRunnerResponse;
7 import com.atlassian.scheduler.SchedulerRuntimeException;
8 import com.atlassian.scheduler.SchedulerServiceException;
9 import com.atlassian.scheduler.caesium.migration.LazyMigratingParameterMapSerializer;
10 import com.atlassian.scheduler.caesium.spi.CaesiumSchedulerConfiguration;
11 import com.atlassian.scheduler.caesium.spi.ClusteredJob;
12 import com.atlassian.scheduler.caesium.spi.ClusteredJobDao;
13 import com.atlassian.scheduler.config.CronScheduleInfo;
14 import com.atlassian.scheduler.config.IntervalScheduleInfo;
15 import com.atlassian.scheduler.config.JobConfig;
16 import com.atlassian.scheduler.config.JobId;
17 import com.atlassian.scheduler.config.JobRunnerKey;
18 import com.atlassian.scheduler.config.RunMode;
19 import com.atlassian.scheduler.config.Schedule;
20 import com.atlassian.scheduler.core.AbstractSchedulerService;
21 import com.atlassian.scheduler.core.JobLauncher;
22 import com.atlassian.scheduler.core.spi.RunDetailsDao;
23 import com.atlassian.scheduler.core.status.LazyJobDetails;
24 import com.atlassian.scheduler.core.status.SimpleJobDetails;
25 import com.atlassian.scheduler.core.util.ParameterMapSerializer;
26 import com.atlassian.scheduler.cron.CronSyntaxException;
27 import com.atlassian.scheduler.status.JobDetails;
28 import com.atlassian.util.concurrent.Sink;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.ImmutableSet;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import javax.annotation.Nonnull;
37 import javax.annotation.Nullable;
38 import java.io.Serializable;
39 import java.util.Collection;
40 import java.util.Date;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.TreeMap;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentMap;
47 import java.util.concurrent.ThreadFactory;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.atomic.AtomicBoolean;
50
51 import static com.atlassian.scheduler.config.RunMode.RUN_LOCALLY;
52 import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
53 import static com.atlassian.scheduler.core.util.CronExpressionQuantizer.quantizeSecondsField;
54 import static com.atlassian.scheduler.core.util.TimeIntervalQuantizer.quantizeToMinutes;
55 import static com.atlassian.util.concurrent.Assertions.notNull;
56
57
58
59
60 public class CaesiumSchedulerService extends AbstractSchedulerService {
61 private static final Logger LOG = LoggerFactory.getLogger(CaesiumSchedulerService.class);
62 private static final JobId REFRESH_JOB_ID = JobId.of("CaesiumSchedulerService.RefreshJob");
63 private static final JobRunnerKey REFRESH_JOB_RUNNER_KEY = JobRunnerKey.of("CaesiumSchedulerService.RefreshJob");
64 private static final int DEFAULT_JOB_MAP_SIZE = 256;
65
66
67
68
69 private static final int MAX_TRIES = 50;
70
71
72
73
74 private static final int DEFAULT_WORKER_COUNT = 4;
75
76 private final ConcurrentMap<JobId, JobDetails> localJobs = new ConcurrentHashMap<JobId, JobDetails>(DEFAULT_JOB_MAP_SIZE);
77 private final RefreshJob refreshJob = new RefreshJob();
78 private final AtomicBoolean started = new AtomicBoolean();
79
80 private final ClusteredJobDao clusteredJobDao;
81 private final CaesiumSchedulerConfiguration config;
82 private final SchedulerQueue queue;
83 private final RunTimeCalculator runTimeCalculator;
84
85 @SuppressWarnings("unused")
86 public CaesiumSchedulerService(final CaesiumSchedulerConfiguration config, final RunDetailsDao runDetailsDao,
87 final ClusteredJobDao clusteredJobDao) {
88 this(config, runDetailsDao, clusteredJobDao, createParameterMapSerializer(config));
89 }
90
91 @SuppressWarnings("unused")
92 public CaesiumSchedulerService(final CaesiumSchedulerConfiguration config, final RunDetailsDao runDetailsDao,
93 final ClusteredJobDao clusteredJobDao, final ParameterMapSerializer serializer) {
94 super(runDetailsDao, notNull("serializer", serializer));
95 this.clusteredJobDao = clusteredJobDao;
96 this.config = config;
97 this.queue = new SchedulerQueueImpl(clusteredJobDao);
98 this.runTimeCalculator = new RunTimeCalculator(config);
99 }
100
101 @VisibleForTesting
102 CaesiumSchedulerService(final CaesiumSchedulerConfiguration config,
103 final RunDetailsDao runDetailsDao, final ClusteredJobDao clusteredJobDao, final SchedulerQueue queue,
104 final RunTimeCalculator runTimeCalculator) {
105 super(runDetailsDao, createParameterMapSerializer(config));
106 this.clusteredJobDao = clusteredJobDao;
107 this.config = config;
108 this.queue = queue;
109 this.runTimeCalculator = runTimeCalculator;
110 }
111
112 @Override
113 public void scheduleJob(final JobId jobId, final JobConfig jobConfig)
114 throws SchedulerServiceException {
115 notNull("jobId", jobId);
116 notNull("jobConfig", jobConfig);
117
118 try {
119 LOG.debug("scheduleJob: {}: {}", jobId, jobConfig);
120
121 switch (jobConfig.getRunMode()) {
122 case RUN_LOCALLY:
123 scheduleLocalJob(jobId, jobConfig);
124 break;
125 case RUN_ONCE_PER_CLUSTER:
126 scheduleClusteredJob(jobId, jobConfig);
127 break;
128 default:
129 throw new IllegalArgumentException("Unsupported run mode: " + jobConfig.getRunMode());
130 }
131 } catch (SchedulerRuntimeException sre) {
132 throw checked(sre);
133 }
134 }
135
136 private void scheduleLocalJob(final JobId jobId, final JobConfig jobConfig) throws SchedulerServiceException {
137 final Date nextRunTime = runTimeCalculator.firstRunTime(jobId, jobConfig);
138 final Map<String, Serializable> parameters = jobConfig.getParameters();
139 final SimpleJobDetails jobDetails = new SimpleJobDetails(jobId, jobConfig.getJobRunnerKey(),
140 RUN_LOCALLY, quantize(jobConfig.getSchedule()), nextRunTime,
141 getParameterMapSerializer().serializeParameters(parameters), parameters);
142
143 clusteredJobDao.delete(jobId);
144 localJobs.put(jobId, jobDetails);
145 enqueueJob(jobId, nextRunTime);
146 }
147
148 private void scheduleClusteredJob(final JobId jobId, final JobConfig jobConfig) throws SchedulerServiceException {
149 final Date nextRunTime = runTimeCalculator.firstRunTime(jobId, jobConfig);
150 final ClusteredJob clusteredJob = ImmutableClusteredJob.builder()
151 .jobId(jobId)
152 .jobRunnerKey(jobConfig.getJobRunnerKey())
153 .schedule(quantize(jobConfig.getSchedule()))
154 .nextRunTime(nextRunTime)
155 .parameters(getParameterMapSerializer().serializeParameters(jobConfig.getParameters()))
156 .build();
157
158 localJobs.remove(jobId);
159 createOrReplaceWithRetry(clusteredJob);
160 enqueueJob(jobId, nextRunTime);
161 }
162
163 private void createOrReplaceWithRetry(final ClusteredJob clusteredJob)
164 throws SchedulerServiceException {
165 for (int attempt = 1; attempt <= MAX_TRIES; ++attempt) {
166 clusteredJobDao.delete(clusteredJob.getJobId());
167 if (clusteredJobDao.create(clusteredJob)) {
168 return;
169 }
170 }
171
172 throw new SchedulerServiceException("Unable to either create or replace clustered job: " + clusteredJob);
173 }
174
175
176 @Override
177 public void unscheduleJob(final JobId jobId) {
178
179 boolean found = localJobs.remove(jobId) != null;
180 found |= clusteredJobDao.delete(jobId);
181
182
183 queue.remove(jobId);
184
185 if (found) {
186 LOG.debug("unscheduleJob: {}", jobId);
187 } else {
188 LOG.debug("unscheduleJob for non-existent jobId: {}", jobId);
189 }
190 }
191
192 @Nullable
193 @Override
194 public JobDetails getJobDetails(final JobId jobId) {
195 final JobDetails localJob = localJobs.get(jobId);
196 if (localJob != null) {
197 return localJob;
198 }
199
200 final ClusteredJob clusteredJob = clusteredJobDao.find(jobId);
201 return (clusteredJob != null) ? toJobDetails(clusteredJob) : null;
202 }
203
204 @Nonnull
205 @Override
206 public Set<JobRunnerKey> getJobRunnerKeysForAllScheduledJobs() {
207 final ImmutableSet.Builder<JobRunnerKey> keys = ImmutableSet.builder();
208 for (JobDetails localJob : localJobs.values()) {
209 keys.add(localJob.getJobRunnerKey());
210 }
211 keys.addAll(clusteredJobDao.findAllJobRunnerKeys());
212 return keys.build();
213 }
214
215 @Nonnull
216 @Override
217 public List<JobDetails> getJobsByJobRunnerKey(final JobRunnerKey jobRunnerKey) {
218 final Map<JobId, JobDetails> jobs = new TreeMap<JobId, JobDetails>();
219 for (JobDetails jobDetails : localJobs.values()) {
220 if (jobDetails.getJobRunnerKey().equals(jobRunnerKey)) {
221 jobs.put(jobDetails.getJobId(), jobDetails);
222 }
223 }
224
225 final Collection<ClusteredJob> clusteredJobs = clusteredJobDao.findByJobRunnerKey(jobRunnerKey);
226 for (ClusteredJob clusteredJob : clusteredJobs) {
227 jobs.put(clusteredJob.getJobId(), toJobDetails(clusteredJob));
228 }
229
230 return ImmutableList.copyOf(jobs.values());
231 }
232
233 @Nullable
234 @Override
235 public Date calculateNextRunTime(Schedule schedule) throws SchedulerServiceException {
236 return runTimeCalculator.nextRunTime(schedule, null);
237 }
238
239 @Override
240 protected void startImpl() throws SchedulerServiceException {
241 queue.resume();
242
243 if (started.compareAndSet(false, true)) {
244 startWorkers();
245 refreshClusteredJobs();
246 scheduleRefreshJob();
247 }
248 }
249
250
251 private void startWorkers() {
252 final SchedulerQueueWorker worker = new SchedulerQueueWorker(queue, new Sink<QueuedJob>() {
253 @Override
254 public void consume(QueuedJob job) {
255 executeQueuedJob(job);
256 }
257 });
258
259 int workerCount = config.workerThreadCount();
260 if (workerCount <= 0) {
261 workerCount = DEFAULT_WORKER_COUNT;
262 }
263
264 final ThreadFactory threadFactory = new WorkerThreadFactory();
265 for (int i = 1; i <= workerCount; ++i) {
266 threadFactory.newThread(worker).start();
267 }
268 }
269
270 @Override
271 protected void standbyImpl() throws SchedulerServiceException {
272 queue.pause();
273 }
274
275 @Override
276 protected void shutdownImpl() {
277 queue.close();
278 }
279
280
281
282
283
284
285
286
287
288
289 public void refreshClusteredJob(JobId jobId) {
290 final Date nextRunTime = clusteredJobDao.getNextRunTime(jobId);
291 if (nextRunTime == null) {
292 if (localJobs.containsKey(jobId)) {
293
294 LOG.debug("Asked to refresh job '{}', but it is a local job so that was a bit silly.", jobId);
295 } else {
296 queue.remove(jobId);
297 }
298 return;
299 }
300
301 localJobs.remove(jobId);
302 try {
303 queue.add(new QueuedJob(jobId, nextRunTime.getTime()));
304 } catch (SchedulerQueueImpl.SchedulerShutdownException sse) {
305 LOG.debug("Refresh failed for job '{}' due to scheduler shutdown", jobId, sse);
306 }
307 }
308
309
310
311
312
313
314
315
316
317
318 public void refreshClusteredJobs() {
319 final Map<JobId, Date> clusteredJobs = queue.refreshClusteredJobs();
320 localJobs.keySet().removeAll(clusteredJobs.keySet());
321 }
322
323 private void scheduleRefreshJob() throws SchedulerServiceException {
324 final int refreshInterval = config.refreshClusteredJobsIntervalInMinutes();
325 if (refreshInterval > 0) {
326 registerJobRunner(REFRESH_JOB_RUNNER_KEY, refreshJob);
327
328 final long millis = TimeUnit.MINUTES.toMillis(refreshInterval);
329 final Schedule schedule = Schedule.forInterval(millis, new Date(now() + millis));
330 scheduleJob(REFRESH_JOB_ID, JobConfig.forJobRunnerKey(REFRESH_JOB_RUNNER_KEY)
331 .withRunMode(RUN_LOCALLY)
332 .withSchedule(schedule));
333 } else {
334 unscheduleJob(REFRESH_JOB_ID);
335 unregisterJobRunner(REFRESH_JOB_RUNNER_KEY);
336 }
337 }
338
339
340
341
342
343
344 void executeQueuedJob(final QueuedJob job) {
345 final JobDetails jobDetails = localJobs.get(job.getJobId());
346 if (jobDetails != null) {
347 executeLocalJob(jobDetails);
348 } else {
349 executeClusteredJob(job);
350 }
351 }
352
353 void executeLocalJob(final JobDetails jobDetails) {
354 final Date firedAt = new Date(now());
355 final JobId jobId = jobDetails.getJobId();
356
357 final Date scheduledRunTime = jobDetails.getNextRunTime();
358 if (scheduledRunTime == null || firedAt.getTime() < scheduledRunTime.getTime()) {
359
360
361
362 LOG.debug("Launch for job '{}' either too early or after it's been deleted; scheduledRunTime={}",
363 jobDetails, scheduledRunTime);
364 enqueueJob(jobId, scheduledRunTime);
365 return;
366 }
367
368 enqueueJob(jobId, calculateNextRunTime(jobDetails, firedAt));
369 launchJob(RUN_LOCALLY, firedAt, jobDetails);
370 }
371
372 void executeClusteredJob(final QueuedJob queuedJob) {
373 final Date firedAt = new Date(now());
374 final JobId jobId = queuedJob.getJobId();
375 final ClusteredJob clusteredJob = clusteredJobDao.find(jobId);
376 if (clusteredJob == null) {
377 LOG.debug("Failed to claim '{}' for run at {}; the job no longer exists.", jobId, firedAt);
378 return;
379 }
380
381 final JobDetails jobDetails = toJobDetails(clusteredJob);
382 final Date scheduledRunTime = jobDetails.getNextRunTime();
383 if (scheduledRunTime == null || queuedJob.getDeadline() < scheduledRunTime.getTime()) {
384 enqueueJob(jobId, scheduledRunTime);
385 return;
386 }
387
388 final Date nextRunTime = calculateNextRunTime(jobDetails, firedAt);
389 if (!clusteredJobDao.updateNextRunTime(jobId, nextRunTime, clusteredJob.getVersion())) {
390 LOG.debug("Failed to claim '{}' for run at {}; guess another node got there first?", jobId, nextRunTime);
391 refreshClusteredJob(jobId);
392 return;
393 }
394
395 enqueueJob(jobId, nextRunTime);
396 launchJob(RUN_ONCE_PER_CLUSTER, firedAt, jobDetails);
397 }
398
399 private void launchJob(final RunMode runMode, final Date firedAt, final JobDetails jobDetails) {
400 final JobLauncher launcher = new JobLauncher(this, runMode, firedAt, jobDetails.getJobId(), jobDetails);
401 launcher.launch();
402 }
403
404
405
406
407 @Nullable
408 private Date calculateNextRunTime(final JobDetails jobDetails, final Date prevRunTime) {
409 try {
410 return runTimeCalculator.nextRunTime(jobDetails.getSchedule(), prevRunTime);
411 } catch (CronSyntaxException cse) {
412 LOG.error("Clustered job '{}' has invalid cron schedule '{}' and will never run.",
413 jobDetails.getJobId(), jobDetails.getSchedule().getCronScheduleInfo().getCronExpression());
414 return null;
415 }
416 }
417
418
419 protected void enqueueJob(final JobId jobId, @Nullable final Date expectedTime) {
420 try {
421 if (expectedTime == null) {
422 queue.remove(jobId);
423 LOG.debug("Job '{}' has a null nextRunTime, which means we never expect it to run again", jobId);
424 return;
425 }
426
427 queue.add(new QueuedJob(jobId, expectedTime.getTime()));
428 LOG.debug("Enqueued job '{}' for {}", jobId, expectedTime);
429 } catch (SchedulerQueue.SchedulerShutdownException sse) {
430 LOG.debug("Could not enqueue job '{}' because we're in the middle of shutting down", jobId, sse);
431 }
432 }
433
434
435 @Nonnull
436 JobDetails toJobDetails(final ClusteredJob clusteredJob) {
437 return new LazyJobDetails(this, clusteredJob.getJobId(), clusteredJob.getJobRunnerKey(),
438 RUN_ONCE_PER_CLUSTER, clusteredJob.getSchedule(), clusteredJob.getNextRunTime(),
439 clusteredJob.getRawParameters());
440 }
441
442 private Schedule quantize(final Schedule schedule) {
443 if (config.useFineGrainedSchedules()) {
444 return schedule;
445 }
446
447 switch (schedule.getType()) {
448 case INTERVAL: {
449 final IntervalScheduleInfo info = schedule.getIntervalScheduleInfo();
450 return Schedule.forInterval(quantizeToMinutes(info.getIntervalInMillis()), info.getFirstRunTime());
451 }
452
453 case CRON_EXPRESSION: {
454 final CronScheduleInfo info = schedule.getCronScheduleInfo();
455 return Schedule.forCronExpression(quantizeSecondsField(info.getCronExpression()), info.getTimeZone());
456 }
457
458 default:
459 throw new IllegalStateException("Unsupported schedule type: " + schedule.getType());
460 }
461 }
462
463
464
465
466
467
468 @VisibleForTesting
469 long now() {
470 return System.currentTimeMillis();
471 }
472
473
474
475
476
477
478
479
480
481
482
483 @Internal
484 public Map<JobId, Date> getPendingJobs() {
485 final ImmutableMap.Builder<JobId, Date> jobMap = ImmutableMap.builder();
486 for (QueuedJob job : queue.getPendingJobs()) {
487 jobMap.put(job.getJobId(), new Date(job.getDeadline()));
488 }
489 return jobMap.build();
490 }
491
492
493
494
495
496
497
498 protected static ParameterMapSerializer createParameterMapSerializer(final CaesiumSchedulerConfiguration config) {
499 if (config.useQuartzJobDataMapMigration()) {
500 return new LazyMigratingParameterMapSerializer();
501 }
502 return new ParameterMapSerializer();
503 }
504
505
506 class RefreshJob implements JobRunner {
507 @Nullable
508 @Override
509 public JobRunnerResponse runJob(JobRunnerRequest request) {
510 refreshClusteredJobs();
511 return JobRunnerResponse.success();
512 }
513 }
514 }