1 package com.atlassian.messagequeue.internal.lifecycle;
2
3 import com.amazonaws.auth.BasicAWSCredentials;
4 import com.amazonaws.services.sqs.AmazonSQSClient;
5 import com.google.common.base.Charsets;
6 import com.google.common.io.Resources;
7 import org.elasticmq.rest.sqs.SQSRestServer;
8 import org.elasticmq.rest.sqs.SQSRestServerBuilder;
9 import org.junit.After;
10 import org.junit.Before;
11 import org.junit.Rule;
12 import org.junit.Test;
13 import org.mockito.Mock;
14 import org.mockito.junit.MockitoJUnit;
15 import org.mockito.junit.MockitoRule;
16
17 import java.net.URL;
18 import java.util.function.Supplier;
19
20 import static org.mockito.Mockito.after;
21 import static org.mockito.Mockito.any;
22 import static org.mockito.Mockito.doThrow;
23 import static org.mockito.Mockito.never;
24 import static org.mockito.Mockito.timeout;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.Mockito.when;
27
28 public class DefaultInstanceLifecycleNotificationObserverTest {
29 private static final String QUEUE_NAME = "lifecycleEvents";
30
31 private static final URL INSTANCE_TERMINATING_NOTIFICATION_URL = Resources.getResource("lifecycle-message-instance-terminating.json");
32 private static final URL INSTANCE_TERMINATING_NOTIFICATION_WRONG_SUBJECT_URL = Resources.getResource("lifecycle-message-instance-terminating-wrong-subject.json");
33 private static final URL INSTANCE_ACTIVE_NOTIFICATION_URL = Resources.getResource("lifecycle-message-instance-active.json");
34
35 @Rule
36 public MockitoRule rule = MockitoJUnit.rule();
37
38 @Mock
39 InstanceLifecycleListener listener1;
40 @Mock
41 InstanceLifecycleListener listener2;
42 @Mock
43 Supplier<String> ec2InstanceIdSupplier;
44
45 private DefaultInstanceLifecycleNotificationObserver instanceLifecycleNotificationObserver;
46 private AmazonSQSClient sqsClient;
47 private String queueUrl;
48 private SQSRestServer server;
49
50 @Before
51 public void setUp() throws Exception {
52 server = SQSRestServerBuilder.start();
53
54 String endpoint = "http://localhost:9324";
55 sqsClient = new AmazonSQSClient(new BasicAWSCredentials("x", "x")).withEndpoint(endpoint);
56
57 sqsClient.createQueue(QUEUE_NAME);
58 queueUrl = sqsClient.getQueueUrl(QUEUE_NAME).getQueueUrl();
59
60 instanceLifecycleNotificationObserver = new DefaultInstanceLifecycleNotificationObserver(queueUrl, sqsClient,
61 new JacksonNotificationDeserializer(), ec2InstanceIdSupplier, 0, 30);
62 instanceLifecycleNotificationObserver.addInstanceTerminatingListener(listener1);
63 instanceLifecycleNotificationObserver.addInstanceTerminatingListener(listener2);
64 instanceLifecycleNotificationObserver.initialise();
65
66 when(ec2InstanceIdSupplier.get()).thenReturn("i-0a86cd00c3193cc5c");
67 }
68
69 @After
70 public void tearDown() throws Exception {
71 instanceLifecycleNotificationObserver.shutdown();
72 server.stopAndWait();
73 }
74
75 @Test
76 public void listenersAreInvokedOnInstanceTerminatingNotification() throws Exception {
77 sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_URL, Charsets.UTF_8));
78
79 verify(listener1, timeout(1000)).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
80 verify(listener2, timeout(1000)).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
81 }
82
83 @Test
84 public void exceptionInOneListenerDoesNotInterferWithInvocationOfOtherListeners() throws Exception {
85 doThrow(RuntimeException.class).when(listener1).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
86
87 sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_URL, Charsets.UTF_8));
88
89 verify(listener2, timeout(1000)).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
90 }
91
92 @Test
93 public void listenersAreNotInvokedOnInstanceActiveNotification() throws Exception {
94 sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_ACTIVE_NOTIFICATION_URL, Charsets.UTF_8));
95
96 verify(listener1, after(1000).never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
97 verify(listener2, never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
98 }
99
100 @Test
101 public void listenersAreNotInvokedIfInstanceIdsDontMatch() throws Exception {
102 sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_URL, Charsets.UTF_8));
103 when(ec2InstanceIdSupplier.get()).thenReturn("i-0aae803fbf84ebed7");
104
105 verify(listener1, after(1000).never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
106 verify(listener2, never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
107 }
108
109 @Test
110 public void listenersAreNotInvokedIfSnsNotificationSubjectIsWrong() throws Exception {
111 sqsClient.sendMessage(queueUrl, Resources.toString(INSTANCE_TERMINATING_NOTIFICATION_WRONG_SUBJECT_URL, Charsets.UTF_8));
112
113 verify(listener1, after(1000).never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
114 verify(listener2, never()).onInstanceLifecycleNotification(any(InstanceLifecycleContext.class));
115 }
116 }