1 package com.atlassian.messagequeue.internal.inmemory;
2
3 import com.atlassian.messagequeue.Message;
4 import com.atlassian.messagequeue.MessagePayloadSizeExceededException;
5 import com.atlassian.messagequeue.MessageRunnerConstants;
6 import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
7 import com.atlassian.messagequeue.MessageRunnerKey;
8 import com.atlassian.messagequeue.registry.MessageContext;
9 import com.atlassian.messagequeue.registry.MessageRunner;
10 import com.atlassian.workcontext.api.ImmutableWorkContextReference;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import org.junit.Test;
13 import org.mockito.ArgumentCaptor;
14
15 import java.util.concurrent.CountDownLatch;
16 import java.util.concurrent.TimeUnit;
17
18 import static org.hamcrest.core.Is.is;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertThat;
21 import static org.mockito.Matchers.any;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.never;
24 import static org.mockito.Mockito.verify;
25
26 public class TestInMemoryMessageRunnerServiceImpl
27 {
28 @Test
29 public void testRegisterAndRunMessage() {
30 InMemoryMessageRunnerService service = create();
31 MessageRunner runner = mock(MessageRunner.class);
32 MessageRunnerKey key = MessageRunnerKey.of("test");
33
34 service.registerMessageRunner(key, runner);
35 service.addMessage(Message.create(key, "test-payload"));
36
37 verifyPayload(runner, "test-payload");
38 }
39
40 @Test
41 public void testNullPayload() {
42 InMemoryMessageRunnerService service = create();
43 MessageRunner runner = mock(MessageRunner.class);
44 MessageRunnerKey key = MessageRunnerKey.of("test");
45
46 service.registerMessageRunner(key, runner);
47 service.addMessage(Message.builder(key).build());
48
49 verifyPayload(runner, null);
50 }
51
52 @Test
53 public void testSecondRegisterWins() {
54 InMemoryMessageRunnerService service = create();
55 MessageRunner runner1 = mock(MessageRunner.class);
56 MessageRunner runner2 = mock(MessageRunner.class);
57 MessageRunnerKey key = MessageRunnerKey.of("test");
58
59 service.registerMessageRunner(key, runner1);
60 service.registerMessageRunner(key, runner2);
61 service.addMessage(Message.create(key, "test-payload"));
62
63 verify(runner1, never()).processMessage(any(MessageContext.class));
64 verifyPayload(runner2, "test-payload");
65 }
66
67 @Test(expected = MessageRunnerNotRegisteredException.class)
68 public void testRunNonExistentMessageFails() {
69 InMemoryMessageRunnerService service = create();
70 MessageRunnerKey key = MessageRunnerKey.of("test");
71
72 service.addMessage(Message.create(key, "test-payload"));
73 }
74
75 @Test(expected = MessageRunnerNotRegisteredException.class)
76 public void testUnregister() {
77 InMemoryMessageRunnerService service = create();
78 MessageRunner runner = mock(MessageRunner.class);
79 MessageRunnerKey key = MessageRunnerKey.of("test");
80
81 service.registerMessageRunner(key, runner);
82 service.unregisterMessageRunner(key);
83 service.addMessage(Message.create(key, "test-payload"));
84 }
85
86 @Test
87 public void testUnregisterFailsCleanly() {
88 create().unregisterMessageRunner(MessageRunnerKey.of("test"));
89 }
90
91 @Test
92 public void testWorkContextEstablished() throws InterruptedException {
93 InMemoryMessageRunnerService service = create();
94 ImmutableWorkContextReference<String> workContext = new ImmutableWorkContextReference<>(() -> "hello");
95 CountDownLatch latch = new CountDownLatch(1);
96 MessageRunner runner = (MessageContext context) -> {
97 workContext.get();
98 latch.countDown();
99 };
100 MessageRunnerKey key = MessageRunnerKey.of("test");
101 service.registerMessageRunner(key, runner);
102
103 service.addMessage(Message.create(key, "test-payload"));
104
105 assertThat(latch.await(1, TimeUnit.SECONDS), is(true));
106 }
107
108 private void verifyPayload(MessageRunner mockRunner, String payload) {
109 ArgumentCaptor<MessageContext> argument = ArgumentCaptor.forClass(MessageContext.class);
110 verify(mockRunner).processMessage(argument.capture());
111 assertEquals(payload, argument.getValue().getPayload().orElse(null));
112 }
113
114 private InMemoryMessageRunnerService create() {
115 return new InMemoryMessageRunnerService(MoreExecutors.newDirectExecutorService());
116 }
117 }