View Javadoc

1   package com.atlassian.scheduler.core;
2   
3   import com.atlassian.scheduler.JobRunner;
4   import com.atlassian.scheduler.SchedulerRuntimeException;
5   import com.atlassian.scheduler.SchedulerServiceException;
6   import com.atlassian.scheduler.config.JobConfig;
7   import com.atlassian.scheduler.config.JobId;
8   import com.atlassian.scheduler.config.JobRunnerKey;
9   import com.atlassian.scheduler.core.spi.RunDetailsDao;
10  import com.atlassian.scheduler.core.status.RunDetailsImpl;
11  import com.atlassian.scheduler.core.util.JobRunnerRegistry;
12  import com.atlassian.scheduler.core.util.ParameterMapSerializer;
13  import com.atlassian.scheduler.status.JobDetails;
14  import com.atlassian.scheduler.status.RunDetails;
15  import com.atlassian.scheduler.status.RunOutcome;
16  import com.google.common.annotations.VisibleForTesting;
17  import com.google.common.collect.ImmutableList;
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  
21  import javax.annotation.Nonnull;
22  import javax.annotation.Nullable;
23  import javax.annotation.concurrent.GuardedBy;
24  import java.io.Serializable;
25  import java.util.Collection;
26  import java.util.Comparator;
27  import java.util.Date;
28  import java.util.Set;
29  import java.util.UUID;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentMap;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.locks.Condition;
34  import java.util.concurrent.locks.Lock;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import static com.atlassian.util.concurrent.Assertions.notNull;
38  
39  /**
40   * Base class for implementing a scheduler service.  Provides {@link JobRunner} registration,
41   * and job status tracking.
42   *
43   * @since v1.0
44   */
45  public abstract class AbstractSchedulerService implements LifecycleAwareSchedulerService {
46      private static final Logger LOG = LoggerFactory.getLogger(AbstractSchedulerService.class);
47  
48      /**
49       * Sorts by job ID (and nothing else).  This is useful for implementing
50       * {@link #getJobsByJobRunnerKey(JobRunnerKey)}.
51       */
52      protected static final Comparator<JobDetails> BY_JOB_ID = new ByJobId();
53  
        // Upper bound on how many random UUIDs generateUniqueJobId() tries before giving up.
54      private static final int MAX_ATTEMPTS = 100;
55  
        // idleCondition is signalled (while holding idleLock) by signalIdle() when leaveJob() finds
        // that no jobs remain in runningJobs; waitUntilIdle() awaits it.
56      private final Lock idleLock = new ReentrantLock();
57      private final Condition idleCondition = idleLock.newCondition();
58  
        // Registry that backs registerJobRunner / unregisterJobRunner / getJobRunner.
59      private final JobRunnerRegistry jobRunnerRegistry = new JobRunnerRegistry();
        // Jobs currently executing in this instance, keyed by job ID; maintained by enterJob() / leaveJob().
60      private final ConcurrentMap<JobId, RunningJob> runningJobs = new ConcurrentHashMap<JobId, RunningJob>(16);
61  
        // Destination for the run details created by addRunDetails().
62      private final RunDetailsDao runDetailsDao;
        // Exposed to callers through getParameterMapSerializer().
63      private final ParameterMapSerializer parameterMapSerializer;
64  
        // Written only by the synchronized start() / standby() / shutdown() methods; volatile so that
        // getState() can read it without taking the monitor.
65      private volatile State state = State.STANDBY;
66  
67      protected AbstractSchedulerService(RunDetailsDao runDetailsDao) {
68          this(runDetailsDao, new ParameterMapSerializer());
69      }
70  
71      protected AbstractSchedulerService(RunDetailsDao runDetailsDao, ParameterMapSerializer parameterMapSerializer) {
72          this.runDetailsDao = runDetailsDao;
73          this.parameterMapSerializer = parameterMapSerializer;
74      }
75  
76      @Override
77      public void registerJobRunner(final JobRunnerKey jobRunnerKey, final JobRunner jobRunner) {
78          LOG.debug("registerJobRunner: {}", jobRunnerKey);
79          jobRunnerRegistry.registerJobRunner(jobRunnerKey, jobRunner);
80      }
81  
82      @Override
83      public void unregisterJobRunner(final JobRunnerKey jobRunnerKey) {
84          LOG.debug("unregisterJobRunner: {}", jobRunnerKey);
85          jobRunnerRegistry.unregisterJobRunner(jobRunnerKey);
86      }
87  
88      public JobRunner getJobRunner(final JobRunnerKey jobRunnerKey) {
89          return jobRunnerRegistry.getJobRunner(jobRunnerKey);
90      }
91  
92      @Nonnull
93      @Override
94      public Set<JobRunnerKey> getRegisteredJobRunnerKeys() {
95          return jobRunnerRegistry.getRegisteredJobRunnerKeys();
96      }
97  
98      @Nonnull
99      @Override
100     public JobId scheduleJobWithGeneratedId(final JobConfig jobConfig) throws SchedulerServiceException {
101         final JobId jobId = generateUniqueJobId();
102         LOG.debug("scheduleJobWithGeneratedId: {} -> {}", jobConfig, jobId);
103         scheduleJob(jobId, jobConfig);
104         return jobId;
105     }
106 
107     private JobId generateUniqueJobId() throws SchedulerServiceException {
108         for (int i = 0; i < MAX_ATTEMPTS; ++i) {
109             final JobId jobId = JobId.of(UUID.randomUUID().toString());
110             if (getJobDetails(jobId) == null) {
111                 return jobId;
112             }
113         }
114         throw new SchedulerServiceException("Unable to generate a unique job ID");
115     }
116 
117     /**
118      * Creates or updates the job status record for the given job ID.
119      *
120      * @param jobId      the job ID for which the status is being updated
121      * @param startedAt  the clock time that the run started at
122      * @param runOutcome the result of the run
123      * @param message    an optional informational message to include in the {@code RunDetails}
124      * @return the newly created run details.
125      */
126     public RunDetails addRunDetails(JobId jobId, Date startedAt, RunOutcome runOutcome, @Nullable String message) {
127         LOG.debug("addRunDetails: jobId={} startedAt={} runOutcome={} message={}",
128                 new Object[]{jobId, startedAt, runOutcome, message});
129 
130         notNull("jobId", jobId);
131         notNull("startedAt", startedAt);
132         notNull("runOutcome", runOutcome);
133 
134         final long duration = System.currentTimeMillis() - startedAt.getTime();
135         final RunDetails runDetails = new RunDetailsImpl(startedAt, runOutcome, duration, message);
136         runDetailsDao.addRunDetails(jobId, runDetails);
137         return runDetails;
138     }
139 
140     /**
141      * This method is called before a job begins execution.
         * <p>
         * The implementation in this class is intentionally empty; it is a hook that subclasses
         * may override.
         * </p>
142      */
143     public void preJob() {
144     }
145 
146     /**
147      * This method is called after a job has finished execution.
         * <p>
         * The implementation in this class is intentionally empty; it is a hook that subclasses
         * may override.
         * </p>
148      */
149     public void postJob() {
150     }
151 
152     @Override
153     synchronized public final void start() throws SchedulerServiceException {
154         LOG.debug("{} -> STARTED", state);
155         switch (state) {
156             case STARTED:
157                 return;
158             case SHUTDOWN:
159                 throw new SchedulerServiceException("The scheduler service has been shut down; it cannot be restarted.");
160         }
161         startImpl();
162         state = State.STARTED;
163     }
164 
165     @Override
166     synchronized public final void standby() throws SchedulerServiceException {
167         LOG.debug("{} -> STANDBY", state);
168         switch (state) {
169             case STANDBY:
170                 return;
171             case SHUTDOWN:
172                 throw new SchedulerServiceException("The scheduler service has been shut down; it cannot be restarted.");
173         }
174         cancelJobs();
175         standbyImpl();
176         state = State.STANDBY;
177     }
178 
179     @Override
180     synchronized public final void shutdown() {
181         LOG.debug("{} -> SHUTDOWN", state);
182         if (state == State.SHUTDOWN) {
183             return;
184         }
185         state = State.SHUTDOWN;
186         cancelJobs();
187         shutdownImpl();
188     }
189 
190     private void cancelJobs() {
191         for (RunningJob job : runningJobs.values()) {
192             job.cancel();
193         }
194     }
195 
196     /**
197      * Records beginning to run a job.
198      * <p>
199      * Any successful call to this method <strong>MUST</strong> be paired with a call to
200      * {@link #leaveJob(JobId, RunningJob)} using the same arguments upon completion of the job.
201      * Success is defined by this method returning {@code null} as opposed to an existing job.
202      * </p>
203      *
204      * @param jobId the job ID that will be run
205      * @param job   the job that is about to be started
206      * @return {@code null} if successful, meaning that the job was not running and has been successfully
207      * registered.  If another job with that job ID is already running, then it is returned, instead.
208      */
209     RunningJob enterJob(final JobId jobId, final RunningJob job) {
210         return runningJobs.putIfAbsent(jobId, job);
211     }
212 
213     /**
214      * Records the completion of a running job.
215      * <p>
216      * This must be called exactly once for each successful call to {@link #enterJob(JobId, RunningJob)}.
217      * Failing to call it, calling it multiple times, or calling it with different arguments is a serious
218      * error and will throw an {@code IllegalStateException} if/when it is detected.  This behaviour
219      * should not be relied upon; it is intended only to serve as an aid for checking the correctness
220      * of the scheduler implementation.
221      * </p>
222      *
223      * @param jobId the job ID that was running
224      * @param job   the running job that has completed
225      * @throws IllegalStateException if this call to {@code leaveJob} does not follow a call to
226      *                               {@link #enterJob(JobId, RunningJob)} with the same arguments, which indicates a
227      *                               serious error in the scheduler implementation.
228      */
229     void leaveJob(final JobId jobId, final RunningJob job) {
230         if (!runningJobs.remove(jobId, job)) {
231             throw new IllegalStateException("Invalid call to leaveJob(" + jobId + ", " + job +
232                     "; actual running job for that ID is: " + runningJobs.get(jobId));
233         }
234 
235         if (runningJobs.isEmpty()) {
236             signalIdle();
237         }
238     }
239 
240     @GuardedBy("idleLock")
241     private void signalIdle() {
242         idleLock.lock();
243         try {
244             idleCondition.signalAll();
245         } finally {
246             idleLock.unlock();
247         }
248     }
249 
250 
251     /**
252      * {@inheritDoc}
253      * <p>
254      * Implementations may override the default implementation for {@code waitUntilIdle}.
255      * The default implementation that {@code AbstractSchedulerService} provides blocks on an
256      * internal condition, which is signalled whenever a completing job leaves no jobs running
         * locally, until one of the following occurs:
257      * </p>
258      * <ol>
259      * <li>{@link #getLocallyRunningJobs()} returns an empty collection.</li>
260      * <li>The current thread is interrupted.</li>
261      * <li>The timeout is exhausted.</li>
262      * </ol>
         *
         * @param timeout how long to wait, in {@code units}; if it is not positive, this method does not block
         * @param units   the unit in which {@code timeout} is expressed
         * @return {@code true} if no jobs were found to be running locally; {@code false} if jobs
         * were still running and the timeout was not positive or has been exhausted
         * @throws InterruptedException if the current thread is interrupted while waiting
263      */
264     @Override
265     public boolean waitUntilIdle(final long timeout, final TimeUnit units) throws InterruptedException {
            // Fast paths that never touch the lock: already idle, or the caller does not want to block.
266         if (runningJobs.isEmpty()) {
267             return true;
268         }
269         if (timeout <= 0L) {
270             return false;
271         }
272 
            // Awaiting the condition requires holding its lock.
273         idleLock.lock();
274         try {
275             return waitUntilIdle(units.toNanos(timeout));
276         } finally {
277             idleLock.unlock();
278         }
279     }
280 
281     @GuardedBy("idleLock")
282     boolean waitUntilIdle(final long timeoutInNanos) throws InterruptedException {
283         long nanosLeft = timeoutInNanos;
284         while (nanosLeft > 0L) {
285             nanosLeft = awaitNanos(nanosLeft);
286             if (runningJobs.isEmpty()) {
287                 return true;
288             }
289         }
290         return false;
291     }
292 
293     @VisibleForTesting
294     long awaitNanos(long nanosLeft) throws InterruptedException {
295         return idleCondition.awaitNanos(nanosLeft);
296     }
297 
298     @Nonnull
299     @Override
300     public Collection<RunningJob> getLocallyRunningJobs() {
301         return ImmutableList.copyOf(runningJobs.values());
302     }
303 
304     @Nonnull
305     @Override
306     public final State getState() {
307         return state;
308     }
309 
310 
311     /**
312      * Provided by the scheduler service to implement {@link #start()} requests.  This
313      * is only ever called while in {@link State#STANDBY}, and throwing an exception will
314      * prevent the state from being updated.
         * <p>
         * The call is made from within the {@code synchronized} {@link #start()} method, so it runs
         * while holding this service's monitor.
         * </p>
315      *
316      * @throws SchedulerServiceException if the scheduler implementation fails to start
317      */
318     protected abstract void startImpl() throws SchedulerServiceException;
319 
320     /**
321      * Provided by the scheduler service to implement {@link #standby()} requests.  This
322      * is only ever called while in {@link State#STARTED}, and throwing an exception will
323      * prevent the state from being updated.
         * <p>
         * The call is made from within the {@code synchronized} {@link #standby()} method, after
         * the locally running jobs have been asked to cancel, so it runs while holding this
         * service's monitor.
         * </p>
324      *
325      * @throws SchedulerServiceException if the scheduler implementation fails to enter standby mode
326      */
327     protected abstract void standbyImpl() throws SchedulerServiceException;
328 
329     /**
330      * Provided by the scheduler service to implement {@link #shutdown()} requests.  This
331      * is only ever called if the scheduler has not already been shut down, and throwing an
332      * exception <em>does not</em> prevent the scheduler service from entering this state.
         * <p>
         * The call is made from within the {@code synchronized} {@link #shutdown()} method, after
         * the state has already been set to {@link State#SHUTDOWN} and the locally running jobs
         * have been asked to cancel.
         * </p>
333      */
334     protected abstract void shutdownImpl();
335 
336     /**
337      * Returns the parameter map serializer used by this scheduler.
338      *
339      * @return the parameter map serializer used by this scheduler.
340      */
341     public ParameterMapSerializer getParameterMapSerializer() {
342         return parameterMapSerializer;
343     }
344 
345 
346     /**
347      * Converts the unchecked {@code SchedulerRuntimeException} to the checked
348      * {@code SchedulerServiceException}.  It makes sense to do this in many cases
349      * where the checked exception is declared, as the caller is already having to
350      * handle it.
351      * <p>
352      * The message and cause of the {@code SchedulerServiceException} are taken from
353      * the {@link Throwable#getCause() cause} of the runtime exception.  If no cause
354      * was set, then the runtime exception itself is used.
355      * </p>
356      *
357      * @param sre the scheduler runtime exception to convert
358      * @return the converted exception
359      */
360     protected static SchedulerServiceException checked(SchedulerRuntimeException sre) {
361         Throwable cause = sre.getCause();
362         if (cause == null) {
363             cause = sre;
364         }
365         return new SchedulerServiceException(cause.toString(), cause);
366     }
367 
368     static class ByJobId implements Comparator<JobDetails>, Serializable {
369         private static final long serialVersionUID = 1L;
370 
371         public int compare(final JobDetails jd1, final JobDetails jd2) {
372             return jd1.getJobId().compareTo(jd2.getJobId());
373         }
374     }
375 }
376