View Javadoc

1   package com.atlassian.scheduler.caesium.impl;
2   
3   import com.atlassian.annotations.Internal;
4   import com.atlassian.scheduler.JobRunner;
5   import com.atlassian.scheduler.JobRunnerRequest;
6   import com.atlassian.scheduler.JobRunnerResponse;
7   import com.atlassian.scheduler.SchedulerRuntimeException;
8   import com.atlassian.scheduler.SchedulerServiceException;
9   import com.atlassian.scheduler.caesium.migration.LazyMigratingParameterMapSerializer;
10  import com.atlassian.scheduler.caesium.spi.CaesiumSchedulerConfiguration;
11  import com.atlassian.scheduler.caesium.spi.ClusteredJob;
12  import com.atlassian.scheduler.caesium.spi.ClusteredJobDao;
13  import com.atlassian.scheduler.config.CronScheduleInfo;
14  import com.atlassian.scheduler.config.IntervalScheduleInfo;
15  import com.atlassian.scheduler.config.JobConfig;
16  import com.atlassian.scheduler.config.JobId;
17  import com.atlassian.scheduler.config.JobRunnerKey;
18  import com.atlassian.scheduler.config.RunMode;
19  import com.atlassian.scheduler.config.Schedule;
20  import com.atlassian.scheduler.core.AbstractSchedulerService;
21  import com.atlassian.scheduler.core.JobLauncher;
22  import com.atlassian.scheduler.core.spi.RunDetailsDao;
23  import com.atlassian.scheduler.core.status.LazyJobDetails;
24  import com.atlassian.scheduler.core.status.SimpleJobDetails;
25  import com.atlassian.scheduler.core.util.ParameterMapSerializer;
26  import com.atlassian.scheduler.cron.CronSyntaxException;
27  import com.atlassian.scheduler.status.JobDetails;
28  import com.atlassian.util.concurrent.Sink;
29  import com.google.common.annotations.VisibleForTesting;
30  import com.google.common.collect.ImmutableList;
31  import com.google.common.collect.ImmutableMap;
32  import com.google.common.collect.ImmutableSet;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import javax.annotation.Nonnull;
37  import javax.annotation.Nullable;
38  import java.io.Serializable;
39  import java.util.Collection;
40  import java.util.Date;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.Set;
44  import java.util.TreeMap;
45  import java.util.concurrent.ConcurrentHashMap;
46  import java.util.concurrent.ConcurrentMap;
47  import java.util.concurrent.ThreadFactory;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicBoolean;
50  
51  import static com.atlassian.scheduler.config.RunMode.RUN_LOCALLY;
52  import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
53  import static com.atlassian.scheduler.core.util.CronExpressionQuantizer.quantizeSecondsField;
54  import static com.atlassian.scheduler.core.util.TimeIntervalQuantizer.quantizeToMinutes;
55  import static com.atlassian.util.concurrent.Assertions.notNull;
56  
57  /**
58   * Simple direct implementation of a scheduler service
59   */
60  public class CaesiumSchedulerService extends AbstractSchedulerService {
61      private static final Logger LOG = LoggerFactory.getLogger(CaesiumSchedulerService.class);
62      private static final JobId REFRESH_JOB_ID = JobId.of("CaesiumSchedulerService.RefreshJob");
63      private static final JobRunnerKey REFRESH_JOB_RUNNER_KEY = JobRunnerKey.of("CaesiumSchedulerService.RefreshJob");
64      private static final int DEFAULT_JOB_MAP_SIZE = 256;
65  
66      /**
67       * Used to cap the number of attempts to alter the state of a clustered job.
68       */
69      private static final int MAX_TRIES = 50;
70  
71      /**
72       * The default number of workers to use when the configuration gives a non-positive value
73       */
74      private static final int DEFAULT_WORKER_COUNT = 4;
75  
76      private final ConcurrentMap<JobId, JobDetails> localJobs = new ConcurrentHashMap<JobId, JobDetails>(DEFAULT_JOB_MAP_SIZE);
77      private final RefreshJob refreshJob = new RefreshJob();
78      private final AtomicBoolean started = new AtomicBoolean();
79  
80      private final ClusteredJobDao clusteredJobDao;
81      private final CaesiumSchedulerConfiguration config;
82      private final SchedulerQueue queue;
83      private final RunTimeCalculator runTimeCalculator;
84  
85      @SuppressWarnings("unused")
86      public CaesiumSchedulerService(final CaesiumSchedulerConfiguration config, final RunDetailsDao runDetailsDao,
87                                     final ClusteredJobDao clusteredJobDao) {
88          this(config, runDetailsDao, clusteredJobDao, createParameterMapSerializer(config));
89      }
90  
91      @SuppressWarnings("unused")
92      public CaesiumSchedulerService(final CaesiumSchedulerConfiguration config, final RunDetailsDao runDetailsDao,
93                                     final ClusteredJobDao clusteredJobDao, final ParameterMapSerializer serializer) {
94          super(runDetailsDao, notNull("serializer", serializer));
95          this.clusteredJobDao = clusteredJobDao;
96          this.config = config;
97          this.queue = new SchedulerQueueImpl(clusteredJobDao);
98          this.runTimeCalculator = new RunTimeCalculator(config);
99      }
100 
101     @VisibleForTesting
102     CaesiumSchedulerService(final CaesiumSchedulerConfiguration config,
103                             final RunDetailsDao runDetailsDao, final ClusteredJobDao clusteredJobDao, final SchedulerQueue queue,
104                             final RunTimeCalculator runTimeCalculator) {
105         super(runDetailsDao, createParameterMapSerializer(config));
106         this.clusteredJobDao = clusteredJobDao;
107         this.config = config;
108         this.queue = queue;
109         this.runTimeCalculator = runTimeCalculator;
110     }
111 
112     @Override
113     public void scheduleJob(final JobId jobId, final JobConfig jobConfig)
114             throws SchedulerServiceException {
115         notNull("jobId", jobId);
116         notNull("jobConfig", jobConfig);
117 
118         try {
119             LOG.debug("scheduleJob: {}: {}", jobId, jobConfig);
120 
121             switch (jobConfig.getRunMode()) {
122                 case RUN_LOCALLY:
123                     scheduleLocalJob(jobId, jobConfig);
124                     break;
125                 case RUN_ONCE_PER_CLUSTER:
126                     scheduleClusteredJob(jobId, jobConfig);
127                     break;
128                 default:
129                     throw new IllegalArgumentException("Unsupported run mode: " + jobConfig.getRunMode());
130             }
131         } catch (SchedulerRuntimeException sre) {
132             throw checked(sre);
133         }
134     }
135 
136     private void scheduleLocalJob(final JobId jobId, final JobConfig jobConfig) throws SchedulerServiceException {
137         final Date nextRunTime = runTimeCalculator.firstRunTime(jobId, jobConfig);
138         final Map<String, Serializable> parameters = jobConfig.getParameters();
139         final SimpleJobDetails jobDetails = new SimpleJobDetails(jobId, jobConfig.getJobRunnerKey(),
140                 RUN_LOCALLY, quantize(jobConfig.getSchedule()), nextRunTime,
141                 getParameterMapSerializer().serializeParameters(parameters), parameters);
142 
143         clusteredJobDao.delete(jobId);
144         localJobs.put(jobId, jobDetails);
145         enqueueJob(jobId, nextRunTime);
146     }
147 
148     private void scheduleClusteredJob(final JobId jobId, final JobConfig jobConfig) throws SchedulerServiceException {
149         final Date nextRunTime = runTimeCalculator.firstRunTime(jobId, jobConfig);
150         final ClusteredJob clusteredJob = ImmutableClusteredJob.builder()
151                 .jobId(jobId)
152                 .jobRunnerKey(jobConfig.getJobRunnerKey())
153                 .schedule(quantize(jobConfig.getSchedule()))
154                 .nextRunTime(nextRunTime)
155                 .parameters(getParameterMapSerializer().serializeParameters(jobConfig.getParameters()))
156                 .build();
157 
158         localJobs.remove(jobId);
159         createOrReplaceWithRetry(clusteredJob);
160         enqueueJob(jobId, nextRunTime);
161     }
162 
163     private void createOrReplaceWithRetry(final ClusteredJob clusteredJob)
164             throws SchedulerServiceException {
165         for (int attempt = 1; attempt <= MAX_TRIES; ++attempt) {
166             clusteredJobDao.delete(clusteredJob.getJobId());
167             if (clusteredJobDao.create(clusteredJob)) {
168                 return;
169             }
170         }
171 
172         throw new SchedulerServiceException("Unable to either create or replace clustered job: " + clusteredJob);
173     }
174 
175 
176     @Override
177     public void unscheduleJob(final JobId jobId) {
178         // Deliberately avoiding a short-circuit; we want to remove from both
179         boolean found = localJobs.remove(jobId) != null;
180         found |= clusteredJobDao.delete(jobId);
181 
182         // Also remove it from the queue (regardless of whether or not we think it's actually in there)
183         queue.remove(jobId);
184 
185         if (found) {
186             LOG.debug("unscheduleJob: {}", jobId);
187         } else {
188             LOG.debug("unscheduleJob for non-existent jobId: {}", jobId);
189         }
190     }
191 
192     @Nullable
193     @Override
194     public JobDetails getJobDetails(final JobId jobId) {
195         final JobDetails localJob = localJobs.get(jobId);
196         if (localJob != null) {
197             return localJob;
198         }
199 
200         final ClusteredJob clusteredJob = clusteredJobDao.find(jobId);
201         return (clusteredJob != null) ? toJobDetails(clusteredJob) : null;
202     }
203 
204     @Nonnull
205     @Override
206     public Set<JobRunnerKey> getJobRunnerKeysForAllScheduledJobs() {
207         final ImmutableSet.Builder<JobRunnerKey> keys = ImmutableSet.builder();
208         for (JobDetails localJob : localJobs.values()) {
209             keys.add(localJob.getJobRunnerKey());
210         }
211         keys.addAll(clusteredJobDao.findAllJobRunnerKeys());
212         return keys.build();
213     }
214 
215     @Nonnull
216     @Override
217     public List<JobDetails> getJobsByJobRunnerKey(final JobRunnerKey jobRunnerKey) {
218         final Map<JobId, JobDetails> jobs = new TreeMap<JobId, JobDetails>();
219         for (JobDetails jobDetails : localJobs.values()) {
220             if (jobDetails.getJobRunnerKey().equals(jobRunnerKey)) {
221                 jobs.put(jobDetails.getJobId(), jobDetails);
222             }
223         }
224 
225         final Collection<ClusteredJob> clusteredJobs = clusteredJobDao.findByJobRunnerKey(jobRunnerKey);
226         for (ClusteredJob clusteredJob : clusteredJobs) {
227             jobs.put(clusteredJob.getJobId(), toJobDetails(clusteredJob));
228         }
229 
230         return ImmutableList.copyOf(jobs.values());
231     }
232 
233     @Nullable
234     @Override
235     public Date calculateNextRunTime(Schedule schedule) throws SchedulerServiceException {
236         return runTimeCalculator.nextRunTime(schedule, null);
237     }
238 
239     @Override
240     protected void startImpl() throws SchedulerServiceException {
241         queue.resume();
242 
243         if (started.compareAndSet(false, true)) {
244             startWorkers();
245             refreshClusteredJobs();
246             scheduleRefreshJob();
247         }
248     }
249 
250 
251     private void startWorkers() {
252         final SchedulerQueueWorker worker = new SchedulerQueueWorker(queue, new Sink<QueuedJob>() {
253             @Override
254             public void consume(QueuedJob job) {
255                 executeQueuedJob(job);
256             }
257         });
258 
259         int workerCount = config.workerThreadCount();
260         if (workerCount <= 0) {
261             workerCount = DEFAULT_WORKER_COUNT;
262         }
263 
264         final ThreadFactory threadFactory = new WorkerThreadFactory();
265         for (int i = 1; i <= workerCount; ++i) {
266             threadFactory.newThread(worker).start();
267         }
268     }
269 
270     @Override
271     protected void standbyImpl() throws SchedulerServiceException {
272         queue.pause();
273     }
274 
275     @Override
276     protected void shutdownImpl() {
277         queue.close();
278     }
279 
280     /**
281      * Preform a one-off refresh of a cluster job that is suspected of changing state.
282      * <p>
283      * This may be useful to an implementation that is using external messages to communicate
284      * changes to clustered job schedules.
285      * </p>
286      *
287      * @param jobId the jobId of the job whose persisted state may have changed
288      */
289     public void refreshClusteredJob(JobId jobId) {
290         final Date nextRunTime = clusteredJobDao.getNextRunTime(jobId);
291         if (nextRunTime == null) {
292             if (localJobs.containsKey(jobId)) {
293                 // Well, we'd better not remove it, then...
294                 LOG.debug("Asked to refresh job '{}', but it is a local job so that was a bit silly.", jobId);
295             } else {
296                 queue.remove(jobId);
297             }
298             return;
299         }
300 
301         localJobs.remove(jobId);
302         try {
303             queue.add(new QueuedJob(jobId, nextRunTime.getTime()));
304         } catch (SchedulerQueueImpl.SchedulerShutdownException sse) {
305             LOG.debug("Refresh failed for job '{}' due to scheduler shutdown", jobId, sse);
306         }
307     }
308 
309     /**
310      * Perform the routine maintenance that is necessary to guarantee that this cluster node has
311      * up-to-date information concerning all clustered jobs.
312      * <p>
313      * If the scheduler has already been shutdown, then this request is ignored.
314      * </p>
315      *
316      * @see CaesiumSchedulerConfiguration#refreshClusteredJobsIntervalInMinutes()
317      */
318     public void refreshClusteredJobs() {
319         final Map<JobId, Date> clusteredJobs = queue.refreshClusteredJobs();
320         localJobs.keySet().removeAll(clusteredJobs.keySet());
321     }
322 
323     private void scheduleRefreshJob() throws SchedulerServiceException {
324         final int refreshInterval = config.refreshClusteredJobsIntervalInMinutes();
325         if (refreshInterval > 0) {
326             registerJobRunner(REFRESH_JOB_RUNNER_KEY, refreshJob);
327 
328             final long millis = TimeUnit.MINUTES.toMillis(refreshInterval);
329             final Schedule schedule = Schedule.forInterval(millis, new Date(now() + millis));
330             scheduleJob(REFRESH_JOB_ID, JobConfig.forJobRunnerKey(REFRESH_JOB_RUNNER_KEY)
331                     .withRunMode(RUN_LOCALLY)
332                     .withSchedule(schedule));
333         } else {
334             unscheduleJob(REFRESH_JOB_ID);
335             unregisterJobRunner(REFRESH_JOB_RUNNER_KEY);
336         }
337     }
338 
339     /**
340      * Callback from a queue worker to handle a {@code QueuedJob} that is now due.
341      *
342      * @param job the job to be executed
343      */
344     void executeQueuedJob(final QueuedJob job) {
345         final JobDetails jobDetails = localJobs.get(job.getJobId());
346         if (jobDetails != null) {
347             executeLocalJob(jobDetails);
348         } else {
349             executeClusteredJob(job);
350         }
351     }
352 
353     void executeLocalJob(final JobDetails jobDetails) {
354         final Date firedAt = new Date(now());
355         final JobId jobId = jobDetails.getJobId();
356 
357         final Date scheduledRunTime = jobDetails.getNextRunTime();
358         if (scheduledRunTime == null || firedAt.getTime() < scheduledRunTime.getTime()) {
359             // It's very strange that a local job would launch early.  Best guess is that somebody
360             // rescheduled it right in the middle of the launching process or that the clock moved.
361             // Either one is pretty weird.
362             LOG.debug("Launch for job '{}' either too early or after it's been deleted; scheduledRunTime={}",
363                     jobDetails, scheduledRunTime);
364             enqueueJob(jobId, scheduledRunTime);
365             return;
366         }
367 
368         enqueueJob(jobId, calculateNextRunTime(jobDetails, firedAt));
369         launchJob(RUN_LOCALLY, firedAt, jobDetails);
370     }
371 
372     void executeClusteredJob(final QueuedJob queuedJob) {
373         final Date firedAt = new Date(now());
374         final JobId jobId = queuedJob.getJobId();
375         final ClusteredJob clusteredJob = clusteredJobDao.find(jobId);
376         if (clusteredJob == null) {
377             LOG.debug("Failed to claim '{}' for run at {}; the job no longer exists.", jobId, firedAt);
378             return;
379         }
380 
381         final JobDetails jobDetails = toJobDetails(clusteredJob);
382         final Date scheduledRunTime = jobDetails.getNextRunTime();
383         if (scheduledRunTime == null || queuedJob.getDeadline() < scheduledRunTime.getTime()) {
384             enqueueJob(jobId, scheduledRunTime);
385             return;
386         }
387 
388         final Date nextRunTime = calculateNextRunTime(jobDetails, firedAt);
389         if (!clusteredJobDao.updateNextRunTime(jobId, nextRunTime, clusteredJob.getVersion())) {
390             LOG.debug("Failed to claim '{}' for run at {}; guess another node got there first?", jobId, nextRunTime);
391             refreshClusteredJob(jobId);
392             return;
393         }
394 
395         enqueueJob(jobId, nextRunTime);
396         launchJob(RUN_ONCE_PER_CLUSTER, firedAt, jobDetails);
397     }
398 
399     private void launchJob(final RunMode runMode, final Date firedAt, final JobDetails jobDetails) {
400         final JobLauncher launcher = new JobLauncher(this, runMode, firedAt, jobDetails.getJobId(), jobDetails);
401         launcher.launch();
402     }
403 
404     // Log errors for bad cron expressions instead of letting them kill us.  This really
405     // shouldn't happen because the bad expression should have prevented the job from being
406     // scheduled in the first place, but just in case...
407     @Nullable
408     private Date calculateNextRunTime(final JobDetails jobDetails, final Date prevRunTime) {
409         try {
410             return runTimeCalculator.nextRunTime(jobDetails.getSchedule(), prevRunTime);
411         } catch (CronSyntaxException cse) {
412             LOG.error("Clustered job '{}' has invalid cron schedule '{}' and will never run.",
413                     jobDetails.getJobId(), jobDetails.getSchedule().getCronScheduleInfo().getCronExpression());
414             return null;
415         }
416     }
417 
418 
419     protected void enqueueJob(final JobId jobId, @Nullable final Date expectedTime) {
420         try {
421             if (expectedTime == null) {
422                 queue.remove(jobId);
423                 LOG.debug("Job '{}' has a null nextRunTime, which means we never expect it to run again", jobId);
424                 return;
425             }
426 
427             queue.add(new QueuedJob(jobId, expectedTime.getTime()));
428             LOG.debug("Enqueued job '{}' for {}", jobId, expectedTime);
429         } catch (SchedulerQueue.SchedulerShutdownException sse) {
430             LOG.debug("Could not enqueue job '{}' because we're in the middle of shutting down", jobId, sse);
431         }
432     }
433 
434 
435     @Nonnull
436     JobDetails toJobDetails(final ClusteredJob clusteredJob) {
437         return new LazyJobDetails(this, clusteredJob.getJobId(), clusteredJob.getJobRunnerKey(),
438                 RUN_ONCE_PER_CLUSTER, clusteredJob.getSchedule(), clusteredJob.getNextRunTime(),
439                 clusteredJob.getRawParameters());
440     }
441 
442     private Schedule quantize(final Schedule schedule) {
443         if (config.useFineGrainedSchedules()) {
444             return schedule;
445         }
446 
447         switch (schedule.getType()) {
448             case INTERVAL: {
449                 final IntervalScheduleInfo info = schedule.getIntervalScheduleInfo();
450                 return Schedule.forInterval(quantizeToMinutes(info.getIntervalInMillis()), info.getFirstRunTime());
451             }
452 
453             case CRON_EXPRESSION: {
454                 final CronScheduleInfo info = schedule.getCronScheduleInfo();
455                 return Schedule.forCronExpression(quantizeSecondsField(info.getCronExpression()), info.getTimeZone());
456             }
457 
458             default:
459                 throw new IllegalStateException("Unsupported schedule type: " + schedule.getType());
460         }
461     }
462 
463     /**
464      * Returns {@code System.currentTimeMillis()}.
465      *
466      * @return {@code System.currentTimeMillis()}
467      */
468     @VisibleForTesting
469     long now() {
470         return System.currentTimeMillis();
471     }
472 
473     /**
474      * Returns the jobs that are currently in the scheduler's "to-do" list, in a map that is ordered by when they are scheduled
475      * to run.
476      * <p>
477      * Note that this is not part of the {@code atlassian-scheduler} API and has been provided primarily as a debugging and
478      * instrumentation tool.
479      * </p>
480      *
481      * @return a map of the jobs that are currently scheduled to run, ordered by their scheduled run time.
482      */
483     @Internal
484     public Map<JobId, Date> getPendingJobs() {
485         final ImmutableMap.Builder<JobId, Date> jobMap = ImmutableMap.builder();
486         for (QueuedJob job : queue.getPendingJobs()) {
487             jobMap.put(job.getJobId(), new Date(job.getDeadline()));
488         }
489         return jobMap.build();
490     }
491 
492     /**
493      * The default factory method used for parameter map serializers.
494      *
495      * @param config the caesium scheduler configuration
496      * @return the newly created parameter map serializer
497      */
498     protected static ParameterMapSerializer createParameterMapSerializer(final CaesiumSchedulerConfiguration config) {
499         if (config.useQuartzJobDataMapMigration()) {
500             return new LazyMigratingParameterMapSerializer();
501         }
502         return new ParameterMapSerializer();
503     }
504 
505 
506     class RefreshJob implements JobRunner {
507         @Nullable
508         @Override
509         public JobRunnerResponse runJob(JobRunnerRequest request) {
510             refreshClusteredJobs();
511             return JobRunnerResponse.success();
512         }
513     }
514 }