View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import static org.hamcrest.CoreMatchers.is;
4   import static org.junit.Assert.assertThat;
5   import static org.mockito.Mockito.*;
6   
7   import com.amazonaws.services.sqs.AmazonSQSClient;
8   import com.amazonaws.services.sqs.model.MessageNotInflightException;
9   import org.hamcrest.CoreMatchers;
10  import org.hamcrest.Matchers;
11  import org.junit.After;
12  import org.junit.Assert;
13  import org.junit.Before;
14  import org.junit.Test;
15  import org.mockito.Mock;
16  import org.junit.Rule;
17  import org.mockito.junit.MockitoJUnit;
18  import org.mockito.junit.MockitoRule;
19  
20  import java.util.concurrent.Future;
21  
22  public class SQSMessageVisibilityTimeoutManagerTest {
23      @Rule
24      public MockitoRule rule = MockitoJUnit.rule();
25  
26      @Mock
27      AmazonSQSClient sqsClient;
28      @Mock
29      SQSConsumerQueueConfig queueConfig;
30      private SQSMessageVisibilityTimeoutManager manager;
31      private int baseVisibilityTimeoutSeconds;
32  
33      @Before
34      public void setUp() throws Exception {
35          baseVisibilityTimeoutSeconds = 1;
36          manager = new SQSMessageVisibilityTimeoutManager(sqsClient, 1);
37          when(queueConfig.getVisibilityExtensionPeriod()).thenReturn(1);
38      }
39  
40      @Test
41      public void stopExtendingVisibilityTimeoutOnException() throws Exception {
42          doThrow(new MessageNotInflightException("message not in flight")).when(sqsClient).changeMessageVisibility(anyString(), anyString(), anyInt());
43  
44          final Future<?> future = manager.scheduleVisibilityTimeoutExtension(queueConfig, "receiptHandle");
45  
46          verify(sqsClient, after(baseVisibilityTimeoutSeconds * 1000 * 2).times(1)).changeMessageVisibility(anyString(), anyString(), anyInt());
47          assertThat(future.isDone(), is(true));
48          assertThat(manager.visibilityTimeoutExtensionWorkerPool.getQueue(), Matchers.empty());
49      }
50  
51      @Test
52      public void visibilityTimeoutExtensionRecurs() throws Exception {
53          manager.scheduleVisibilityTimeoutExtension(queueConfig, "receiptHandle");
54  
55          // Within 2500ms, the extension task is enabled after a delay of 1000ms and then again at 2000ms producing 2 wanted invocations
56          final int wantedNumberOfInvocations = 2;
57          verify(sqsClient, after(2500).times(wantedNumberOfInvocations)).changeMessageVisibility(anyString(), anyString(), anyInt());
58      }
59  
60      @After
61      public void tearDown() throws Exception {
62          manager.shutdown();
63      }
64  }