View Javadoc

1   package com.atlassian.sal.core.scheduling;
2   
3   import java.util.Date;
4   import java.util.Map;
5   import java.util.concurrent.ConcurrentHashMap;
6   import java.util.concurrent.ConcurrentMap;
7   
8   import javax.annotation.Nonnull;
9   import javax.annotation.Nullable;
10  
11  import com.atlassian.sal.api.scheduling.PluginJob;
12  import com.atlassian.sal.api.scheduling.PluginScheduler;
13  import com.atlassian.scheduler.JobRunner;
14  import com.atlassian.scheduler.JobRunnerRequest;
15  import com.atlassian.scheduler.JobRunnerResponse;
16  import com.atlassian.scheduler.SchedulerRuntimeException;
17  import com.atlassian.scheduler.SchedulerService;
18  import com.atlassian.scheduler.SchedulerServiceException;
19  import com.atlassian.scheduler.config.JobConfig;
20  import com.atlassian.scheduler.config.JobId;
21  import com.atlassian.scheduler.config.JobRunnerKey;
22  import com.atlassian.scheduler.config.RunMode;
23  import com.atlassian.scheduler.config.Schedule;
24  
25  import org.springframework.beans.factory.DisposableBean;
26  import org.springframework.beans.factory.InitializingBean;
27  
28  import static com.atlassian.scheduler.JobRunnerResponse.aborted;
29  import static com.atlassian.util.concurrent.Assertions.notNull;
30  
31  /**
32   * A plugin scheduler that is backed by the atlassian-scheduler library.
33   *
34   * @since v3.0.0
35   */
36  public class DefaultPluginScheduler implements PluginScheduler, InitializingBean, DisposableBean
37  {
38      static final JobRunnerKey JOB_RUNNER_KEY = JobRunnerKey.of(DefaultPluginScheduler.class.getName());
39  
40      // Must keep them locally because the scheduler service only accepts Serializable args, not arbitrary Objects
41      private final ConcurrentMap<JobId, JobDescriptor> descriptors = new ConcurrentHashMap<JobId, JobDescriptor>();
42  
43      private final SchedulerService schedulerService;
44  
45      public DefaultPluginScheduler(SchedulerService schedulerService)
46      {
47          this.schedulerService = schedulerService;
48  
49          // Registering here would allow "this" to escape the thread before it is properly constructed.
50          // See Java Concurrency in Practice, section 3.2.1.
51      }
52  
53      @Override
54      public void scheduleJob(String jobKey, Class<? extends PluginJob> jobClass, Map<String, Object> jobDataMap, Date startTime, long repeatInterval)
55      {
56          final JobId jobId = toJobId(jobKey);
57          descriptors.put(jobId, new JobDescriptor(jobClass, jobDataMap));
58          final JobConfig jobConfig = JobConfig.forJobRunnerKey(JOB_RUNNER_KEY)
59                  .withRunMode(RunMode.RUN_LOCALLY)
60                  .withSchedule(Schedule.forInterval(repeatInterval, startTime));
61          try
62          {
63              schedulerService.scheduleJob(jobId, jobConfig);
64          }
65          catch (SchedulerServiceException sse)
66          {
67              throw new SchedulerRuntimeException(sse.getMessage(), sse);
68          }
69      }
70  
71      @Override
72      public void unscheduleJob(String jobKey)
73      {
74          final JobId jobId = toJobId(jobKey);
75  
76          // Do the real unschedule regardless
77          schedulerService.unscheduleJob(jobId);
78  
79          // Why not be idempotent?  Because unfortunately the API demands this behaviour. :(
80          if (descriptors.remove(jobId) == null)
81          {
82              throw new IllegalArgumentException("Error unscheduling job. Job '" + jobKey + "' is not scheduled.");
83          }
84      }
85  
86      @Nonnull
87      JobRunnerResponse runJobImpl(JobRunnerRequest jobRunnerRequest)
88      {
89          final JobDescriptor descriptor = descriptors.get(jobRunnerRequest.getJobId());
90          if (descriptor == null)
91          {
92              return aborted("Job descriptor not found");
93          }
94          return descriptor.runJob();
95      }
96  
97      @Override
98      public void afterPropertiesSet()
99      {
100         // Register the job runner here, instead
101         schedulerService.registerJobRunner(JOB_RUNNER_KEY, new JobRunner()
102         {
103             @Nullable
104             @Override
105             public JobRunnerResponse runJob(JobRunnerRequest jobRunnerRequest)
106             {
107                 return runJobImpl(jobRunnerRequest);
108             }
109         });
110     }
111 
112     @Override
113     public void destroy()
114     {
115         schedulerService.unregisterJobRunner(JOB_RUNNER_KEY);
116     }
117 
118     static JobId toJobId(String jobKey)
119     {
120         return JobId.of(DefaultPluginScheduler.class.getSimpleName() + ':' + jobKey);
121     }
122 
123 
124     static class JobDescriptor
125     {
126         final Class<? extends PluginJob> jobClass;
127         final Map<String, Object> jobDataMap;
128 
129         @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")  // Required for compatibility
130         JobDescriptor(final Class<? extends PluginJob> jobClass, final Map<String, Object> jobDataMap)
131         {
132             this.jobClass = notNull("jobClass", jobClass);
133             this.jobDataMap = jobDataMap;
134         }
135 
136         @Nonnull
137         JobRunnerResponse runJob()
138         {
139             final PluginJob job;
140             try
141             {
142                 job = jobClass.newInstance();
143             }
144             catch (InstantiationException e)
145             {
146                 return aborted(e.toString());
147             }
148             catch (IllegalAccessException e)
149             {
150                 return aborted(e.toString());
151             }
152 
153             job.execute(jobDataMap);
154             return JobRunnerResponse.success();
155         }
156     }
157 }
158