1 package com.atlassian.scheduler.core;
2
3 import com.atlassian.scheduler.JobRunner;
4 import com.atlassian.scheduler.JobRunnerRequest;
5 import com.atlassian.scheduler.JobRunnerResponse;
6 import com.atlassian.scheduler.SchedulerRuntimeException;
7 import com.atlassian.scheduler.config.IntervalScheduleInfo;
8 import com.atlassian.scheduler.config.JobConfig;
9 import com.atlassian.scheduler.config.JobId;
10 import com.atlassian.scheduler.config.RunMode;
11 import com.atlassian.scheduler.core.impl.RunningJobImpl;
12 import com.atlassian.scheduler.status.JobDetails;
13 import com.atlassian.scheduler.status.RunOutcome;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import javax.annotation.Nonnull;
18 import javax.annotation.Nullable;
19 import java.util.Date;
20
21 import static com.atlassian.scheduler.JobRunnerResponse.aborted;
22 import static com.atlassian.scheduler.JobRunnerResponse.failed;
23 import static com.atlassian.scheduler.status.RunOutcome.UNAVAILABLE;
24 import static com.atlassian.util.concurrent.Assertions.notNull;
25
26
27
28
29
30
31
32
33
34
35
36
37 public class JobLauncher {
38 protected static final Logger LOG = LoggerFactory.getLogger(JobLauncher.class);
39
40 protected final AbstractSchedulerService schedulerService;
41 protected final RunMode schedulerRunMode;
42 protected final Date firedAt;
43 protected final JobId jobId;
44
45
46 private JobDetails jobDetails;
47 private JobRunner jobRunner;
48 private JobConfig jobConfig;
49 private JobRunnerResponse response;
50
51
52
53
54
55
56
57
58
59
60 public JobLauncher(final AbstractSchedulerService schedulerService, final RunMode schedulerRunMode,
61 @Nullable final Date firedAt, final JobId jobId) {
62 this(schedulerService, schedulerRunMode, firedAt, jobId, null);
63 }
64
65
66
67
68
69
70
71
72
73
74
75 public JobLauncher(final AbstractSchedulerService schedulerService, final RunMode schedulerRunMode,
76 @Nullable final Date firedAt, final JobId jobId, @Nullable final JobDetails jobDetails) {
77 this.schedulerService = notNull("schedulerService", schedulerService);
78 this.schedulerRunMode = notNull("schedulerRunMode", schedulerRunMode);
79 this.firedAt = (firedAt != null) ? firedAt : new Date();
80 this.jobId = notNull("jobId", jobId);
81 this.jobDetails = jobDetails;
82 }
83
84
85
86
87 public void launch() {
88 LOG.debug("launch: {}: {}", schedulerRunMode, jobId);
89 try {
90 final JobRunnerResponse response = launchAndBuildResponse();
91 schedulerService.addRunDetails(jobId, firedAt, response.getRunOutcome(), response.getMessage());
92 } catch (JobRunnerNotRegisteredException ex) {
93 LOG.debug("Scheduled job with ID '{}' is unavailable because its job runner is not registered: {}",
94 jobId, ex.getJobRunnerKey());
95 schedulerService.addRunDetails(jobId, firedAt, UNAVAILABLE,
96 "Job runner key '" + ex.getJobRunnerKey() + "' is not registered");
97 }
98 deleteIfRunOnce();
99 }
100
101 @Nonnull
102 private JobRunnerResponse launchAndBuildResponse() throws JobRunnerNotRegisteredException {
103 try {
104 response = validate();
105 if (response == null) {
106 response = runJob();
107 }
108 } catch (RuntimeException ex) {
109 LOG.error("Scheduled job with ID '{}' failed", jobId, ex);
110 response = failed(ex);
111 } catch (LinkageError err) {
112 LOG.error("Scheduled job with ID '{}' failed due to binary incompatibilities", jobId, err);
113 response = failed(err);
114 }
115 return response;
116 }
117
118 @Nonnull
119 private JobRunnerResponse runJob() {
120 final RunningJob job = new RunningJobImpl(firedAt, jobId, jobConfig);
121
122 final RunningJob existing = schedulerService.enterJob(jobId, job);
123 if (existing != null) {
124 LOG.debug("Unable to start job {} because it is already running as {}", job, existing);
125 return JobRunnerResponse.aborted("Already running");
126 }
127
128 schedulerService.preJob();
129 final Thread thd = Thread.currentThread();
130 final ClassLoader originalClassLoader = thd.getContextClassLoader();
131 try {
132
133 thd.setContextClassLoader(jobRunner.getClass().getClassLoader());
134 final JobRunnerResponse response = jobRunner.runJob(job);
135 return (response != null) ? response : JobRunnerResponse.success();
136 } finally {
137 thd.setContextClassLoader(originalClassLoader);
138 schedulerService.leaveJob(jobId, job);
139 schedulerService.postJob();
140 }
141 }
142
143
144 @Nullable
145 private JobRunnerResponse validate() throws JobRunnerNotRegisteredException {
146 JobRunnerResponse response = validateJobDetails();
147 if (response == null) {
148 response = validateJobRunner();
149 if (response == null) {
150 response = validateJobConfig();
151 }
152 }
153 return response;
154 }
155
156 @Nullable
157 private JobRunnerResponse validateJobDetails() {
158 if (jobDetails == null) {
159 jobDetails = schedulerService.getJobDetails(jobId);
160 if (jobDetails == null) {
161 return aborted("No corresponding job details");
162 }
163 }
164
165 if (jobDetails.getRunMode() != schedulerRunMode) {
166 return aborted("Inconsistent run mode: expected '" + jobDetails.getRunMode() +
167 "' got: '" + schedulerRunMode + '\'');
168 }
169 return null;
170 }
171
172 @Nullable
173 private JobRunnerResponse validateJobRunner() throws JobRunnerNotRegisteredException {
174 jobRunner = schedulerService.getJobRunner(jobDetails.getJobRunnerKey());
175 if (jobRunner == null) {
176
177 throw new JobRunnerNotRegisteredException(jobDetails.getJobRunnerKey());
178 }
179 return null;
180 }
181
182 @Nullable
183 private JobRunnerResponse validateJobConfig() {
184 try {
185 jobConfig = JobConfig.forJobRunnerKey(jobDetails.getJobRunnerKey())
186 .withRunMode(jobDetails.getRunMode())
187 .withSchedule(jobDetails.getSchedule())
188 .withParameters(jobDetails.getParameters());
189 return null;
190 } catch (SchedulerRuntimeException sre) {
191 return aborted(jobDetails.toString());
192 }
193 }
194
195 private void deleteIfRunOnce() {
196 if (jobDetails != null) {
197 final IntervalScheduleInfo info = jobDetails.getSchedule().getIntervalScheduleInfo();
198 if (info != null && info.getIntervalInMillis() == 0L) {
199 LOG.debug("deleteIfRunOnce: deleting completed job: {}", jobId);
200 schedulerService.unscheduleJob(jobId);
201 }
202 }
203 }
204
205 @Override
206 public String toString() {
207 return "JobLauncher[jobId=" + jobId +
208 ",jobDetails=" + jobDetails +
209 ",response=" + response +
210 ']';
211 }
212 }