1 package com.atlassian.messagequeue.internal.sqs;
2
3 import static org.mockito.Mockito.*;
4
5 import com.amazonaws.services.sqs.AmazonSQSClient;
6 import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
7 import com.atlassian.messagequeue.MessageAcknowledgementException;
8 import org.junit.Before;
9 import org.junit.Test;
10 import org.junit.runner.RunWith;
11 import org.mockito.Mock;
12 import org.mockito.runners.MockitoJUnitRunner;
13
14 import java.util.concurrent.CountDownLatch;
15 import java.util.concurrent.CyclicBarrier;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicBoolean;
19
20 @RunWith(MockitoJUnitRunner.class)
21 public class SQSMessageContextTest {
22
23 private SQSMessageContext messageContext;
24 private String queueUrl;
25 private String receiptHandle;
26 private AmazonSQSClient amazonSQSClient;
27
28 @Mock
29 Future<?> visibilityTimeoutExtensionFuture;
30
31 @Before
32 public void setUp() throws Exception {
33 amazonSQSClient = mock(AmazonSQSClient.class);
34 queueUrl = "queueUrl";
35 receiptHandle = "receiptHandle";
36 messageContext = new SQSMessageContext("messageId", "payload", amazonSQSClient, queueUrl, receiptHandle, new AtomicBoolean(false), visibilityTimeoutExtensionFuture);
37 }
38
39 @Test
40 public void acknowledgeIsThreadSafe() throws Exception {
41 int numberOfThreads = 10;
42 CyclicBarrier barrier = new CyclicBarrier(numberOfThreads);
43 CountDownLatch latch = new CountDownLatch(numberOfThreads);
44
45 for (int i = 0; i < numberOfThreads; i++) {
46 new Thread(() -> {
47 try {
48 barrier.await(1, TimeUnit.SECONDS);
49 } catch (Exception e) {
50 throw new RuntimeException(e);
51 }
52 messageContext.acknowledge();
53 latch.countDown();
54 }).start();
55 }
56
57 latch.await(1, TimeUnit.SECONDS);
58 verify(amazonSQSClient, times(1)).deleteMessage(queueUrl, receiptHandle);
59 }
60
61 @Test
62 public void acknowledgeShouldCancelExtensionOfVisibilityTimeout() throws Exception {
63 messageContext.acknowledge();
64
65 verify(visibilityTimeoutExtensionFuture).cancel(anyBoolean());
66 }
67
68 @Test(expected = MessageAcknowledgementException.class)
69 public void throwExceptionIfReceiptIsInvalid() throws Exception {
70 doThrow(ReceiptHandleIsInvalidException.class).when(amazonSQSClient).deleteMessage(queueUrl, receiptHandle);
71
72 messageContext.acknowledge();
73 }
74
75 @Test(expected = IllegalArgumentException.class)
76 public void changeMessageProcessingTimeoutWithInvalidTimeoutValue() throws Exception {
77 messageContext.changeMessageProcessingTimeout(SQSMessageContext.TWELVE_HOURS_AS_SECONDS + 1, TimeUnit.SECONDS);
78 }
79 }