View Javadoc

1   package com.atlassian.messagequeue.internal.sqs;
2   
3   import com.amazonaws.auth.BasicAWSCredentials;
4   import com.amazonaws.services.sqs.AmazonSQS;
5   import com.amazonaws.services.sqs.AmazonSQSClient;
6   import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
7   import com.amazonaws.services.sqs.model.SendMessageRequest;
8   import com.atlassian.messagequeue.MessageInformationService;
9   import com.atlassian.messagequeue.MessageRunnerKey;
10  import com.atlassian.messagequeue.TenantDataIdSupplier;
11  import com.atlassian.messagequeue.internal.core.DefaultMessageRunnerRegistryService;
12  import com.atlassian.messagequeue.internal.core.DefaultMessageValidatorRegistryService;
13  import com.atlassian.messagequeue.internal.core.JacksonNestedMessageSerializer;
14  import com.atlassian.messagequeue.internal.core.NestedMessageSerializer;
15  import com.atlassian.messagequeue.registry.MessageContext;
16  import com.atlassian.messagequeue.registry.MessageRunner;
17  import com.atlassian.tenant.api.TenantContext;
18  import com.atlassian.tenant.api.TenantContextProvider;
19  import com.atlassian.tenant.impl.TenantIdSetter;
20  import com.atlassian.workcontext.api.WorkContextDoorway;
21  import org.elasticmq.rest.sqs.SQSRestServer;
22  import org.elasticmq.rest.sqs.SQSRestServerBuilder;
23  import org.junit.After;
24  import org.junit.Before;
25  import org.mockito.ArgumentCaptor;
26  import org.mockito.Captor;
27  import org.mockito.Mock;
28  
29  import java.util.Collections;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.LinkedBlockingQueue;
34  
35  import static java.util.Objects.requireNonNull;
36  import static org.mockito.Matchers.any;
37  import static org.mockito.Mockito.mock;
38  import static org.mockito.Mockito.spy;
39  import static org.mockito.Mockito.when;
40  
41  public class AbstractSQSMessageRunnerServiceTest {
42      protected static final MessageRunnerKey MESSAGE_RUNNER_KEY = MessageRunnerKey.of("testMessageRunner");
43      protected static final int VISIBILITY_TIMEOUT_SECONDS = 10;
44      protected static final int RECEIVE_WAIT_TIME_SECONDS = 0;
45      protected static final String QUEUE_NAME = "testQueue";
46      protected static final int CONCURRENT_CONSUMERS = 1;
47      protected static final String TENANT_ID = "tenant-123";
48      protected static final int VERIFY_TIMEOUT_MILLIS = 1000;
49  
50      protected SQSMessageRunnerService messageRunnerService;
51      protected BlockingQueue<String> payloads;
52      protected SQSRestServer sqsServer;
53      protected AmazonSQS sqsClient;
54      protected AmazonSQS spyingSqsClient;
55      protected WorkContextDoorway workContextDoorway;
56  
57      @Mock
58      protected TenantContextProvider tenantContextProvider;
59      @Mock
60      protected TenantIdSetter tenantIdSetter;
61      @Mock
62      protected SQSMessageRunnerKeyToProducerMapper messageRunnerKeyToProducerMapper;
63  
64      @Captor
65      protected ArgumentCaptor<ReceiveMessageRequest> receiveMessageRequestArgumentCaptor;
66      @Captor
67      protected ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor;
68      @Captor
69      protected ArgumentCaptor<MessageContext> messageContextArgumentCaptor;
70  
71      protected MessageRunner messageRunner;
72      protected DefaultMessageRunnerRegistryService registryService;
73      protected MessageInformationService messageInformationService;
74      protected String queueUrl;
75      protected NestedMessageSerializer nestedMessageSerializer;
76      protected TenantContext.Builder tenantContextBuilder;
77      protected SQSMessageVisibilityTimeoutManager sqsMessageVisibilityTimeoutManager;
78      String tenantDataId = null;
79      protected final TenantDataIdSupplier tenantDataIdSupplier = () -> tenantDataId;
80      protected DefaultMessageValidatorRegistryService messageValidatorRegistry;
81  
82      @Before
83      public void setUp() throws Exception {
84          sqsServer = SQSRestServerBuilder.start();
85  
86          String endpoint = "http://localhost:9324";
87  
88          sqsClient = new AmazonSQSClient(new BasicAWSCredentials("x", "x")).withEndpoint(endpoint);
89          sqsClient.createQueue(QUEUE_NAME);
90          queueUrl = sqsClient.getQueueUrl(QUEUE_NAME).getQueueUrl();
91          spyingSqsClient = spy(sqsClient);
92  
93          nestedMessageSerializer = spy(new JacksonNestedMessageSerializer());
94          registryService = spy(new DefaultMessageRunnerRegistryService());
95          messageValidatorRegistry = new DefaultMessageValidatorRegistryService();
96          messageInformationService = spy(new SQSMessageInformationService(getDefaultMessageRunnerKeyToProducerMapper(), tenantContextProvider, nestedMessageSerializer, tenantDataIdSupplier));
97          sqsMessageVisibilityTimeoutManager = new SQSMessageVisibilityTimeoutManager(spyingSqsClient, 2);
98  
99          messageRunnerService = SQSMessageRunnerService.newBuilder(getDefaultMessageRunnerKeyToProducerMapper(), getDefaultConsumerQueueConfig())
100                 .withAmazonSQSClient(spyingSqsClient)
101                 .withReceiveWaitTimeSeconds(RECEIVE_WAIT_TIME_SECONDS)
102                 .withMessageRunnerRegistryHelper(registryService)
103                 .withTenantIdSetter(tenantIdSetter)
104                 .withMessageInformationService(messageInformationService)
105                 .withNestedMessageSerializer(nestedMessageSerializer)
106                 .withSqsMessageVisibilityTimeoutManager(sqsMessageVisibilityTimeoutManager)
107                 .withMessageValidatorRegistryHelper(messageValidatorRegistry)
108                 .build();
109 
110         payloads = new LinkedBlockingQueue<>();
111 
112         messageRunner = spy(new DefaultMessageRunner(payloads));
113         registryService.registerMessageRunner(MESSAGE_RUNNER_KEY, messageRunner);
114 
115         tenantContextBuilder = new TenantContext.Builder()
116                 .jdbcUrl("jdbcUrl")
117                 .jdbcUsername("username")
118                 .jdbcPassword("password");
119         when(tenantContextProvider.getTenantContext()).thenReturn(tenantContextBuilder.tenantId(TENANT_ID).build());
120 
121         tenantDataId = "1";
122 
123         workContextDoorway = new WorkContextDoorway();
124         workContextDoorway.open();
125     }
126 
127     @After
128     public void tearDown() throws Exception {
129         workContextDoorway.close();
130         messageRunnerService.shutdown();
131         sqsServer.stopAndWait();
132     }
133 
134     protected SQSMessageRunnerKeyToProducerMapper getDefaultMessageRunnerKeyToProducerMapper() {
135         SQSMessageRunnerKeyToProducerMapper result = mock(SQSMessageRunnerKeyToProducerMapper.class);
136         SQSProducerQueueConfig producerQueueConfig = mock(SQSProducerQueueConfig.class);
137         when(producerQueueConfig.getQueueName()).thenReturn(QUEUE_NAME);
138         when(producerQueueConfig.getQueueUrl()).thenReturn(queueUrl);
139         when(result.getQueueConfigForMessageRunner(any(MessageRunnerKey.class))).thenReturn(producerQueueConfig);
140         return result;
141     }
142 
143     protected Set<SQSConsumerQueueConfig> getDefaultConsumerQueueConfig() {
144         return getDefaultConsumerQueueConfig(VISIBILITY_TIMEOUT_SECONDS);
145     }
146 
147     protected Set<SQSConsumerQueueConfig> getDefaultConsumerQueueConfig(int visibilityExtensionSeconds) {
148         SQSConsumerQueueConfig result = mock(SQSConsumerQueueConfig.class);
149         when(result.getQueueName()).thenReturn(QUEUE_NAME);
150         when(result.getQueueUrl()).thenReturn(queueUrl);
151         when(result.getCorePoolSize()).thenReturn(2);
152         when(result.getMaxPoolSize()).thenReturn(2);
153         when(result.getVisibilityExtensionPeriod()).thenReturn(visibilityExtensionSeconds);
154         return Collections.singleton(result);
155     }
156 
157     protected static class DelegatingMessageRunner implements MessageRunner {
158         private final MessageRunner delegate;
159 
160         public DelegatingMessageRunner(MessageRunner delegate) {
161             this.delegate = delegate;
162         }
163 
164         @Override
165         public void processMessage(MessageContext context) {
166             delegate.processMessage(context);
167         }
168     }
169 
170     protected static class DefaultMessageRunner implements MessageRunner {
171         private final Queue<String> payloads;
172 
173         public DefaultMessageRunner(Queue<String> payloads) {
174             this.payloads = requireNonNull(payloads);
175         }
176 
177         @Override
178         public void processMessage(MessageContext messageContext) {
179             payloads.offer(messageContext.getPayload().orElse(null));
180         }
181     }
182 }