View Javadoc

1   package com.atlassian.scheduler.core;
2   
3   import com.atlassian.scheduler.JobRunner;
4   import com.atlassian.scheduler.JobRunnerRequest;
5   import com.atlassian.scheduler.JobRunnerResponse;
6   import com.atlassian.scheduler.SchedulerRuntimeException;
7   import com.atlassian.scheduler.config.IntervalScheduleInfo;
8   import com.atlassian.scheduler.config.JobConfig;
9   import com.atlassian.scheduler.config.JobId;
10  import com.atlassian.scheduler.config.RunMode;
11  import com.atlassian.scheduler.core.impl.RunningJobImpl;
12  import com.atlassian.scheduler.status.JobDetails;
13  import com.atlassian.scheduler.status.RunOutcome;
14  import org.slf4j.Logger;
15  import org.slf4j.LoggerFactory;
16  
17  import javax.annotation.Nonnull;
18  import javax.annotation.Nullable;
19  import java.util.Date;
20  
21  import static com.atlassian.scheduler.JobRunnerResponse.aborted;
22  import static com.atlassian.scheduler.JobRunnerResponse.failed;
23  import static com.atlassian.scheduler.status.RunOutcome.UNAVAILABLE;
24  import static com.atlassian.util.concurrent.Assertions.notNull;
25  
26  /**
27   * Scheduler implementations can (and should) use {@code JobLauncher} to invoke jobs when
28   * it is time to run them.  It will do the necessary checks to make sure that the job runner
29   * is registered, has the appropriate run mode, can reconstruct the job's parameter map, and
30   * so on.  If everything checks out, it calls the job runner's
31   * {@link JobRunner#runJob(JobRunnerRequest) runJob} method and records the
32   * the resulting RunDetails using
33   * {@link AbstractSchedulerService#addRunDetails(JobId, Date, RunOutcome, String) addRunDetails}.
34   *
35   * @since v1.0
36   */
37  public class JobLauncher {
38      protected static final Logger LOG = LoggerFactory.getLogger(JobLauncher.class);
39  
40      protected final AbstractSchedulerService schedulerService;
41      protected final RunMode schedulerRunMode;
42      protected final Date firedAt;
43      protected final JobId jobId;
44  
45      // Derived (in this order)
46      private JobDetails jobDetails;
47      private JobRunner jobRunner;
48      private JobConfig jobConfig;
49      private JobRunnerResponse response;
50  
51      /**
52       * Creates a job launcher to handle the running of a scheduled job.
53       *
54       * @param schedulerService the scheduler that is invoking the job
55       * @param schedulerRunMode the expected {@link RunMode run mode} for the jobs that this scheduler owns
56       * @param firedAt          the time that the job was started, if known; may be {@code null}, in which case the
57       *                         current time is used
58       * @param jobId            the job ID of the job to run
59       */
60      public JobLauncher(final AbstractSchedulerService schedulerService, final RunMode schedulerRunMode,
61                         @Nullable final Date firedAt, final JobId jobId) {
62          this(schedulerService, schedulerRunMode, firedAt, jobId, null);
63      }
64  
65      /**
66       * Creates a job launcher to handle the running of a scheduled job.
67       *
68       * @param schedulerService the scheduler that is invoking the job
69       * @param schedulerRunMode the expected {@link RunMode run mode} for the jobs that this scheduler owns
70       * @param firedAt          the time that the job was started, if known; may be {@code null}, in which case the
71       *                         current time is used
72       * @param jobId            the job ID of the job to run
73       * @param jobDetails       the already loaded job details, if they are available
74       */
75      public JobLauncher(final AbstractSchedulerService schedulerService, final RunMode schedulerRunMode,
76                         @Nullable final Date firedAt, final JobId jobId, @Nullable final JobDetails jobDetails) {
77          this.schedulerService = notNull("schedulerService", schedulerService);
78          this.schedulerRunMode = notNull("schedulerRunMode", schedulerRunMode);
79          this.firedAt = (firedAt != null) ? firedAt : new Date();
80          this.jobId = notNull("jobId", jobId);
81          this.jobDetails = jobDetails;
82      }
83  
84      /**
85       * Call this to validate the job, run it, and update its status.
86       */
87      public void launch() {
88          LOG.debug("launch: {}: {}", schedulerRunMode, jobId);
89          try {
90              final JobRunnerResponse response = launchAndBuildResponse();
91              schedulerService.addRunDetails(jobId, firedAt, response.getRunOutcome(), response.getMessage());
92          } catch (JobRunnerNotRegisteredException ex) {
93              LOG.debug("Scheduled job with ID '{}' is unavailable because its job runner is not registered: {}",
94                      jobId, ex.getJobRunnerKey());
95              schedulerService.addRunDetails(jobId, firedAt, UNAVAILABLE,
96                      "Job runner key '" + ex.getJobRunnerKey() + "' is not registered");
97          }
98          deleteIfRunOnce();
99      }
100 
101     @Nonnull
102     private JobRunnerResponse launchAndBuildResponse() throws JobRunnerNotRegisteredException {
103         try {
104             response = validate();
105             if (response == null) {
106                 response = runJob();
107             }
108         } catch (RuntimeException ex) {
109             LOG.error("Scheduled job with ID '{}' failed", jobId, ex);
110             response = failed(ex);
111         } catch (LinkageError err) {
112             LOG.error("Scheduled job with ID '{}' failed due to binary incompatibilities", jobId, err);
113             response = failed(err);
114         }
115         return response;
116     }
117 
118     @Nonnull
119     private JobRunnerResponse runJob() {
120         final RunningJob job = new RunningJobImpl(firedAt, jobId, jobConfig);
121 
122         final RunningJob existing = schedulerService.enterJob(jobId, job);
123         if (existing != null) {
124             LOG.debug("Unable to start job {} because it is already running as {}", job, existing);
125             return JobRunnerResponse.aborted("Already running");
126         }
127 
128         schedulerService.preJob();
129         final Thread thd = Thread.currentThread();
130         final ClassLoader originalClassLoader = thd.getContextClassLoader();
131         try {
132             // SCHEDULER-11: Ensure that the Job runs with its own class loader set as the thread's CCL
133             thd.setContextClassLoader(jobRunner.getClass().getClassLoader());
134             final JobRunnerResponse response = jobRunner.runJob(job);
135             return (response != null) ? response : JobRunnerResponse.success();
136         } finally {
137             thd.setContextClassLoader(originalClassLoader);
138             schedulerService.leaveJob(jobId, job);
139             schedulerService.postJob();
140         }
141     }
142 
143 
144     @Nullable
145     private JobRunnerResponse validate() throws JobRunnerNotRegisteredException {
146         JobRunnerResponse response = validateJobDetails();
147         if (response == null) {
148             response = validateJobRunner();
149             if (response == null) {
150                 response = validateJobConfig();
151             }
152         }
153         return response;
154     }
155 
156     @Nullable
157     private JobRunnerResponse validateJobDetails() {
158         if (jobDetails == null) {
159             jobDetails = schedulerService.getJobDetails(jobId);
160             if (jobDetails == null) {
161                 return aborted("No corresponding job details");
162             }
163         }
164 
165         if (jobDetails.getRunMode() != schedulerRunMode) {
166             return aborted("Inconsistent run mode: expected '" + jobDetails.getRunMode() +
167                     "' got: '" + schedulerRunMode + '\'');
168         }
169         return null;
170     }
171 
172     @Nullable
173     private JobRunnerResponse validateJobRunner() throws JobRunnerNotRegisteredException {
174         jobRunner = schedulerService.getJobRunner(jobDetails.getJobRunnerKey());
175         if (jobRunner == null) {
176             // We can't create a JobRunnerResponse for this...
177             throw new JobRunnerNotRegisteredException(jobDetails.getJobRunnerKey());
178         }
179         return null;
180     }
181 
182     @Nullable
183     private JobRunnerResponse validateJobConfig() {
184         try {
185             jobConfig = JobConfig.forJobRunnerKey(jobDetails.getJobRunnerKey())
186                     .withRunMode(jobDetails.getRunMode())
187                     .withSchedule(jobDetails.getSchedule())
188                     .withParameters(jobDetails.getParameters());
189             return null;
190         } catch (SchedulerRuntimeException sre) {
191             return aborted(jobDetails.toString());
192         }
193     }
194 
195     private void deleteIfRunOnce() {
196         if (jobDetails != null) {
197             final IntervalScheduleInfo info = jobDetails.getSchedule().getIntervalScheduleInfo();
198             if (info != null && info.getIntervalInMillis() == 0L) {
199                 LOG.debug("deleteIfRunOnce: deleting completed job: {}", jobId);
200                 schedulerService.unscheduleJob(jobId);
201             }
202         }
203     }
204 
205     @Override
206     public String toString() {
207         return "JobLauncher[jobId=" + jobId +
208                 ",jobDetails=" + jobDetails +
209                 ",response=" + response +
210                 ']';
211     }
212 }