View Javadoc

1   package com.atlassian.scheduler.quartz2;
2   
3   import com.atlassian.scheduler.SchedulerRuntimeException;
4   import com.atlassian.scheduler.SchedulerService;
5   import com.atlassian.scheduler.SchedulerServiceException;
6   import com.atlassian.scheduler.config.JobConfig;
7   import com.atlassian.scheduler.config.JobId;
8   import com.atlassian.scheduler.config.JobRunnerKey;
9   import com.atlassian.scheduler.config.RunMode;
10  import com.atlassian.scheduler.config.Schedule;
11  import com.atlassian.scheduler.core.AbstractSchedulerService;
12  import com.atlassian.scheduler.core.JobLauncher;
13  import com.atlassian.scheduler.core.spi.RunDetailsDao;
14  import com.atlassian.scheduler.core.spi.SchedulerServiceConfiguration;
15  import com.atlassian.scheduler.quartz2.spi.Quartz2SchedulerConfiguration;
16  import com.atlassian.scheduler.status.JobDetails;
17  import com.google.common.collect.ImmutableList;
18  import com.google.common.collect.ImmutableSet;
19  import org.quartz.Scheduler;
20  import org.quartz.Trigger;
21  import org.quartz.TriggerBuilder;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  import javax.annotation.Nonnull;
26  import javax.annotation.Nullable;
27  import java.io.Serializable;
28  import java.util.Comparator;
29  import java.util.Date;
30  import java.util.HashSet;
31  import java.util.List;
32  import java.util.Set;
33  
34  import static com.atlassian.scheduler.config.RunMode.RUN_LOCALLY;
35  import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
36  import static com.atlassian.scheduler.quartz2.Quartz2SchedulerFacade.createClustered;
37  import static com.atlassian.scheduler.quartz2.Quartz2SchedulerFacade.createLocal;
38  import static com.atlassian.util.concurrent.Assertions.notNull;
39  import static com.google.common.collect.Lists.newArrayList;
40  import static com.google.common.collect.Sets.newHashSet;
41  import static java.util.Collections.sort;
42  import static org.quartz.TriggerKey.triggerKey;
43  
44  /**
45   * Quartz 2.x implementation of a {@link SchedulerService}.
46   * <ul>
47   * <li>Job runner keys are mapped to Quartz {@code JobKey} names, with the {@code Job} being created or
48   * destroyed automatically based on whether or not it has any existing {@code Trigger}s.</li>
49   * <li>All Quartz {@code Job}s use {@link Quartz2Job}, which immediately delegates to {@link JobLauncher}.</li>
50   * <li>Job ids are mapped to Quartz {@code TriggerKey} names.</li>
51   * <li>The parameters map is serialized to a {@code byte[]} and stored in the {@code JobDataMap}
52   * for the Quartz {@code Trigger}.</li>
53   * </ul>
54   */
55  public class Quartz2SchedulerService extends AbstractSchedulerService {
56      private static final Logger LOG = LoggerFactory.getLogger(Quartz2SchedulerService.class);
57  
58      private final Quartz2SchedulerFacade localJobs;
59      private final Quartz2SchedulerFacade clusteredJobs;
60      private final Quartz2TriggerFactory triggerFactory;
61      private final Quartz2JobDetailsFactory jobDetailsFactory;
62  
63      @SuppressWarnings("unused")
64      public Quartz2SchedulerService(final RunDetailsDao runDetailsDao, final Quartz2SchedulerConfiguration config)
65              throws SchedulerServiceException {
66          super(runDetailsDao);
67          this.localJobs = createLocal(this, config);
68          this.clusteredJobs = createClustered(this, config);
69          this.triggerFactory = new Quartz2TriggerFactory(config, getParameterMapSerializer());
70          this.jobDetailsFactory = new Quartz2JobDetailsFactory(this);
71      }
72  
73      @SuppressWarnings("unused")
74      public Quartz2SchedulerService(final RunDetailsDao runDetailsDao, final SchedulerServiceConfiguration config,
75                                     final Scheduler localScheduler, final Scheduler clusteredScheduler)
76              throws SchedulerServiceException {
77          super(runDetailsDao);
78          this.localJobs = createLocal(this, localScheduler);
79          this.clusteredJobs = createClustered(this, clusteredScheduler);
80          this.triggerFactory = new Quartz2TriggerFactory(config, getParameterMapSerializer());
81          this.jobDetailsFactory = new Quartz2JobDetailsFactory(this);
82      }
83  
84      @Override
85      public void scheduleJob(final JobId jobId, final JobConfig jobConfig) throws SchedulerServiceException {
86          try {
87              notNull("jobConfig", jobConfig);
88              LOG.debug("scheduleJob: {}: {}", jobId, jobConfig);
89  
90              localJobs.unscheduleJob(jobId);
91              clusteredJobs.unscheduleJob(jobId);
92  
93              final Quartz2SchedulerFacade facade = getFacade(jobConfig.getRunMode());
94              final JobRunnerKey jobRunnerKey = jobConfig.getJobRunnerKey();
95              final TriggerBuilder<?> trigger = triggerFactory.buildTrigger(jobId, jobConfig);
96              facade.scheduleJob(jobRunnerKey, trigger);
97          } catch (SchedulerRuntimeException sre) {
98              throw checked(sre);
99          }
100     }
101 
102 
103     @Override
104     public void unscheduleJob(final JobId jobId) {
105         // Deliberately avoiding a short-circuit; we want to remove from both
106         boolean found = localJobs.unscheduleJob(jobId);
107         found |= clusteredJobs.unscheduleJob(jobId);
108         if (found) {
109             LOG.debug("unscheduleJob: {}", jobId);
110         } else {
111             LOG.debug("unscheduleJob for non-existent jobId: {}", jobId);
112         }
113     }
114 
115     @Nullable
116     @Override
117     public Date calculateNextRunTime(Schedule schedule) throws SchedulerServiceException {
118         final Trigger trigger = triggerFactory.buildTrigger(schedule)
119                 .withIdentity(triggerKey("name", "group"))
120                 .build();
121         return trigger.getFireTimeAfter(new Date());
122     }
123 
124     @Nullable
125     @Override
126     public JobDetails getJobDetails(final JobId jobId) {
127         JobDetails jobDetails = getJobDetails(clusteredJobs, jobId, RUN_ONCE_PER_CLUSTER);
128         if (jobDetails == null) {
129             jobDetails = getJobDetails(localJobs, jobId, RUN_LOCALLY);
130         }
131         return jobDetails;
132     }
133 
134     @Nonnull
135     @Override
136     public Set<JobRunnerKey> getJobRunnerKeysForAllScheduledJobs() {
137         final Set<JobRunnerKey> jobRunnerKeys = new HashSet<JobRunnerKey>(localJobs.getJobRunnerKeys());
138         jobRunnerKeys.addAll(clusteredJobs.getJobRunnerKeys());
139         return ImmutableSet.copyOf(jobRunnerKeys);
140     }
141 
142     @Nullable
143     private JobDetails getJobDetails(Quartz2SchedulerFacade facade, JobId jobId, RunMode runMode) {
144         final Trigger trigger = facade.getTrigger(jobId);
145         return (trigger != null) ? jobDetailsFactory.buildJobDetails(jobId, trigger, runMode) : null;
146     }
147 
148     @Nonnull
149     @Override
150     public List<JobDetails> getJobsByJobRunnerKey(final JobRunnerKey jobRunnerKey) {
151         final UniqueJobDetailsCollector collector = new UniqueJobDetailsCollector();
152         collector.collect(RUN_ONCE_PER_CLUSTER, clusteredJobs.getTriggersOfJob(jobRunnerKey));
153         collector.collect(RUN_LOCALLY, localJobs.getTriggersOfJob(jobRunnerKey));
154         return collector.getResults();
155     }
156 
157     /**
158      * Start the threads associated with each quartz scheduler. It is the responsibility of the underlying
159      * JobStores to determine whether or not to trigger jobs which should have run while the scheduler was
160      * in standby mode. This is usually controlled by a misfire threshold on the JobStore implementation
161      */
162     @Override
163     protected void startImpl() throws SchedulerServiceException {
164         boolean abort = true;
165         localJobs.start();
166         try {
167             clusteredJobs.start();
168             abort = false;
169         } finally {
170             if (abort) {
171                 localJobs.standby();
172             }
173         }
174     }
175 
176     /**
177      * Stop the threads associated with each quartz scheduler
178      */
179     @Override
180     protected void standbyImpl() throws SchedulerServiceException {
181         try {
182             localJobs.standby();
183         } finally {
184             clusteredJobs.standby();
185         }
186     }
187 
188     @Override
189     protected void shutdownImpl() {
190         try {
191             localJobs.shutdown();
192         } finally {
193             clusteredJobs.shutdown();
194         }
195     }
196 
197     private Quartz2SchedulerFacade getFacade(RunMode runMode) {
198         switch (notNull("runMode", runMode)) {
199             case RUN_LOCALLY:
200                 return localJobs;
201             case RUN_ONCE_PER_CLUSTER:
202                 return clusteredJobs;
203         }
204         throw new IllegalArgumentException("runMode=" + runMode);
205     }
206 
207 
208     /**
209      * Since it is possible for another node in the cluster to register the same jobId for running clustered
210      * when we had it set to run locally, deal with this by pretending the local one is not there.
211      */
212     class UniqueJobDetailsCollector {
213         final Set<String> jobIdsSeen = newHashSet();
214         final List<JobDetails> jobs = newArrayList();
215 
216         void collect(RunMode runMode, final List<? extends Trigger> triggers) {
217             for (Trigger trigger : triggers) {
218                 final String jobId = trigger.getKey().getName();
219                 if (jobIdsSeen.add(jobId)) {
220                     try {
221                         jobs.add(jobDetailsFactory.buildJobDetails(JobId.of(jobId), trigger, runMode));
222                     } catch (SchedulerRuntimeException sre) {
223                         LOG.debug("Unable to reconstruct log details for jobId '{}': {}", jobId, sre);
224                     }
225                 }
226             }
227         }
228 
229         List<JobDetails> getResults() {
230             sort(jobs, new SortByJobId());
231             return ImmutableList.copyOf(jobs);
232         }
233     }
234 
235     static class SortByJobId implements Comparator<JobDetails>, Serializable {
236         private static final long serialVersionUID = 1L;
237 
238         @Override
239         public int compare(final JobDetails jd1, final JobDetails jd2) {
240             return jd1.getJobId().compareTo(jd2.getJobId());
241         }
242     }
243 }