1 package com.atlassian.scheduler.core;
2
3 import com.atlassian.scheduler.JobRunner;
4 import com.atlassian.scheduler.SchedulerRuntimeException;
5 import com.atlassian.scheduler.SchedulerServiceException;
6 import com.atlassian.scheduler.config.JobConfig;
7 import com.atlassian.scheduler.config.JobId;
8 import com.atlassian.scheduler.config.JobRunnerKey;
9 import com.atlassian.scheduler.core.spi.RunDetailsDao;
10 import com.atlassian.scheduler.core.status.RunDetailsImpl;
11 import com.atlassian.scheduler.core.util.JobRunnerRegistry;
12 import com.atlassian.scheduler.core.util.ParameterMapSerializer;
13 import com.atlassian.scheduler.status.JobDetails;
14 import com.atlassian.scheduler.status.RunDetails;
15 import com.atlassian.scheduler.status.RunOutcome;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.ImmutableList;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import javax.annotation.concurrent.GuardedBy;
24 import java.io.Serializable;
25 import java.util.Collection;
26 import java.util.Comparator;
27 import java.util.Date;
28 import java.util.Set;
29 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.locks.Condition;
34 import java.util.concurrent.locks.Lock;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import static com.atlassian.util.concurrent.Assertions.notNull;
38
39
40
41
42
43
44
45 public abstract class AbstractSchedulerService implements LifecycleAwareSchedulerService {
46 private static final Logger LOG = LoggerFactory.getLogger(AbstractSchedulerService.class);
47
48
49
50
51
52 protected static final Comparator<JobDetails> BY_JOB_ID = new ByJobId();
53
54 private static final int MAX_ATTEMPTS = 100;
55
56 private final Lock idleLock = new ReentrantLock();
57 private final Condition idleCondition = idleLock.newCondition();
58
59 private final JobRunnerRegistry jobRunnerRegistry = new JobRunnerRegistry();
60 private final ConcurrentMap<JobId, RunningJob> runningJobs = new ConcurrentHashMap<JobId, RunningJob>(16);
61
62 private final RunDetailsDao runDetailsDao;
63 private final ParameterMapSerializer parameterMapSerializer;
64
65 private volatile State state = State.STANDBY;
66
67 protected AbstractSchedulerService(RunDetailsDao runDetailsDao) {
68 this(runDetailsDao, new ParameterMapSerializer());
69 }
70
71 protected AbstractSchedulerService(RunDetailsDao runDetailsDao, ParameterMapSerializer parameterMapSerializer) {
72 this.runDetailsDao = runDetailsDao;
73 this.parameterMapSerializer = parameterMapSerializer;
74 }
75
76 @Override
77 public void registerJobRunner(final JobRunnerKey jobRunnerKey, final JobRunner jobRunner) {
78 LOG.debug("registerJobRunner: {}", jobRunnerKey);
79 jobRunnerRegistry.registerJobRunner(jobRunnerKey, jobRunner);
80 }
81
82 @Override
83 public void unregisterJobRunner(final JobRunnerKey jobRunnerKey) {
84 LOG.debug("unregisterJobRunner: {}", jobRunnerKey);
85 jobRunnerRegistry.unregisterJobRunner(jobRunnerKey);
86 }
87
88 public JobRunner getJobRunner(final JobRunnerKey jobRunnerKey) {
89 return jobRunnerRegistry.getJobRunner(jobRunnerKey);
90 }
91
92 @Nonnull
93 @Override
94 public Set<JobRunnerKey> getRegisteredJobRunnerKeys() {
95 return jobRunnerRegistry.getRegisteredJobRunnerKeys();
96 }
97
98 @Nonnull
99 @Override
100 public JobId scheduleJobWithGeneratedId(final JobConfig jobConfig) throws SchedulerServiceException {
101 final JobId jobId = generateUniqueJobId();
102 LOG.debug("scheduleJobWithGeneratedId: {} -> {}", jobConfig, jobId);
103 scheduleJob(jobId, jobConfig);
104 return jobId;
105 }
106
107 private JobId generateUniqueJobId() throws SchedulerServiceException {
108 for (int i = 0; i < MAX_ATTEMPTS; ++i) {
109 final JobId jobId = JobId.of(UUID.randomUUID().toString());
110 if (getJobDetails(jobId) == null) {
111 return jobId;
112 }
113 }
114 throw new SchedulerServiceException("Unable to generate a unique job ID");
115 }
116
117
118
119
120
121
122
123
124
125
126 public RunDetails addRunDetails(JobId jobId, Date startedAt, RunOutcome runOutcome, @Nullable String message) {
127 LOG.debug("addRunDetails: jobId={} startedAt={} runOutcome={} message={}",
128 new Object[]{jobId, startedAt, runOutcome, message});
129
130 notNull("jobId", jobId);
131 notNull("startedAt", startedAt);
132 notNull("runOutcome", runOutcome);
133
134 final long duration = System.currentTimeMillis() - startedAt.getTime();
135 final RunDetails runDetails = new RunDetailsImpl(startedAt, runOutcome, duration, message);
136 runDetailsDao.addRunDetails(jobId, runDetails);
137 return runDetails;
138 }
139
140
141
142
143 public void preJob() {
144 }
145
146
147
148
149 public void postJob() {
150 }
151
152 @Override
153 synchronized public final void start() throws SchedulerServiceException {
154 LOG.debug("{} -> STARTED", state);
155 switch (state) {
156 case STARTED:
157 return;
158 case SHUTDOWN:
159 throw new SchedulerServiceException("The scheduler service has been shut down; it cannot be restarted.");
160 }
161 startImpl();
162 state = State.STARTED;
163 }
164
165 @Override
166 synchronized public final void standby() throws SchedulerServiceException {
167 LOG.debug("{} -> STANDBY", state);
168 switch (state) {
169 case STANDBY:
170 return;
171 case SHUTDOWN:
172 throw new SchedulerServiceException("The scheduler service has been shut down; it cannot be restarted.");
173 }
174 cancelJobs();
175 standbyImpl();
176 state = State.STANDBY;
177 }
178
179 @Override
180 synchronized public final void shutdown() {
181 LOG.debug("{} -> SHUTDOWN", state);
182 if (state == State.SHUTDOWN) {
183 return;
184 }
185 state = State.SHUTDOWN;
186 cancelJobs();
187 shutdownImpl();
188 }
189
190 private void cancelJobs() {
191 for (RunningJob job : runningJobs.values()) {
192 job.cancel();
193 }
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209 RunningJob enterJob(final JobId jobId, final RunningJob job) {
210 return runningJobs.putIfAbsent(jobId, job);
211 }
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229 void leaveJob(final JobId jobId, final RunningJob job) {
230 if (!runningJobs.remove(jobId, job)) {
231 throw new IllegalStateException("Invalid call to leaveJob(" + jobId + ", " + job +
232 "; actual running job for that ID is: " + runningJobs.get(jobId));
233 }
234
235 if (runningJobs.isEmpty()) {
236 signalIdle();
237 }
238 }
239
240 @GuardedBy("idleLock")
241 private void signalIdle() {
242 idleLock.lock();
243 try {
244 idleCondition.signalAll();
245 } finally {
246 idleLock.unlock();
247 }
248 }
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264 @Override
265 public boolean waitUntilIdle(final long timeout, final TimeUnit units) throws InterruptedException {
266 if (runningJobs.isEmpty()) {
267 return true;
268 }
269 if (timeout <= 0L) {
270 return false;
271 }
272
273 idleLock.lock();
274 try {
275 return waitUntilIdle(units.toNanos(timeout));
276 } finally {
277 idleLock.unlock();
278 }
279 }
280
281 @GuardedBy("idleLock")
282 boolean waitUntilIdle(final long timeoutInNanos) throws InterruptedException {
283 long nanosLeft = timeoutInNanos;
284 while (nanosLeft > 0L) {
285 nanosLeft = awaitNanos(nanosLeft);
286 if (runningJobs.isEmpty()) {
287 return true;
288 }
289 }
290 return false;
291 }
292
293 @VisibleForTesting
294 long awaitNanos(long nanosLeft) throws InterruptedException {
295 return idleCondition.awaitNanos(nanosLeft);
296 }
297
298 @Nonnull
299 @Override
300 public Collection<RunningJob> getLocallyRunningJobs() {
301 return ImmutableList.copyOf(runningJobs.values());
302 }
303
304 @Nonnull
305 @Override
306 public final State getState() {
307 return state;
308 }
309
310
311
312
313
314
315
316
317
318 protected abstract void startImpl() throws SchedulerServiceException;
319
320
321
322
323
324
325
326
327 protected abstract void standbyImpl() throws SchedulerServiceException;
328
329
330
331
332
333
334 protected abstract void shutdownImpl();
335
336
337
338
339
340
341 public ParameterMapSerializer getParameterMapSerializer() {
342 return parameterMapSerializer;
343 }
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360 protected static SchedulerServiceException checked(SchedulerRuntimeException sre) {
361 Throwable cause = sre.getCause();
362 if (cause == null) {
363 cause = sre;
364 }
365 return new SchedulerServiceException(cause.toString(), cause);
366 }
367
368 static class ByJobId implements Comparator<JobDetails>, Serializable {
369 private static final long serialVersionUID = 1L;
370
371 public int compare(final JobDetails jd1, final JobDetails jd2) {
372 return jd1.getJobId().compareTo(jd2.getJobId());
373 }
374 }
375 }
376