View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import com.amazonaws.services.sqs.model.SendMessageRequest;
4   import com.atlassian.messagequeue.Message;
5   import com.atlassian.messagequeue.MessageRunnerKey;
6   import com.atlassian.messagequeue.internal.core.NestedMessage;
7   import com.atlassian.messagequeue.registry.MessageContext;
8   import org.junit.Test;
9   import org.junit.runner.RunWith;
10  import org.mockito.runners.MockitoJUnitRunner;
11  
12  import javax.annotation.Nullable;
13  import java.util.ArrayList;
14  import java.util.Collections;
15  import java.util.List;
16  import java.util.Set;
17  
18  import static org.hamcrest.core.Is.is;
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertThat;
21  import static org.mockito.Matchers.any;
22  import static org.mockito.Mockito.after;
23  import static org.mockito.Mockito.times;
24  import static org.mockito.Mockito.verify;
25  
26  @RunWith(MockitoJUnitRunner.class)
27  public class MultipleQueuesSQSMessageRunnerServiceTest extends AbstractSQSMessageRunnerServiceTest {
28      private static final String QUEUE_NAME_KANGAROO = "kangarooQueue";
29      private static final String QUEUE_NAME_KOALA = "koalaQueue";
30      private static final MessageRunnerKey MESSAGE_RUNNER_KEY_MILK = MessageRunnerKey.of("milk");
31      private static final MessageRunnerKey MESSAGE_RUNNER_KEY_BAMBOO = MessageRunnerKey.of("bamboo");
32  
33      private String queueUrlKangaroo;
34      private String queueUrlKoala;
35  
36      @Override
37      public void setUp() throws Exception {
38          super.setUp();
39  
40          // Create 2 test queues
41          sqsClient.createQueue(QUEUE_NAME_KANGAROO);
42          queueUrlKangaroo = sqsClient.getQueueUrl(QUEUE_NAME_KANGAROO).getQueueUrl();
43          sqsClient.createQueue(QUEUE_NAME_KOALA);
44          queueUrlKoala = sqsClient.getQueueUrl(QUEUE_NAME_KOALA).getQueueUrl();
45  
46          // Register 2 message runners
47          registryService.registerMessageRunner(MESSAGE_RUNNER_KEY_MILK, messageRunner);
48          registryService.registerMessageRunner(MESSAGE_RUNNER_KEY_BAMBOO, messageRunner);
49      }
50  
51      @Test
52      public void messagesAreSentToCorrectQueues() {
53          // Producer: milk -> kangaroo, else -> koala
54          // Consumer: nothing
55          messageRunnerService = withQueueConfigs(defaultMessageRunnerKeyToProducerMapper(), Collections.emptySet());
56  
57          messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_MILK, "m"));
58          messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_BAMBOO, "b"));
59          messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY, "t"));
60  
61          verify(spyingSqsClient, times(3)).sendMessage(sendMessageRequestArgumentCaptor.capture());
62          List<SendMessageRequest> requests = sendMessageRequestArgumentCaptor.getAllValues();
63          // kangaroo queue
64          NestedMessage nestedMessage = nestedMessageSerializer.deserialize(requests.get(0).getMessageBody());
65          assertThat(requests.get(0).getQueueUrl(), is(queueUrlKangaroo));
66          assertThat(nestedMessage.getPayload(), is("m"));
67  
68          // koala queue
69          nestedMessage = nestedMessageSerializer.deserialize(requests.get(1).getMessageBody());
70          assertThat(requests.get(1).getQueueUrl(), is(queueUrlKoala));
71          assertThat(nestedMessage.getPayload(), is("b"));
72  
73          // default, koala queue
74          nestedMessage = nestedMessageSerializer.deserialize(requests.get(2).getMessageBody());
75          assertThat(requests.get(2).getQueueUrl(), is(queueUrlKoala));
76          assertThat(nestedMessage.getPayload(), is("t"));
77      }
78  
79      @Test(expected = NullPointerException.class)
80      public void failImmediatelyIfNoProducerQueueConfig() {
81          withQueueConfigs(null, Collections.emptySet());
82      }
83  
84      @Test(expected = NullPointerException.class)
85      public void failImmediatelyIfNoConsumerQueueConfig() {
86          withQueueConfigs(getDefaultMessageRunnerKeyToProducerMapper(), null);
87      }
88  
89      @Test
90      public void onlyMatchedMessageIsConsumed() {
91          // Listen to queue kangaroo only
92          messageRunnerService = withQueueConfigs(defaultMessageRunnerKeyToProducerMapper(),
93                  Collections.singleton(newConsumerQueueConfig(QUEUE_NAME_KANGAROO, queueUrlKangaroo)));
94          messageRunnerService.initialiseMessageConsumers();
95  
96          // Target queue kangaroo, should be consumed
97          messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_MILK, "m"));
98          // Target default queue, should be ignored
99          messageRunnerService.addMessage(Message.create(MESSAGE_RUNNER_KEY_BAMBOO, "b"));
100 
101         verify(spyingSqsClient, times(2)).sendMessage(sendMessageRequestArgumentCaptor.capture());
102         List<SendMessageRequest> requests = sendMessageRequestArgumentCaptor.getAllValues();
103         assertThat(requests.get(0).getQueueUrl(), is(queueUrlKangaroo));
104         assertThat(requests.get(1).getQueueUrl(), is(queueUrlKoala));
105         verify(messageRunner, after(1000).times(1)).processMessage(any(MessageContext.class));
106         List<String> processedPayloads = new ArrayList<>();
107         payloads.drainTo(processedPayloads);
108         assertEquals(processedPayloads.size(), 1);
109         assertThat(processedPayloads.get(0), is("m"));
110     }
111 
112     private SQSMessageRunnerService withQueueConfigs(@Nullable SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper,
113                                                      @Nullable Set<SQSConsumerQueueConfig> consumerQueueConfigs) {
114         return SQSMessageRunnerService.newBuilder(messageRunnerKeyToProducerMapper, consumerQueueConfigs)
115                 .withAmazonSQSClient(spyingSqsClient)
116                 .withReceiveWaitTimeSeconds(RECEIVE_WAIT_TIME_SECONDS)
117                 .withMessageRunnerRegistryHelper(registryService)
118                 .withTenantIdSetter(tenantIdSetter)
119                 .withMessageInformationService(new SQSMessageInformationService(messageRunnerKeyToProducerMapper, tenantContextProvider, nestedMessageSerializer, tenantDataIdSupplier))
120                 .withNestedMessageSerializer(nestedMessageSerializer)
121                 .withSqsMessageVisibilityTimeoutManager(sqsMessageVisibilityTimeoutManager)
122                 .withMessageValidatorRegistryHelper(messageValidatorRegistry).build();
123 
124     }
125 
126     private SQSConsumerQueueConfig newConsumerQueueConfig(String queueName, String queueUrl) {
127         return new SQSConsumerQueueConfig() {
128 
129             @Override
130             public String getQueueName() {
131                 return queueName;
132             }
133 
134             @Override
135             public String getQueueUrl() {
136                 return queueUrl;
137             }
138 
139             @Override
140             public int getCorePoolSize() {
141                 return 2;
142             }
143 
144             @Override
145             public int getMaxPoolSize() {
146                 return 2;
147             }
148 
149             @Override
150             public int getVisibilityExtensionPeriod() {
151                 return 15;
152             }
153         };
154     }
155 
156     private SQSMessageRunnerKeyToProducerMapper defaultMessageRunnerKeyToProducerMapper() {
157         return messageRunnerKey -> {
158             if (messageRunnerKey.equals(MESSAGE_RUNNER_KEY_MILK)) {
159                 return new SQSProducerQueueConfig() {
160                     @Override
161                     public String getQueueName() {
162                         return QUEUE_NAME_KANGAROO;
163                     }
164 
165                     @Override
166                     public String getQueueUrl() {
167                         return queueUrlKangaroo;
168                     }
169                 };
170             }
171             return new SQSProducerQueueConfig() {
172                 @Override
173                 public String getQueueName() {
174                     return QUEUE_NAME_KOALA;
175                 }
176 
177                 @Override
178                 public String getQueueUrl() {
179                     return queueUrlKoala;
180                 }
181             };
182         };
183     }
184 }