1 package com.atlassian.sal.core.scheduling;
2
3 import java.util.Date;
4 import java.util.Map;
5 import java.util.concurrent.ConcurrentHashMap;
6 import java.util.concurrent.ConcurrentMap;
7
8 import javax.annotation.Nonnull;
9 import javax.annotation.Nullable;
10
11 import com.atlassian.sal.api.scheduling.PluginJob;
12 import com.atlassian.sal.api.scheduling.PluginScheduler;
13 import com.atlassian.scheduler.JobRunner;
14 import com.atlassian.scheduler.JobRunnerRequest;
15 import com.atlassian.scheduler.JobRunnerResponse;
16 import com.atlassian.scheduler.SchedulerRuntimeException;
17 import com.atlassian.scheduler.SchedulerService;
18 import com.atlassian.scheduler.SchedulerServiceException;
19 import com.atlassian.scheduler.config.JobConfig;
20 import com.atlassian.scheduler.config.JobId;
21 import com.atlassian.scheduler.config.JobRunnerKey;
22 import com.atlassian.scheduler.config.RunMode;
23 import com.atlassian.scheduler.config.Schedule;
24
25 import org.springframework.beans.factory.DisposableBean;
26 import org.springframework.beans.factory.InitializingBean;
27
28 import static com.atlassian.scheduler.JobRunnerResponse.aborted;
29 import static com.atlassian.util.concurrent.Assertions.notNull;
30
31
32
33
34
35
36 public class DefaultPluginScheduler implements PluginScheduler, InitializingBean, DisposableBean
37 {
38 static final JobRunnerKey JOB_RUNNER_KEY = JobRunnerKey.of(DefaultPluginScheduler.class.getName());
39
40
41 private final ConcurrentMap<JobId, JobDescriptor> descriptors = new ConcurrentHashMap<JobId, JobDescriptor>();
42
43 private final SchedulerService schedulerService;
44
45 public DefaultPluginScheduler(SchedulerService schedulerService)
46 {
47 this.schedulerService = schedulerService;
48
49
50
51 }
52
53 @Override
54 public void scheduleJob(String jobKey, Class<? extends PluginJob> jobClass, Map<String, Object> jobDataMap, Date startTime, long repeatInterval)
55 {
56 final JobId jobId = toJobId(jobKey);
57 descriptors.put(jobId, new JobDescriptor(jobClass, jobDataMap));
58 final JobConfig jobConfig = JobConfig.forJobRunnerKey(JOB_RUNNER_KEY)
59 .withRunMode(RunMode.RUN_LOCALLY)
60 .withSchedule(Schedule.forInterval(repeatInterval, startTime));
61 try
62 {
63 schedulerService.scheduleJob(jobId, jobConfig);
64 }
65 catch (SchedulerServiceException sse)
66 {
67 throw new SchedulerRuntimeException(sse.getMessage(), sse);
68 }
69 }
70
71 @Override
72 public void unscheduleJob(String jobKey)
73 {
74 final JobId jobId = toJobId(jobKey);
75
76
77 schedulerService.unscheduleJob(jobId);
78
79
80 if (descriptors.remove(jobId) == null)
81 {
82 throw new IllegalArgumentException("Error unscheduling job. Job '" + jobKey + "' is not scheduled.");
83 }
84 }
85
86 @Nonnull
87 JobRunnerResponse runJobImpl(JobRunnerRequest jobRunnerRequest)
88 {
89 final JobDescriptor descriptor = descriptors.get(jobRunnerRequest.getJobId());
90 if (descriptor == null)
91 {
92 return aborted("Job descriptor not found");
93 }
94 return descriptor.runJob();
95 }
96
97 @Override
98 public void afterPropertiesSet()
99 {
100
101 schedulerService.registerJobRunner(JOB_RUNNER_KEY, new JobRunner()
102 {
103 @Nullable
104 @Override
105 public JobRunnerResponse runJob(JobRunnerRequest jobRunnerRequest)
106 {
107 return runJobImpl(jobRunnerRequest);
108 }
109 });
110 }
111
112 @Override
113 public void destroy()
114 {
115 schedulerService.unregisterJobRunner(JOB_RUNNER_KEY);
116 }
117
118 static JobId toJobId(String jobKey)
119 {
120 return JobId.of(DefaultPluginScheduler.class.getSimpleName() + ':' + jobKey);
121 }
122
123
124 static class JobDescriptor
125 {
126 final Class<? extends PluginJob> jobClass;
127 final Map<String, Object> jobDataMap;
128
129 @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
130 JobDescriptor(final Class<? extends PluginJob> jobClass, final Map<String, Object> jobDataMap)
131 {
132 this.jobClass = notNull("jobClass", jobClass);
133 this.jobDataMap = jobDataMap;
134 }
135
136 @Nonnull
137 JobRunnerResponse runJob()
138 {
139 final PluginJob job;
140 try
141 {
142 job = jobClass.newInstance();
143 }
144 catch (InstantiationException e)
145 {
146 return aborted(e.toString());
147 }
148 catch (IllegalAccessException e)
149 {
150 return aborted(e.toString());
151 }
152
153 job.execute(jobDataMap);
154 return JobRunnerResponse.success();
155 }
156 }
157 }
158