View Javadoc

1   package com.atlassian.sal.core.scheduling;
2   
3   import com.atlassian.sal.api.scheduling.PluginJob;
4   import com.atlassian.sal.api.scheduling.PluginScheduler;
5   import com.atlassian.scheduler.JobRunner;
6   import com.atlassian.scheduler.JobRunnerRequest;
7   import com.atlassian.scheduler.JobRunnerResponse;
8   import com.atlassian.scheduler.SchedulerRuntimeException;
9   import com.atlassian.scheduler.SchedulerService;
10  import com.atlassian.scheduler.SchedulerServiceException;
11  import com.atlassian.scheduler.config.JobConfig;
12  import com.atlassian.scheduler.config.JobId;
13  import com.atlassian.scheduler.config.JobRunnerKey;
14  import com.atlassian.scheduler.config.RunMode;
15  import com.atlassian.scheduler.config.Schedule;
16  import org.springframework.beans.factory.DisposableBean;
17  import org.springframework.beans.factory.InitializingBean;
18  
19  import javax.annotation.Nonnull;
20  import javax.annotation.Nullable;
21  import java.util.Date;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentMap;
25  
26  import static com.atlassian.scheduler.JobRunnerResponse.aborted;
27  import static com.atlassian.util.concurrent.Assertions.notNull;
28  
29  /**
30   * A plugin scheduler that is backed by the atlassian-scheduler library.
31   *
32   * @since v3.0.0
33   */
34  public class DefaultPluginScheduler implements PluginScheduler, InitializingBean, DisposableBean {
35      static final JobRunnerKey JOB_RUNNER_KEY = JobRunnerKey.of(DefaultPluginScheduler.class.getName());
36  
37      // Must keep them locally because the scheduler service only accepts Serializable args, not arbitrary Objects
38      private final ConcurrentMap<JobId, JobDescriptor> descriptors = new ConcurrentHashMap<JobId, JobDescriptor>();
39  
40      private final SchedulerService schedulerService;
41  
42      public DefaultPluginScheduler(SchedulerService schedulerService) {
43          this.schedulerService = schedulerService;
44  
45          // Registering here would allow "this" to escape the thread before it is properly constructed.
46          // See Java Concurrency in Practice, section 3.2.1.
47      }
48  
49      @Override
50      public void scheduleJob(String jobKey, Class<? extends PluginJob> jobClass, Map<String, Object> jobDataMap, Date startTime, long repeatInterval) {
51          final JobId jobId = toJobId(jobKey);
52          descriptors.put(jobId, new JobDescriptor(jobClass, jobDataMap));
53          final JobConfig jobConfig = JobConfig.forJobRunnerKey(JOB_RUNNER_KEY)
54                  .withRunMode(RunMode.RUN_LOCALLY)
55                  .withSchedule(Schedule.forInterval(repeatInterval, startTime));
56          try {
57              schedulerService.scheduleJob(jobId, jobConfig);
58          } catch (SchedulerServiceException sse) {
59              throw new SchedulerRuntimeException(sse.getMessage(), sse);
60          }
61      }
62  
63      @Override
64      public void unscheduleJob(String jobKey) {
65          final JobId jobId = toJobId(jobKey);
66  
67          // Do the real unschedule regardless
68          schedulerService.unscheduleJob(jobId);
69  
70          // Why not be idempotent?  Because unfortunately the API demands this behaviour. :(
71          if (descriptors.remove(jobId) == null) {
72              throw new IllegalArgumentException("Error unscheduling job. Job '" + jobKey + "' is not scheduled.");
73          }
74      }
75  
76      @Nonnull
77      JobRunnerResponse runJobImpl(JobRunnerRequest jobRunnerRequest) {
78          final JobDescriptor descriptor = descriptors.get(jobRunnerRequest.getJobId());
79          if (descriptor == null) {
80              return aborted("Job descriptor not found");
81          }
82          return descriptor.runJob();
83      }
84  
85      @Override
86      public void afterPropertiesSet() {
87          // Register the job runner here, instead
88          schedulerService.registerJobRunner(JOB_RUNNER_KEY, new JobRunner() {
89              @Nullable
90              @Override
91              public JobRunnerResponse runJob(JobRunnerRequest jobRunnerRequest) {
92                  return runJobImpl(jobRunnerRequest);
93              }
94          });
95      }
96  
97      @Override
98      public void destroy() {
99          schedulerService.unregisterJobRunner(JOB_RUNNER_KEY);
100     }
101 
102     static JobId toJobId(String jobKey) {
103         return JobId.of(DefaultPluginScheduler.class.getSimpleName() + ':' + jobKey);
104     }
105 
106 
107     static class JobDescriptor {
108         final Class<? extends PluginJob> jobClass;
109         final Map<String, Object> jobDataMap;
110 
111         @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
112             // Required for compatibility
113         JobDescriptor(final Class<? extends PluginJob> jobClass, final Map<String, Object> jobDataMap) {
114             this.jobClass = notNull("jobClass", jobClass);
115             this.jobDataMap = jobDataMap;
116         }
117 
118         @Nonnull
119         JobRunnerResponse runJob() {
120             final PluginJob job;
121             try {
122                 job = jobClass.newInstance();
123             } catch (InstantiationException e) {
124                 return aborted(e.toString());
125             } catch (IllegalAccessException e) {
126                 return aborted(e.toString());
127             }
128 
129             job.execute(jobDataMap);
130             return JobRunnerResponse.success();
131         }
132     }
133 }
134