1 package com.atlassian.scheduler.quartz2;
2
3 import com.atlassian.scheduler.SchedulerRuntimeException;
4 import com.atlassian.scheduler.SchedulerService;
5 import com.atlassian.scheduler.SchedulerServiceException;
6 import com.atlassian.scheduler.config.JobConfig;
7 import com.atlassian.scheduler.config.JobId;
8 import com.atlassian.scheduler.config.JobRunnerKey;
9 import com.atlassian.scheduler.config.RunMode;
10 import com.atlassian.scheduler.config.Schedule;
11 import com.atlassian.scheduler.core.AbstractSchedulerService;
12 import com.atlassian.scheduler.core.JobLauncher;
13 import com.atlassian.scheduler.core.spi.RunDetailsDao;
14 import com.atlassian.scheduler.core.spi.SchedulerServiceConfiguration;
15 import com.atlassian.scheduler.quartz2.spi.Quartz2SchedulerConfiguration;
16 import com.atlassian.scheduler.status.JobDetails;
17 import com.google.common.collect.ImmutableList;
18 import com.google.common.collect.ImmutableSet;
19 import org.quartz.Scheduler;
20 import org.quartz.Trigger;
21 import org.quartz.TriggerBuilder;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import java.io.Serializable;
28 import java.util.Comparator;
29 import java.util.Date;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Set;
33
34 import static com.atlassian.scheduler.config.RunMode.RUN_LOCALLY;
35 import static com.atlassian.scheduler.config.RunMode.RUN_ONCE_PER_CLUSTER;
36 import static com.atlassian.scheduler.quartz2.Quartz2SchedulerFacade.createClustered;
37 import static com.atlassian.scheduler.quartz2.Quartz2SchedulerFacade.createLocal;
38 import static com.atlassian.util.concurrent.Assertions.notNull;
39 import static com.google.common.collect.Lists.newArrayList;
40 import static com.google.common.collect.Sets.newHashSet;
41 import static java.util.Collections.sort;
42 import static org.quartz.TriggerKey.triggerKey;
43
44
45
46
47
48
49
50
51
52
53
54
55 public class Quartz2SchedulerService extends AbstractSchedulerService {
56 private static final Logger LOG = LoggerFactory.getLogger(Quartz2SchedulerService.class);
57
58 private final Quartz2SchedulerFacade localJobs;
59 private final Quartz2SchedulerFacade clusteredJobs;
60 private final Quartz2TriggerFactory triggerFactory;
61 private final Quartz2JobDetailsFactory jobDetailsFactory;
62
63 @SuppressWarnings("unused")
64 public Quartz2SchedulerService(final RunDetailsDao runDetailsDao, final Quartz2SchedulerConfiguration config)
65 throws SchedulerServiceException {
66 super(runDetailsDao);
67 this.localJobs = createLocal(this, config);
68 this.clusteredJobs = createClustered(this, config);
69 this.triggerFactory = new Quartz2TriggerFactory(config, getParameterMapSerializer());
70 this.jobDetailsFactory = new Quartz2JobDetailsFactory(this);
71 }
72
73 @SuppressWarnings("unused")
74 public Quartz2SchedulerService(final RunDetailsDao runDetailsDao, final SchedulerServiceConfiguration config,
75 final Scheduler localScheduler, final Scheduler clusteredScheduler)
76 throws SchedulerServiceException {
77 super(runDetailsDao);
78 this.localJobs = createLocal(this, localScheduler);
79 this.clusteredJobs = createClustered(this, clusteredScheduler);
80 this.triggerFactory = new Quartz2TriggerFactory(config, getParameterMapSerializer());
81 this.jobDetailsFactory = new Quartz2JobDetailsFactory(this);
82 }
83
84 @Override
85 public void scheduleJob(final JobId jobId, final JobConfig jobConfig) throws SchedulerServiceException {
86 try {
87 notNull("jobConfig", jobConfig);
88 LOG.debug("scheduleJob: {}: {}", jobId, jobConfig);
89
90 localJobs.unscheduleJob(jobId);
91 clusteredJobs.unscheduleJob(jobId);
92
93 final Quartz2SchedulerFacade facade = getFacade(jobConfig.getRunMode());
94 final JobRunnerKey jobRunnerKey = jobConfig.getJobRunnerKey();
95 final TriggerBuilder<?> trigger = triggerFactory.buildTrigger(jobId, jobConfig);
96 facade.scheduleJob(jobRunnerKey, trigger);
97 } catch (SchedulerRuntimeException sre) {
98 throw checked(sre);
99 }
100 }
101
102
103 @Override
104 public void unscheduleJob(final JobId jobId) {
105
106 boolean found = localJobs.unscheduleJob(jobId);
107 found |= clusteredJobs.unscheduleJob(jobId);
108 if (found) {
109 LOG.debug("unscheduleJob: {}", jobId);
110 } else {
111 LOG.debug("unscheduleJob for non-existent jobId: {}", jobId);
112 }
113 }
114
115 @Nullable
116 @Override
117 public Date calculateNextRunTime(Schedule schedule) throws SchedulerServiceException {
118 final Trigger trigger = triggerFactory.buildTrigger(schedule)
119 .withIdentity(triggerKey("name", "group"))
120 .build();
121 return trigger.getFireTimeAfter(new Date());
122 }
123
124 @Nullable
125 @Override
126 public JobDetails getJobDetails(final JobId jobId) {
127 JobDetails jobDetails = getJobDetails(clusteredJobs, jobId, RUN_ONCE_PER_CLUSTER);
128 if (jobDetails == null) {
129 jobDetails = getJobDetails(localJobs, jobId, RUN_LOCALLY);
130 }
131 return jobDetails;
132 }
133
134 @Nonnull
135 @Override
136 public Set<JobRunnerKey> getJobRunnerKeysForAllScheduledJobs() {
137 final Set<JobRunnerKey> jobRunnerKeys = new HashSet<JobRunnerKey>(localJobs.getJobRunnerKeys());
138 jobRunnerKeys.addAll(clusteredJobs.getJobRunnerKeys());
139 return ImmutableSet.copyOf(jobRunnerKeys);
140 }
141
142 @Nullable
143 private JobDetails getJobDetails(Quartz2SchedulerFacade facade, JobId jobId, RunMode runMode) {
144 final Trigger trigger = facade.getTrigger(jobId);
145 return (trigger != null) ? jobDetailsFactory.buildJobDetails(jobId, trigger, runMode) : null;
146 }
147
148 @Nonnull
149 @Override
150 public List<JobDetails> getJobsByJobRunnerKey(final JobRunnerKey jobRunnerKey) {
151 final UniqueJobDetailsCollector collector = new UniqueJobDetailsCollector();
152 collector.collect(RUN_ONCE_PER_CLUSTER, clusteredJobs.getTriggersOfJob(jobRunnerKey));
153 collector.collect(RUN_LOCALLY, localJobs.getTriggersOfJob(jobRunnerKey));
154 return collector.getResults();
155 }
156
157
158
159
160
161
162 @Override
163 protected void startImpl() throws SchedulerServiceException {
164 boolean abort = true;
165 localJobs.start();
166 try {
167 clusteredJobs.start();
168 abort = false;
169 } finally {
170 if (abort) {
171 localJobs.standby();
172 }
173 }
174 }
175
176
177
178
179 @Override
180 protected void standbyImpl() throws SchedulerServiceException {
181 try {
182 localJobs.standby();
183 } finally {
184 clusteredJobs.standby();
185 }
186 }
187
188 @Override
189 protected void shutdownImpl() {
190 try {
191 localJobs.shutdown();
192 } finally {
193 clusteredJobs.shutdown();
194 }
195 }
196
197 private Quartz2SchedulerFacade getFacade(RunMode runMode) {
198 switch (notNull("runMode", runMode)) {
199 case RUN_LOCALLY:
200 return localJobs;
201 case RUN_ONCE_PER_CLUSTER:
202 return clusteredJobs;
203 }
204 throw new IllegalArgumentException("runMode=" + runMode);
205 }
206
207
208
209
210
211
212 class UniqueJobDetailsCollector {
213 final Set<String> jobIdsSeen = newHashSet();
214 final List<JobDetails> jobs = newArrayList();
215
216 void collect(RunMode runMode, final List<? extends Trigger> triggers) {
217 for (Trigger trigger : triggers) {
218 final String jobId = trigger.getKey().getName();
219 if (jobIdsSeen.add(jobId)) {
220 try {
221 jobs.add(jobDetailsFactory.buildJobDetails(JobId.of(jobId), trigger, runMode));
222 } catch (SchedulerRuntimeException sre) {
223 LOG.debug("Unable to reconstruct log details for jobId '{}': {}", jobId, sre);
224 }
225 }
226 }
227 }
228
229 List<JobDetails> getResults() {
230 sort(jobs, new SortByJobId());
231 return ImmutableList.copyOf(jobs);
232 }
233 }
234
235 static class SortByJobId implements Comparator<JobDetails>, Serializable {
236 private static final long serialVersionUID = 1L;
237
238 @Override
239 public int compare(final JobDetails jd1, final JobDetails jd2) {
240 return jd1.getJobId().compareTo(jd2.getJobId());
241 }
242 }
243 }