1 package com.atlassian.scheduler.core;
2
3 import com.atlassian.scheduler.SchedulerServiceException;
4 import com.atlassian.scheduler.config.JobId;
5 import org.junit.Rule;
6 import org.junit.Test;
7 import org.junit.rules.ExpectedException;
8
9 import static com.atlassian.scheduler.core.Constants.JOB_ID;
10 import static com.atlassian.scheduler.core.LifecycleAwareSchedulerService.State.SHUTDOWN;
11 import static com.atlassian.scheduler.core.LifecycleAwareSchedulerService.State.STANDBY;
12 import static com.atlassian.scheduler.core.LifecycleAwareSchedulerService.State.STARTED;
13 import static java.util.concurrent.TimeUnit.MILLISECONDS;
14 import static org.hamcrest.Matchers.contains;
15 import static org.hamcrest.Matchers.containsInAnyOrder;
16 import static org.hamcrest.Matchers.containsString;
17 import static org.hamcrest.Matchers.hasSize;
18 import static org.hamcrest.Matchers.is;
19 import static org.hamcrest.Matchers.lessThan;
20 import static org.hamcrest.Matchers.nullValue;
21 import static org.hamcrest.Matchers.sameInstance;
22 import static org.junit.Assert.assertThat;
23 import static org.junit.Assert.fail;
24 import static org.mockito.Matchers.anyLong;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.never;
27 import static org.mockito.Mockito.spy;
28 import static org.mockito.Mockito.times;
29 import static org.mockito.Mockito.verify;
30
31 public class AbstractSchedulerServiceTest {
32 @Rule
33 public ExpectedException thrown = ExpectedException.none();
34
35 @SuppressWarnings("unchecked")
36 @Test
37 public void testWaitForScheduledJobsToCompleteWithTimeoutSuccessful() throws Exception {
38 final AbstractSchedulerService schedulerService = new SchedulerServiceFixture();
39 final long start = System.currentTimeMillis();
40 assertThat("should go idle", schedulerService.waitUntilIdle(1000L, MILLISECONDS), is(true));
41 assertThat("should detect idle quickly, not take most or all of the whole timeout",
42 System.currentTimeMillis() - start, lessThan(300L));
43 }
44
45
46 @SuppressWarnings("unchecked")
47 @Test
48 public void testWaitForScheduledJobsToCompleteWithTimeoutUnsuccessful() throws Exception {
49 final SchedulerServiceFixture schedulerService = spy(new SchedulerServiceFixture());
50 schedulerService.hackAwaitNanos();
51 assertThat(schedulerService.enterJob(JOB_ID, mock(RunningJob.class)), nullValue());
52
53 assertThat("should time out", schedulerService.waitUntilIdle(100L, MILLISECONDS), is(false));
54
55
56 verify(schedulerService, times(4)).awaitNanos(anyLong());
57 }
58
59 @SuppressWarnings("unchecked")
60 @Test(expected = InterruptedException.class)
61 public void testWaitForScheduledJobsToCompleteWithTimeoutInterrupted() throws Exception {
62 final AbstractSchedulerService schedulerService = spy(new SchedulerServiceFixture());
63 assertThat(schedulerService.enterJob(JOB_ID, mock(RunningJob.class)), nullValue());
64
65 boolean hadToCallFail = false;
66 try {
67 Thread.currentThread().interrupt();
68 final boolean result = schedulerService.waitUntilIdle(50L, MILLISECONDS);
69 hadToCallFail = true;
70 fail("Expected an InterruptedException but got result=" + result);
71 } finally {
72
73 final boolean threadWasInterrupted = Thread.interrupted();
74 if (!hadToCallFail) {
75 assertThat("Interrupted flag should have already been clear", threadWasInterrupted, is(false));
76 }
77 verify(schedulerService).awaitNanos(anyLong());
78 }
79 }
80
81 @Test
82 public void testWaitForScheduledJobsToCompleteWithTimeoutInterruptedButAlreadyIdle() throws Exception {
83 final AbstractSchedulerService schedulerService = spy(new SchedulerServiceFixture());
84 try {
85 Thread.currentThread().interrupt();
86 assertThat("should succeed", schedulerService.waitUntilIdle(50L, MILLISECONDS), is(true));
87 assertThat("should still be interrupted", Thread.interrupted(), is(true));
88
89 verify(schedulerService, never()).awaitNanos(anyLong());
90 } finally {
91
92 Thread.interrupted();
93 }
94
95 }
96
97 @Test
98 public void testEnterLeaveJob() {
99 final AbstractSchedulerService schedulerService = new SchedulerServiceFixture();
100 final RunningJob job1 = mock(RunningJob.class);
101 final RunningJob job2 = mock(RunningJob.class);
102 final JobId otherJobId = JobId.of("Some other job ID");
103 final RunningJob otherJob = mock(RunningJob.class);
104
105 assertThat("No jobs initially", schedulerService.getLocallyRunningJobs(), hasSize(0));
106 assertThat("Enter other job", schedulerService.enterJob(otherJobId, otherJob), nullValue());
107 assertThat("Entered other job", schedulerService.getLocallyRunningJobs(), contains(sameInstance(otherJob)));
108
109 assertFailedLeave("Job is not running", schedulerService, job1);
110 assertThat("Successful first entry when idle", schedulerService.enterJob(JOB_ID, job1), nullValue());
111 assertThat(schedulerService.getLocallyRunningJobs(), containsInAnyOrder(job1, otherJob));
112
113 assertFailedLeave("Wrong job specified", schedulerService, job2);
114 assertThat(schedulerService.getLocallyRunningJobs(), containsInAnyOrder(job1, otherJob));
115
116 assertThat("Unsuccessful re-entry", schedulerService.enterJob(JOB_ID, job1), sameInstance(job1));
117 assertThat("Unsuccessful entry of second request", schedulerService.enterJob(JOB_ID, job2), sameInstance(job1));
118 assertThat(schedulerService.getLocallyRunningJobs(), containsInAnyOrder(job1, otherJob));
119 schedulerService.leaveJob(otherJobId, otherJob);
120 assertThat("Left other job", schedulerService.getLocallyRunningJobs(), contains(job1));
121
122 schedulerService.leaveJob(JOB_ID, job1);
123 assertThat("Left job1", schedulerService.getLocallyRunningJobs(), hasSize(0));
124 assertFailedLeave("Unsuccessful re-leave", schedulerService, job1);
125 assertThat("Successful second entry when idle", schedulerService.enterJob(JOB_ID, job2), nullValue());
126 assertThat("Left other job", schedulerService.getLocallyRunningJobs(), contains(job2));
127 }
128
129 static void assertFailedLeave(final String reason, final AbstractSchedulerService schedulerService, final RunningJob job) {
130 try {
131 schedulerService.leaveJob(JOB_ID, job);
132 fail("Expected unsuccessful leaveJob because " + reason);
133 } catch (IllegalStateException ise) {
134 assertThat(ise.getMessage(), containsString("Invalid call to leaveJob"));
135 }
136 }
137
138 @Test
139 public void testLifecycle() throws SchedulerServiceException {
140 final SchedulerServiceFixture schedulerService = new SchedulerServiceFixture();
141
142 schedulerService.assertState(0, 0, 0, STANDBY);
143 schedulerService.standby();
144 schedulerService.assertState(0, 0, 0, STANDBY);
145
146 schedulerService.start();
147 schedulerService.assertState(1, 0, 0, STARTED);
148 schedulerService.start();
149 schedulerService.assertState(1, 0, 0, STARTED);
150
151 schedulerService.standby();
152 schedulerService.assertState(1, 1, 0, STANDBY);
153 schedulerService.standby();
154 schedulerService.assertState(1, 1, 0, STANDBY);
155
156 schedulerService.start();
157 schedulerService.assertState(2, 1, 0, STARTED);
158 schedulerService.start();
159 schedulerService.assertState(2, 1, 0, STARTED);
160
161 schedulerService.shutdown();
162 schedulerService.assertState(2, 1, 1, SHUTDOWN);
163 schedulerService.shutdown();
164 schedulerService.assertState(2, 1, 1, SHUTDOWN);
165
166 try {
167 schedulerService.start();
168 } catch (SchedulerServiceException sse) {
169 assertThat(sse.getMessage(), containsString("shut down"));
170 schedulerService.assertState(2, 1, 1, SHUTDOWN);
171 }
172
173 try {
174 schedulerService.standby();
175 } catch (SchedulerServiceException sse) {
176 assertThat(sse.getMessage(), containsString("shut down"));
177 schedulerService.assertState(2, 1, 1, SHUTDOWN);
178 }
179 }
180 }