View Javadoc

1   package com.atlassian.scheduler.quartz1;
2   
3   import com.atlassian.scheduler.SchedulerRuntimeException;
4   import com.atlassian.scheduler.SchedulerService;
5   import com.atlassian.scheduler.SchedulerServiceException;
6   import com.atlassian.scheduler.config.JobConfig;
7   import com.atlassian.scheduler.config.JobId;
8   import com.atlassian.scheduler.config.JobRunnerKey;
9   import com.atlassian.scheduler.config.RunMode;
10  import com.atlassian.scheduler.config.Schedule;
11  import com.atlassian.scheduler.core.AbstractSchedulerService;
12  import com.atlassian.scheduler.core.JobLauncher;
13  import com.atlassian.scheduler.core.spi.RunDetailsDao;
14  import com.atlassian.scheduler.core.spi.SchedulerServiceConfiguration;
15  import com.atlassian.scheduler.quartz1.spi.Quartz1SchedulerConfiguration;
16  import com.atlassian.scheduler.status.JobDetails;
17  import com.google.common.collect.ImmutableList;
18  import com.google.common.collect.ImmutableSet;
19  import org.quartz.Scheduler;
20  import org.quartz.Trigger;
21  import org.slf4j.Logger;
22  import org.slf4j.LoggerFactory;
23  
24  import javax.annotation.Nonnull;
25  import javax.annotation.Nullable;
26  import java.util.Date;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Set;
30  
31  import static com.atlassian.scheduler.config.RunMode.RUN_LOCALLY;
32  import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
33  import static com.atlassian.scheduler.quartz1.Quartz1SchedulerFacade.createClustered;
34  import static com.atlassian.scheduler.quartz1.Quartz1SchedulerFacade.createLocal;
35  import static com.atlassian.util.concurrent.Assertions.notNull;
36  import static com.google.common.collect.Lists.newArrayList;
37  import static com.google.common.collect.Sets.newHashSet;
38  import static java.util.Collections.sort;
39  
40  /**
41   * Quartz 1.x implementation of a {@link SchedulerService}.
42   * <ul>
43   * <li>Job runner keys are mapped to Quartz {@code Job} names, with the {@code Job} being created or
44   * destroyed automatically based on whether or not it has any existing {@code Trigger}s.</li>
45   * <li>All Quartz {@code Job}s use {@link Quartz1Job}, which immediately delegates to {@link JobLauncher}.</li>
46   * <li>Job ids are mapped to Quartz {@code Trigger} names.</li>
47   * <li>The parameters map is serialized to a {@code byte[]} and stored in the {@code JobDataMap}
48   * for the Quartz {@code Trigger}.</li>
49   * </ul>
50   */
51  public class Quartz1SchedulerService extends AbstractSchedulerService {
52      private static final Logger LOG = LoggerFactory.getLogger(Quartz1SchedulerService.class);
53  
54      private final Quartz1SchedulerFacade localJobs;
55      private final Quartz1SchedulerFacade clusteredJobs;
56      private final Quartz1TriggerFactory triggerFactory;
57      private final Quartz1JobDetailsFactory jobDetailsFactory;
58  
59  
60      @SuppressWarnings("unused")
61      public Quartz1SchedulerService(final RunDetailsDao runDetailsDao, final Quartz1SchedulerConfiguration config)
62              throws SchedulerServiceException {
63          super(runDetailsDao);
64          this.localJobs = createLocal(this, config);
65          this.clusteredJobs = createClustered(this, config);
66          this.triggerFactory = new Quartz1TriggerFactory(config, getParameterMapSerializer());
67          this.jobDetailsFactory = new Quartz1JobDetailsFactory(this);
68      }
69  
70      @SuppressWarnings("unused")
71      public Quartz1SchedulerService(final RunDetailsDao runDetailsDao, final SchedulerServiceConfiguration config,
72                                     final Scheduler localScheduler, final Scheduler clusteredScheduler)
73              throws SchedulerServiceException {
74          super(runDetailsDao);
75          this.localJobs = createLocal(this, localScheduler);
76          this.clusteredJobs = createClustered(this, clusteredScheduler);
77          this.triggerFactory = new Quartz1TriggerFactory(config, getParameterMapSerializer());
78          this.jobDetailsFactory = new Quartz1JobDetailsFactory(this);
79      }
80  
81  
82      @Override
83      public void scheduleJob(final JobId jobId, final JobConfig jobConfig)
84              throws SchedulerServiceException {
85          try {
86              notNull("jobConfig", jobConfig);
87              LOG.debug("scheduleJob: {}: {}", jobId, jobConfig);
88  
89              localJobs.unscheduleJob(jobId);
90              clusteredJobs.unscheduleJob(jobId);
91  
92              final Quartz1SchedulerFacade facade = getFacade(jobConfig.getRunMode());
93              final JobRunnerKey jobRunnerKey = jobConfig.getJobRunnerKey();
94              final Trigger trigger = triggerFactory.buildTrigger(jobId, jobConfig);
95              facade.scheduleJob(jobRunnerKey, trigger);
96          } catch (SchedulerRuntimeException sre) {
97              throw checked(sre);
98          }
99      }
100 
101 
102     @Override
103     public void unscheduleJob(final JobId jobId) {
104         // Deliberately avoiding a short-circuit; we want to remove from both
105         boolean found = localJobs.unscheduleJob(jobId);
106         found |= clusteredJobs.unscheduleJob(jobId);
107         if (found) {
108             LOG.debug("unscheduleJob: {}", jobId);
109         } else {
110             LOG.debug("unscheduleJob for non-existent jobId: {}", jobId);
111         }
112     }
113 
114     @Nullable
115     @Override
116     public Date calculateNextRunTime(Schedule schedule) throws SchedulerServiceException {
117         final Trigger trigger = triggerFactory.buildTrigger(schedule);
118         return trigger.getFireTimeAfter(new Date());
119     }
120 
121     @Nullable
122     @Override
123     public JobDetails getJobDetails(final JobId jobId) {
124         JobDetails jobDetails = getJobDetails(clusteredJobs, jobId, RUN_ONCE_PER_CLUSTER);
125         if (jobDetails == null) {
126             jobDetails = getJobDetails(localJobs, jobId, RUN_LOCALLY);
127         }
128         return jobDetails;
129     }
130 
131     @Nullable
132     private JobDetails getJobDetails(Quartz1SchedulerFacade facade, JobId jobId, RunMode runMode) {
133         final Trigger trigger = facade.getTrigger(jobId);
134         return (trigger != null) ? jobDetailsFactory.buildJobDetails(jobId, trigger, runMode) : null;
135     }
136 
137     @Nonnull
138     @Override
139     public Set<JobRunnerKey> getJobRunnerKeysForAllScheduledJobs() {
140         final Set<JobRunnerKey> jobRunnerKeys = new HashSet<JobRunnerKey>(localJobs.getJobRunnerKeys());
141         jobRunnerKeys.addAll(clusteredJobs.getJobRunnerKeys());
142         return ImmutableSet.copyOf(jobRunnerKeys);
143     }
144 
145     @Nonnull
146     @Override
147     public List<JobDetails> getJobsByJobRunnerKey(final JobRunnerKey jobRunnerKey) {
148         final UniqueJobDetailsCollector collector = new UniqueJobDetailsCollector();
149         collector.collect(RUN_ONCE_PER_CLUSTER, clusteredJobs.getTriggersOfJob(jobRunnerKey));
150         collector.collect(RUN_LOCALLY, localJobs.getTriggersOfJob(jobRunnerKey));
151         return collector.getResults();
152     }
153 
154     /**
155      * Returns the actual Quartz scheduler providing {@link RunMode#RUN_LOCALLY local} scheduling.  This
156      * method will be removed in a future release and its use should be avoided if at all possible.  It is
157      * provided only because Atlassian applications have historically exposed Quartz 1.x to plugins and
158      * may need to continue to do so as part of the migration to the {@code atlassian-scheduler} library.
159      *
160      * @return the actual Quartz scheduler providing {@link RunMode#RUN_LOCALLY local} scheduling
161      * @see #getClusteredQuartzScheduler()
162      * @deprecated Provided as a last resort only.  Avoid accessing Quartz directly if possible.  Since v1.0
163      */
164     @Deprecated
165     public Scheduler getLocalQuartzScheduler() {
166         return localJobs.getQuartz();
167     }
168 
169     /**
170      * Returns the actual Quartz scheduler providing {@link RunMode#RUN_ONCE_PER_CLUSTER clustered} scheduling.  This
171      * method will be removed in a future release and its use should be avoided if at all possible.  It is
172      * provided only because Atlassian applications have historically exposed Quartz 1.x to plugins and
173      * may need to continue to do so as part of the migration to the {@code atlassian-scheduler} library.
174      *
175      * @return the actual Quartz scheduler providing {@link RunMode#RUN_ONCE_PER_CLUSTER clustered} scheduling
176      * @see #getLocalQuartzScheduler()
177      * @deprecated Provided as a last resort only.  Avoid accessing Quartz directly if possible.  Since v1.0
178      */
179     @Deprecated
180     public Scheduler getClusteredQuartzScheduler() {
181         return clusteredJobs.getQuartz();
182     }
183 
184     /**
185      * Start the threads associated with each quartz scheduler. It is the responsibility of the underlying
186      * JobStores to determine whether or not to trigger jobs which should have run while the scheduler was
187      * in standby mode. This is usually controlled by a misfire threshold on the JobStore implementation
188      */
189     @Override
190     protected void startImpl() throws SchedulerServiceException {
191         boolean abort = true;
192         localJobs.start();
193         try {
194             clusteredJobs.start();
195             abort = false;
196         } finally {
197             if (abort) {
198                 localJobs.standby();
199             }
200         }
201     }
202 
203     /**
204      * Stop the threads associated with each quartz scheduler
205      */
206     @Override
207     protected void standbyImpl() throws SchedulerServiceException {
208         try {
209             localJobs.standby();
210         } finally {
211             clusteredJobs.standby();
212         }
213     }
214 
215     @Override
216     protected void shutdownImpl() {
217         try {
218             localJobs.shutdown();
219         } finally {
220             clusteredJobs.shutdown();
221         }
222     }
223 
224     private Quartz1SchedulerFacade getFacade(RunMode runMode) {
225         switch (runMode) {
226             case RUN_LOCALLY:
227                 return localJobs;
228             case RUN_ONCE_PER_CLUSTER:
229                 return clusteredJobs;
230         }
231         throw new IllegalArgumentException("runMode=" + runMode);
232     }
233 
234 
235     /**
236      * Since it is possible for another node in the cluster to register the same jobId for running clustered
237      * when we had it set to run locally, deal with this by pretending the local one is not there.
238      */
239     class UniqueJobDetailsCollector {
240         final Set<String> jobIdsSeen = newHashSet();
241         final List<JobDetails> jobs = newArrayList();
242 
243         void collect(RunMode runMode, final Trigger[] triggers) {
244             for (Trigger trigger : triggers) {
245                 final String jobId = trigger.getName();
246                 if (jobIdsSeen.add(jobId)) {
247                     try {
248                         jobs.add(jobDetailsFactory.buildJobDetails(JobId.of(jobId), trigger, runMode));
249                     } catch (SchedulerRuntimeException sre) {
250                         LOG.debug("Unable to reconstruct log details for jobId '{}'", jobId, sre);
251                     }
252                 }
253             }
254         }
255 
256         List<JobDetails> getResults() {
257             sort(jobs, BY_JOB_ID);
258             return ImmutableList.copyOf(jobs);
259         }
260     }
261 
262 }