View Javadoc

1   package com.atlassian.messagequeue.internal.lifecycle;
2   
3   import com.amazonaws.auth.BasicAWSCredentials;
4   import com.amazonaws.services.sqs.AmazonSQSClient;
5   import com.google.common.base.Charsets;
6   import com.google.common.io.Resources;
7   import org.elasticmq.rest.sqs.SQSRestServer;
8   import org.elasticmq.rest.sqs.SQSRestServerBuilder;
9   import org.junit.After;
10  import org.junit.Before;
11  import org.junit.Rule;
12  import org.junit.Test;
13  import org.mockito.Mock;
14  import org.mockito.junit.MockitoJUnit;
15  import org.mockito.junit.MockitoRule;
16  
17  import java.net.URL;
18  import java.util.function.Supplier;
19  
20  import static org.mockito.Mockito.after;
21  import static org.mockito.Mockito.any;
22  import static org.mockito.Mockito.doThrow;
23  import static org.mockito.Mockito.never;
24  import static org.mockito.Mockito.timeout;
25  import static org.mockito.Mockito.verify;
26  import static org.mockito.Mockito.when;
27  
28  public class DefaultInstanceLifecycleNotificationObserverTest {
29      private static final String QUEUE_NAME = "lifecycleEvents";
30  
31      private static final URL INSTANCE_TERMINATING_NOTIFICATION_URL = Resources.getResource("lifecycle-message-instance-terminating.json");
32      private static final URL INSTANCE_TERMINATING_NOTIFICATION_WRONG_SUBJECT_URL = Resources.getResource("lifecycle-message-instance-terminating-wrong-subject.json");
33      private static final URL INSTANCE_ACTIVE_NOTIFICATION_URL = Resources.getResource("lifecycle-message-instance-active.json");
34  
35      @Rule
36      public MockitoRule rule = MockitoJUnit.rule();
37  
38      @Mock
39      InstanceLifecycleListener listener1;
40      @Mock
41      InstanceLifecycleListener listener2;
42      @Mock
43      Supplier<String> ec2InstanceIdSupplier;
44  
45      private DefaultInstanceLifecycleNotificationObserver instanceLifecycleNotificationObserver;
46      private AmazonSQSClient sqsClient;
47      private String queueUrl;
48      private SQSRestServer server;
49  
50      @Before
51      public void setUp() throws Exception {
52          server = SQSRestServerBuilder.start();
53  
54          String endpoint = "http://localhost:9324";
55          sqsClient = new AmazonSQSClient(new BasicAWSCredentials("x", "x")).withEndpoint(endpoint);
56  
57          sqsClient.createQueue(QUEUE_NAME);
58          queueUrl = sqsClient.getQueueUrl(QUEUE_NAME).getQueueUrl();
59  
60          instanceLifecycleNotificationObserver = new DefaultInstanceLifecycleNotificationObserver(queueUrl, sqsClient,
61                  new JacksonNotificationDeserializer(), ec2InstanceIdSupplier, 0, 30);
62          instanceLifecycleNotificationObserver.addInstanceTerminatingListener(listener1);
63          instanceLifecycleNotificationObserver.addInstanceTerminatingListener(listener2);
64          instanceLifecycleNotificationObserver.initialise();
65  
66          when(ec2InstanceIdSupplier.get()).thenReturn("i-0a86cd00c3193cc5c");
67      }
68  
69      @After
70      public void tearDown() throws Exception {
71          instanceLifecycleNotificationObserver.shutdown();
72          server.stopAndWait();
73      }
74  
75      @Test
76      public void listenersAreInvokedOnInstanceTerminatingNotification() throws Exception {
77          sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_URL, Charsets.UTF_8));
78  
79          verify(listener1, timeout(1000)).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
80          verify(listener2, timeout(1000)).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
81      }
82  
83      @Test
84      public void exceptionInOneListenerDoesNotInterferWithInvocationOfOtherListeners() throws Exception {
85          doThrow(RuntimeException.class).when(listener1).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
86  
87          sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_URL, Charsets.UTF_8));
88  
89          verify(listener2, timeout(1000)).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
90      }
91  
92      @Test
93      public void listenersAreNotInvokedOnInstanceActiveNotification() throws Exception {
94          sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_ACTIVE_NOTIFICATION_URL, Charsets.UTF_8));
95  
96          verify(listener1, after(1000).never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
97          verify(listener2, never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
98      }
99  
100     @Test
101     public void listenersAreNotInvokedIfInstanceIdsDontMatch() throws Exception {
102         sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_URL, Charsets.UTF_8));
103         when(ec2InstanceIdSupplier.get()).thenReturn("i-0aae803fbf84ebed7");
104 
105         verify(listener1, after(1000).never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
106         verify(listener2, never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
107     }
108 
109     @Test
110     public void listenersAreNotInvokedIfSnsNotificationSubjectIsWrong() throws Exception {
111         sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_WRONG_SUBJECT_URL, Charsets.UTF_8));
112 
113         verify(listener1, after(1000).never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
114         verify(listener2, never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
115     }
116 }