View Javadoc

1   package com.atlassian.scheduler.core;
2   
3   import com.atlassian.scheduler.SchedulerServiceException;
4   import com.atlassian.scheduler.config.JobId;
5   import org.junit.Rule;
6   import org.junit.Test;
7   import org.junit.rules.ExpectedException;
8   
9   import static com.atlassian.scheduler.core.Constants.JOB_ID;
10  import static com.atlassian.scheduler.core.LifecycleAwareSchedulerService.State.SHUTDOWN;
11  import static com.atlassian.scheduler.core.LifecycleAwareSchedulerService.State.STANDBY;
12  import static com.atlassian.scheduler.core.LifecycleAwareSchedulerService.State.STARTED;
13  import static java.util.concurrent.TimeUnit.MILLISECONDS;
14  import static org.hamcrest.Matchers.contains;
15  import static org.hamcrest.Matchers.containsInAnyOrder;
16  import static org.hamcrest.Matchers.containsString;
17  import static org.hamcrest.Matchers.hasSize;
18  import static org.hamcrest.Matchers.is;
19  import static org.hamcrest.Matchers.lessThan;
20  import static org.hamcrest.Matchers.nullValue;
21  import static org.hamcrest.Matchers.sameInstance;
22  import static org.junit.Assert.assertThat;
23  import static org.junit.Assert.fail;
24  import static org.mockito.Matchers.anyLong;
25  import static org.mockito.Mockito.mock;
26  import static org.mockito.Mockito.never;
27  import static org.mockito.Mockito.spy;
28  import static org.mockito.Mockito.times;
29  import static org.mockito.Mockito.verify;
30  
31  public class AbstractSchedulerServiceTest {
32      @Rule
33      public ExpectedException thrown = ExpectedException.none();
34  
35      @SuppressWarnings("unchecked")
36      @Test
37      public void testWaitForScheduledJobsToCompleteWithTimeoutSuccessful() throws Exception {
38          final AbstractSchedulerService schedulerService = new SchedulerServiceFixture();
39          final long start = System.currentTimeMillis();
40          assertThat("should go idle", schedulerService.waitUntilIdle(1000L, MILLISECONDS), is(true));
41          assertThat("should detect idle quickly, not take most or all of the whole timeout",
42                  System.currentTimeMillis() - start, lessThan(300L));
43      }
44  
45  
46      @SuppressWarnings("unchecked")
47      @Test
48      public void testWaitForScheduledJobsToCompleteWithTimeoutUnsuccessful() throws Exception {
49          final SchedulerServiceFixture schedulerService = spy(new SchedulerServiceFixture());
50          schedulerService.hackAwaitNanos();  // Force instant 30ms decrement per call
51          assertThat(schedulerService.enterJob(JOB_ID, mock(RunningJob.class)), nullValue());
52  
53          assertThat("should time out", schedulerService.waitUntilIdle(100L, MILLISECONDS), is(false));
54  
55          // 100 -> 70 -> 40 -> 10 -> -20 => Timeout!
56          verify(schedulerService, times(4)).awaitNanos(anyLong());
57      }
58  
59      @SuppressWarnings("unchecked")
60      @Test(expected = InterruptedException.class)
61      public void testWaitForScheduledJobsToCompleteWithTimeoutInterrupted() throws Exception {
62          final AbstractSchedulerService schedulerService = spy(new SchedulerServiceFixture());
63          assertThat(schedulerService.enterJob(JOB_ID, mock(RunningJob.class)), nullValue());
64  
65          boolean hadToCallFail = false;
66          try {
67              Thread.currentThread().interrupt();
68              final boolean result = schedulerService.waitUntilIdle(50L, MILLISECONDS);
69              hadToCallFail = true;
70              fail("Expected an InterruptedException but got result=" + result);
71          } finally {
72              // Don't leave the interrupted state polluted by the test
73              final boolean threadWasInterrupted = Thread.interrupted();
74              if (!hadToCallFail) {
75                  assertThat("Interrupted flag should have already been clear", threadWasInterrupted, is(false));
76              }
77              verify(schedulerService).awaitNanos(anyLong());
78          }
79      }
80  
81      @Test
82      public void testWaitForScheduledJobsToCompleteWithTimeoutInterruptedButAlreadyIdle() throws Exception {
83          final AbstractSchedulerService schedulerService = spy(new SchedulerServiceFixture());
84          try {
85              Thread.currentThread().interrupt();
86              assertThat("should succeed", schedulerService.waitUntilIdle(50L, MILLISECONDS), is(true));
87              assertThat("should still be interrupted", Thread.interrupted(), is(true));
88  
89              verify(schedulerService, never()).awaitNanos(anyLong());
90          } finally {
91              // Don't leave the interrupted state polluted by the test
92              Thread.interrupted();
93          }
94  
95      }
96  
97      @Test
98      public void testEnterLeaveJob() {
99          final AbstractSchedulerService schedulerService = new SchedulerServiceFixture();
100         final RunningJob job1 = mock(RunningJob.class);
101         final RunningJob job2 = mock(RunningJob.class);
102         final JobId otherJobId = JobId.of("Some other job ID");
103         final RunningJob otherJob = mock(RunningJob.class);
104 
105         assertThat("No jobs initially", schedulerService.getLocallyRunningJobs(), hasSize(0));
106         assertThat("Enter other job", schedulerService.enterJob(otherJobId, otherJob), nullValue());
107         assertThat("Entered other job", schedulerService.getLocallyRunningJobs(), contains(sameInstance(otherJob)));
108 
109         assertFailedLeave("Job is not running", schedulerService, job1);
110         assertThat("Successful first entry when idle", schedulerService.enterJob(JOB_ID, job1), nullValue());
111         assertThat(schedulerService.getLocallyRunningJobs(), containsInAnyOrder(job1, otherJob));
112 
113         assertFailedLeave("Wrong job specified", schedulerService, job2);
114         assertThat(schedulerService.getLocallyRunningJobs(), containsInAnyOrder(job1, otherJob));
115 
116         assertThat("Unsuccessful re-entry", schedulerService.enterJob(JOB_ID, job1), sameInstance(job1));
117         assertThat("Unsuccessful entry of second request", schedulerService.enterJob(JOB_ID, job2), sameInstance(job1));
118         assertThat(schedulerService.getLocallyRunningJobs(), containsInAnyOrder(job1, otherJob));
119         schedulerService.leaveJob(otherJobId, otherJob);  // successful
120         assertThat("Left other job", schedulerService.getLocallyRunningJobs(), contains(job1));
121 
122         schedulerService.leaveJob(JOB_ID, job1);  // successful
123         assertThat("Left job1", schedulerService.getLocallyRunningJobs(), hasSize(0));
124         assertFailedLeave("Unsuccessful re-leave", schedulerService, job1);
125         assertThat("Successful second entry when idle", schedulerService.enterJob(JOB_ID, job2), nullValue());
126         assertThat("Left other job", schedulerService.getLocallyRunningJobs(), contains(job2));
127     }
128 
129     static void assertFailedLeave(final String reason, final AbstractSchedulerService schedulerService, final RunningJob job) {
130         try {
131             schedulerService.leaveJob(JOB_ID, job);
132             fail("Expected unsuccessful leaveJob because " + reason);
133         } catch (IllegalStateException ise) {
134             assertThat(ise.getMessage(), containsString("Invalid call to leaveJob"));
135         }
136     }
137 
138     @Test
139     public void testLifecycle() throws SchedulerServiceException {
140         final SchedulerServiceFixture schedulerService = new SchedulerServiceFixture();
141 
142         schedulerService.assertState(0, 0, 0, STANDBY);
143         schedulerService.standby();
144         schedulerService.assertState(0, 0, 0, STANDBY);
145 
146         schedulerService.start();
147         schedulerService.assertState(1, 0, 0, STARTED);
148         schedulerService.start();
149         schedulerService.assertState(1, 0, 0, STARTED);
150 
151         schedulerService.standby();
152         schedulerService.assertState(1, 1, 0, STANDBY);
153         schedulerService.standby();
154         schedulerService.assertState(1, 1, 0, STANDBY);
155 
156         schedulerService.start();
157         schedulerService.assertState(2, 1, 0, STARTED);
158         schedulerService.start();
159         schedulerService.assertState(2, 1, 0, STARTED);
160 
161         schedulerService.shutdown();
162         schedulerService.assertState(2, 1, 1, SHUTDOWN);
163         schedulerService.shutdown();
164         schedulerService.assertState(2, 1, 1, SHUTDOWN);
165 
166         try {
167             schedulerService.start();
168         } catch (SchedulerServiceException sse) {
169             assertThat(sse.getMessage(), containsString("shut down"));
170             schedulerService.assertState(2, 1, 1, SHUTDOWN);
171         }
172 
173         try {
174             schedulerService.standby();
175         } catch (SchedulerServiceException sse) {
176             assertThat(sse.getMessage(), containsString("shut down"));
177             schedulerService.assertState(2, 1, 1, SHUTDOWN);
178         }
179     }
180 }