1 package com.atlassian.messagequeue.internal.sqs;
2
3 import com.amazonaws.auth.BasicAWSCredentials;
4 import com.amazonaws.services.sqs.AmazonSQS;
5 import com.amazonaws.services.sqs.AmazonSQSClient;
6 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
7 import com.amazonaws.services.sqs.model.SendMessageRequest;
8 import com.atlassian.messagequeue.MessageInformationService;
9 import com.atlassian.messagequeue.MessageRunnerKey;
10 import com.atlassian.messagequeue.TenantDataIdSupplier;
11 import com.atlassian.messagequeue.internal.core.DefaultMessageRunnerRegistryService;
12 import com.atlassian.messagequeue.internal.core.DefaultMessageValidatorRegistryService;
13 import com.atlassian.messagequeue.internal.core.JacksonNestedMessageSerializer;
14 import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
15 import com.atlassian.messagequeue.registry.MessageContext;
16 import com.atlassian.messagequeue.registry.MessageRunner;
17 import com.atlassian.tenant.api.TenantContext;
18 import com.atlassian.tenant.api.TenantContextProvider;
19 import com.atlassian.tenant.impl.TenantIdSetter;
20 import com.atlassian.workcontext.api.WorkContextDoorway;
21 import org.elasticmq.rest.sqs.SQSRestServer;
22 import org.elasticmq.rest.sqs.SQSRestServerBuilder;
23 import org.junit.After;
24 import org.junit.Before;
25 import org.mockito.ArgumentCaptor;
26 import org.mockito.Captor;
27 import org.mockito.Mock;
28
29 import java.util.Collections;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.LinkedBlockingQueue;
34
35 import static java.util.Objects.requireNonNull;
36 import static org.mockito.Matchers.any;
37 import static org.mockito.Mockito.mock;
38 import static org.mockito.Mockito.spy;
39 import static org.mockito.Mockito.when;
40
41 public class AbstractSQSMessageRunnerServiceTest {
42 protected static final MessageRunnerKey MESSAGE_RUNNER_KEY = MessageRunnerKey.of("testMessageRunner");
43 protected static final int VISIBILITY_TIMEOUT_SECONDS = 10;
44 protected static final int RECEIVE_WAIT_TIME_SECONDS = 0;
45 protected static final String QUEUE_NAME = "testQueue";
46 protected static final int CONCURRENT_CONSUMERS = 1;
47 protected static final String TENANT_ID = "tenant-123";
48 protected static final int VERIFY_TIMEOUT_MILLIS = 1000;
49
50 protected SQSMessageRunnerService messageRunnerService;
51 protected BlockingQueue<String> payloads;
52 protected SQSRestServer sqsServer;
53 protected AmazonSQS sqsClient;
54 protected AmazonSQS spyingSqsClient;
55 protected WorkContextDoorway workContextDoorway;
56
57 @Mock
58 protected TenantContextProvider tenantContextProvider;
59 @Mock
60 protected TenantIdSetter tenantIdSetter;
61 @Mock
62 protected SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper;
63
64 @Captor
65 protected ArgumentCaptor<ReceiveMessageRequest> receiveMessageRequestArgumentCaptor;
66 @Captor
67 protected ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor;
68 @Captor
69 protected ArgumentCaptor<MessageContext> messageContextArgumentCaptor;
70
71 protected MessageRunner messageRunner;
72 protected DefaultMessageRunnerRegistryService registryService;
73 protected MessageInformationService messageInformationService;
74 protected String queueUrl;
75 protected NestedMessageSerializer nestedMessageSerializer;
76 protected TenantContext.Builder tenantContextBuilder;
77 protected SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager;
78 String tenantDataId = null;
79 protected final TenantDataIdSupplier tenantDataIdSupplier = () -> tenantDataId;
80 protected DefaultMessageValidatorRegistryService messageValidatorRegistry;
81
82 @Before
83 public void setUp() throws Exception {
84 sqsServer = SQSRestServerBuilder.start();
85
86 String endpoint = "http://localhost:9324";
87
88 sqsClient = new AmazonSQSClient(new BasicAWSCredentials("x", "x")).withEndpoint(endpoint);
89 sqsClient.createQueue(QUEUE_NAME);
90 queueUrl = sqsClient.getQueueUrl(QUEUE_NAME).getQueueUrl();
91 spyingSqsClient = spy(sqsClient);
92
93 nestedMessageSerializer = spy(new JacksonNestedMessageSerializer());
94 registryService = spy(new DefaultMessageRunnerRegistryService());
95 messageValidatorRegistry = new DefaultMessageValidatorRegistryService();
96 messageInformationService = spy(new SQSMessageInformationService(getDefaultMessageRunnerKeyToProducerMapper(), tenantContextProvider, nestedMessageSerializer, tenantDataIdSupplier));
97 sqsMessageVisibilityTimeoutManager = new SQSMessageVisibilityTimeoutManager(spyingSqsClient, 2);
98
99 messageRunnerService = SQSMessageRunnerService.newBuilder(getDefaultMessageRunnerKeyToProducerMapper(), getDefaultConsumerQueueConfig())
100 .withAmazonSQSClient(spyingSqsClient)
101 .withReceiveWaitTimeSeconds(RECEIVE_WAIT_TIME_SECONDS)
102 .withMessageRunnerRegistryHelper(registryService)
103 .withTenantIdSetter(tenantIdSetter)
104 .withMessageInformationService(messageInformationService)
105 .withNestedMessageSerializer(nestedMessageSerializer)
106 .withSqsMessageVisibilityTimeoutManager(sqsMessageVisibilityTimeoutManager)
107 .withMessageValidatorRegistryHelper(messageValidatorRegistry)
108 .build();
109
110 payloads = new LinkedBlockingQueue<>();
111
112 messageRunner = spy(new DefaultMessageRunner(payloads));
113 registryService.registerMessageRunner(MESSAGE_RUNNER_KEY, messageRunner);
114
115 tenantContextBuilder = new TenantContext.Builder()
116 .jdbcUrl("jdbcUrl")
117 .jdbcUsername("username")
118 .jdbcPassword("password");
119 when(tenantContextProvider.getTenantContext()).thenReturn(tenantContextBuilder.tenantId(TENANT_ID).build());
120
121 tenantDataId = "1";
122
123 workContextDoorway = new WorkContextDoorway();
124 workContextDoorway.open();
125 }
126
127 @After
128 public void tearDown() throws Exception {
129 workContextDoorway.close();
130 messageRunnerService.shutdown();
131 sqsServer.stopAndWait();
132 }
133
134 protected SQSMessageRunnerKeyToProducerMapper getDefaultMessageRunnerKeyToProducerMapper() {
135 SQSMessageRunnerKeyToProducerMapper result = mock(SQSMessageRunnerKeyToProducerMapper.class);
136 SQSProducerQueueConfig producerQueueConfig = mock(SQSProducerQueueConfig.class);
137 when(producerQueueConfig.getQueueName()).thenReturn(QUEUE_NAME);
138 when(producerQueueConfig.getQueueUrl()).thenReturn(queueUrl);
139 when(result.getQueueConfigForMessageRunner(any(MessageRunnerKey.class))).thenReturn(producerQueueConfig);
140 return result;
141 }
142
143 protected Set<SQSConsumerQueueConfig> getDefaultConsumerQueueConfig() {
144 return getDefaultConsumerQueueConfig(VISIBILITY_TIMEOUT_SECONDS);
145 }
146
147 protected Set<SQSConsumerQueueConfig> getDefaultConsumerQueueConfig(int visibilityExtensionSeconds) {
148 SQSConsumerQueueConfig result = mock(SQSConsumerQueueConfig.class);
149 when(result.getQueueName()).thenReturn(QUEUE_NAME);
150 when(result.getQueueUrl()).thenReturn(queueUrl);
151 when(result.getCorePoolSize()).thenReturn(2);
152 when(result.getMaxPoolSize()).thenReturn(2);
153 when(result.getVisibilityExtensionPeriod()).thenReturn(visibilityExtensionSeconds);
154 return Collections.singleton(result);
155 }
156
157 protected static class DelegatingMessageRunner implements MessageRunner {
158 private final MessageRunner delegate;
159
160 public DelegatingMessageRunner(MessageRunner delegate) {
161 this.delegate = delegate;
162 }
163
164 @Override
165 public void processMessage(MessageContext context) {
166 delegate.processMessage(context);
167 }
168 }
169
170 protected static class DefaultMessageRunner implements MessageRunner {
171 private final Queue<String> payloads;
172
173 public DefaultMessageRunner(Queue<String> payloads) {
174 this.payloads = requireNonNull(payloads);
175 }
176
177 @Override
178 public void processMessage(MessageContext messageContext) {
179 payloads.offer(messageContext.getPayload().orElse(null));
180 }
181 }
182 }