View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import static org.mockito.Mockito.*;
4   
5   import com.amazonaws.services.sqs.AmazonSQSClient;
6   import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
7   import com.atlassian.messagequeue.MessageAcknowledgementException;
8   import org.junit.Before;
9   import org.junit.Test;
10  import org.junit.runner.RunWith;
11  import org.mockito.Mock;
12  import org.mockito.runners.MockitoJUnitRunner;
13  
14  import java.util.concurrent.CountDownLatch;
15  import java.util.concurrent.CyclicBarrier;
16  import java.util.concurrent.Future;
17  import java.util.concurrent.TimeUnit;
18  import java.util.concurrent.atomic.AtomicBoolean;
19  
20  @RunWith(MockitoJUnitRunner.class)
21  public class SQSMessageContextTest {
22  
23      private SQSMessageContext messageContext;
24      private String queueUrl;
25      private String receiptHandle;
26      private AmazonSQSClient amazonSQSClient;
27  
28      @Mock
29      Future<?> visibilityTimeoutExtensionFuture;
30  
31      @Before
32      public void setUp() throws Exception {
33          amazonSQSClient = mock(AmazonSQSClient.class);
34          queueUrl = "queueUrl";
35          receiptHandle = "receiptHandle";
36          messageContext = new SQSMessageContext("messageId", "payload", amazonSQSClient, queueUrl, receiptHandle, new AtomicBoolean(false), visibilityTimeoutExtensionFuture);
37      }
38  
39      @Test
40      public void acknowledgeIsThreadSafe() throws Exception {
41          int numberOfThreads = 10;
42          CyclicBarrier barrier = new CyclicBarrier(numberOfThreads);
43          CountDownLatch latch = new CountDownLatch(numberOfThreads);
44  
45          for (int i = 0; i < numberOfThreads; i++) {
46              new Thread(() -> {
47                  try {
48                      barrier.await(1, TimeUnit.SECONDS);
49                  } catch (Exception e) {
50                      throw new RuntimeException(e);
51                  }
52                  messageContext.acknowledge();
53                  latch.countDown();
54              }).start();
55          }
56  
57          latch.await(1, TimeUnit.SECONDS);
58          verify(amazonSQSClient, times(1)).deleteMessage(queueUrl, receiptHandle);
59      }
60  
61      @Test
62      public void acknowledgeShouldCancelExtensionOfVisibilityTimeout() throws Exception {
63          messageContext.acknowledge();
64  
65          verify(visibilityTimeoutExtensionFuture).cancel(anyBoolean());
66      }
67  
68      @Test(expected = MessageAcknowledgementException.class)
69      public void throwExceptionIfReceiptIsInvalid() throws Exception {
70          doThrow(ReceiptHandleIsInvalidException.class).when(amazonSQSClient).deleteMessage(queueUrl, receiptHandle);
71  
72          messageContext.acknowledge();
73      }
74  
75      @Test(expected = IllegalArgumentException.class)
76      public void changeMessageProcessingTimeoutWithInvalidTimeoutValue() throws Exception {
77          messageContext.changeMessageProcessingTimeout(SQSMessageContext.TWELVE_HOURS_AS_SECONDS + 1, TimeUnit.SECONDS);
78      }
79  }