1 package com.atlassian.messagequeue.internal.sqs;
2
3 import com.amazonaws.services.sqs.model.SendMessageRequest;
4 import com.atlassian.messagequeue.Message;
5 import com.atlassian.messagequeue.MessageRunnerKey;
6 import com.atlassian.messagequeue.internal.core.NestedMessage;
7 import com.atlassian.messagequeue.registry.MessageContext;
8 import org.junit.Test;
9 import org.junit.runner.RunWith;
10 import org.mockito.runners.MockitoJUnitRunner;
11
12 import javax.annotation.Nullable;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.Set;
17
18 import static org.hamcrest.core.Is.is;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertThat;
21 import static org.mockito.Matchers.any;
22 import static org.mockito.Mockito.after;
23 import static org.mockito.Mockito.times;
24 import static org.mockito.Mockito.verify;
25
26 @RunWith(MockitoJUnitRunner.class)
27 public class MultipleQueuesSQSMessageRunnerServiceTest extends AbstractSQSMessageRunnerServiceTest {
28 private static final String QUEUE_NAME_KANGAROO = "kangarooQueue";
29 private static final String QUEUE_NAME_KOALA = "koalaQueue";
30 private static final MessageRunnerKey MESSAGE_RUNNER_KEY_MILK = MessageRunnerKey.of("milk");
31 private static final MessageRunnerKey MESSAGE_RUNNER_KEY_BAMBOO = MessageRunnerKey.of("bamboo");
32
33 private String queueUrlKangaroo;
34 private String queueUrlKoala;
35
36 @Override
37 public void setUp() throws Exception {
38 super.setUp();
39
40
41 sqsClient.createQueue(QUEUE_NAME_KANGAROO);
42 queueUrlKangaroo = sqsClient.getQueueUrl(QUEUE_NAME_KANGAROO).getQueueUrl();
43 sqsClient.createQueue(QUEUE_NAME_KOALA);
44 queueUrlKoala = sqsClient.getQueueUrl(QUEUE_NAME_KOALA).getQueueUrl();
45
46
47 registryService.registerMessageRunner(MESSAGE_RUNNER_KEY_MILK, messageRunner);
48 registryService.registerMessageRunner(MESSAGE_RUNNER_KEY_BAMBOO, messageRunner);
49 }
50
51 @Test
52 public void messagesAreSentToCorrectQueues() {
53
54
55 messageRunnerService = withQueueConfigs(defaultMessageRunnerKeyToProducerMapper(), Collections.emptySet());
56
57 messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_MILK, "m"));
58 messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_BAMBOO, "b"));
59 messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY, "t"));
60
61 verify(spyingSqsClient, times(3)).sendMessage(sendMessageRequestArgumentCaptor.capture());
62 List<SendMessageRequest> requests = sendMessageRequestArgumentCaptor.getAllValues();
63
64 NestedMessage nestedMessage = nestedMessageSerializer.deserialize(requests.get(0).getMessageBody());
65 assertThat(requests.get(0).getQueueUrl(), is(queueUrlKangaroo));
66 assertThat(nestedMessage.getPayload(), is("m"));
67
68
69 nestedMessage = nestedMessageSerializer.deserialize(requests.get(1).getMessageBody());
70 assertThat(requests.get(1).getQueueUrl(), is(queueUrlKoala));
71 assertThat(nestedMessage.getPayload(), is("b"));
72
73
74 nestedMessage = nestedMessageSerializer.deserialize(requests.get(2).getMessageBody());
75 assertThat(requests.get(2).getQueueUrl(), is(queueUrlKoala));
76 assertThat(nestedMessage.getPayload(), is("t"));
77 }
78
79 @Test(expected = NullPointerException.class)
80 public void failImmediatelyIfNoProducerQueueConfig() {
81 withQueueConfigs(null, Collections.emptySet());
82 }
83
84 @Test(expected = NullPointerException.class)
85 public void failImmediatelyIfNoConsumerQueueConfig() {
86 withQueueConfigs(getDefaultMessageRunnerKeyToProducerMapper(), null);
87 }
88
89 @Test
90 public void onlyMatchedMessageIsConsumed() {
91
92 messageRunnerService = withQueueConfigs(defaultMessageRunnerKeyToProducerMapper(),
93 Collections.singleton(newConsumerQueueConfig(QUEUE_NAME_KANGAROO, queueUrlKangaroo)));
94 messageRunnerService.initialiseMessageConsumers();
95
96
97 messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_MILK, "m"));
98
99 messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_BAMBOO, "b"));
100
101 verify(spyingSqsClient, times(2)).sendMessage(sendMessageRequestArgumentCaptor.capture());
102 List<SendMessageRequest> requests = sendMessageRequestArgumentCaptor.getAllValues();
103 assertThat(requests.get(0).getQueueUrl(), is(queueUrlKangaroo));
104 assertThat(requests.get(1).getQueueUrl(), is(queueUrlKoala));
105 verify(messageRunner, after(1000).times(1)).processMessage(any(MessageContext.class));
106 List<String> processedPayloads = new ArrayList<>();
107 payloads.drainTo(processedPayloads);
108 assertEquals(processedPayloads.size(), 1);
109 assertThat(processedPayloads.get(0), is("m"));
110 }
111
112 private SQSMessageRunnerService withQueueConfigs(@Nullable SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper,
113 @Nullable Set<SQSConsumerQueueConfig> consumerQueueConfigs) {
114 return SQSMessageRunnerService.newBuilder(messageRunnerKeyToProducerMapper, consumerQueueConfigs)
115 .withAmazonSQSClient(spyingSqsClient)
116 .withReceiveWaitTimeSeconds(RECEIVE_WAIT_TIME_SECONDS)
117 .withMessageRunnerRegistryHelper(registryService)
118 .withTenantIdSetter(tenantIdSetter)
119 .withMessageInformationService(new SQSMessageInformationService(messageRunnerKeyToProducerMapper, tenantContextProvider, nestedMessageSerializer, tenantDataIdSupplier))
120 .withNestedMessageSerializer(nestedMessageSerializer)
121 .withSqsMessageVisibilityTimeoutManager(sqsMessageVisibilityTimeoutManager)
122 .withMessageValidatorRegistryHelper(messageValidatorRegistry).build();
123
124 }
125
126 private SQSConsumerQueueConfig newConsumerQueueConfig(String queueName, String queueUrl) {
127 return new SQSConsumerQueueConfig() {
128
129 @Override
130 public String getQueueName() {
131 return queueName;
132 }
133
134 @Override
135 public String getQueueUrl() {
136 return queueUrl;
137 }
138
139 @Override
140 public int getCorePoolSize() {
141 return 2;
142 }
143
144 @Override
145 public int getMaxPoolSize() {
146 return 2;
147 }
148
149 @Override
150 public int getVisibilityExtensionPeriod() {
151 return 15;
152 }
153 };
154 }
155
156 private SQSMessageRunnerKeyToProducerMapper defaultMessageRunnerKeyToProducerMapper() {
157 return messageRunnerKey -> {
158 if (messageRunnerKey.equals(MESSAGE_RUNNER_KEY_MILK)) {
159 return new SQSProducerQueueConfig() {
160 @Override
161 public String getQueueName() {
162 return QUEUE_NAME_KANGAROO;
163 }
164
165 @Override
166 public String getQueueUrl() {
167 return queueUrlKangaroo;
168 }
169 };
170 }
171 return new SQSProducerQueueConfig() {
172 @Override
173 public String getQueueName() {
174 return QUEUE_NAME_KOALA;
175 }
176
177 @Override
178 public String getQueueUrl() {
179 return queueUrlKoala;
180 }
181 };
182 };
183 }
184 }