View Javadoc

1   package com.atlassian.messagequeue.internal.inmemory;
2   
3   import com.atlassian.messagequeue.Message;
4   import com.atlassian.messagequeue.MessagePayloadSizeExceededException;
5   import com.atlassian.messagequeue.MessageRunnerConstants;
6   import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
7   import com.atlassian.messagequeue.MessageRunnerKey;
8   import com.atlassian.messagequeue.registry.MessageContext;
9   import com.atlassian.messagequeue.registry.MessageRunner;
10  import com.atlassian.workcontext.api.ImmutableWorkContextReference;
11  import com.google.common.util.concurrent.MoreExecutors;
12  import org.junit.Test;
13  import org.mockito.ArgumentCaptor;
14  
15  import java.util.concurrent.CountDownLatch;
16  import java.util.concurrent.TimeUnit;
17  
18  import static org.hamcrest.core.Is.is;
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertThat;
21  import static org.mockito.Matchers.any;
22  import static org.mockito.Mockito.mock;
23  import static org.mockito.Mockito.never;
24  import static org.mockito.Mockito.verify;
25  
26  public class TestInMemoryMessageRunnerServiceImpl
27  {
28      @Test
29      public void testRegisterAndRunMessage() {
30          InMemoryMessageRunnerService service = create();
31          MessageRunner runner = mock(MessageRunner.class);
32          MessageRunnerKey key = MessageRunnerKey.of("test");
33  
34          service.registerMessageRunner(key, runner);
35          service.addMessage(Message.create(key, "test-payload"));
36  
37          verifyPayload(runner, "test-payload");
38      }
39  
40      @Test
41      public void testNullPayload() {
42          InMemoryMessageRunnerService service = create();
43          MessageRunner runner = mock(MessageRunner.class);
44          MessageRunnerKey key = MessageRunnerKey.of("test");
45  
46          service.registerMessageRunner(key, runner);
47          service.addMessage(Message.builder(key).build());
48  
49          verifyPayload(runner, null);
50      }
51  
52      @Test
53      public void testSecondRegisterWins() {
54          InMemoryMessageRunnerService service = create();
55          MessageRunner runner1 = mock(MessageRunner.class);
56          MessageRunner runner2 = mock(MessageRunner.class);
57          MessageRunnerKey key = MessageRunnerKey.of("test");
58  
59          service.registerMessageRunner(key, runner1);
60          service.registerMessageRunner(key, runner2);
61          service.addMessage(Message.create(key, "test-payload"));
62  
63          verify(runner1, never()).processMessage(any(MessageContext.class));
64          verifyPayload(runner2, "test-payload");
65      }
66  
67      @Test(expected = MessageRunnerNotRegisteredException.class)
68      public void testRunNonExistentMessageFails() {
69          InMemoryMessageRunnerService service = create();
70          MessageRunnerKey key = MessageRunnerKey.of("test");
71  
72          service.addMessage(Message.create(key, "test-payload"));
73      }
74  
75      @Test(expected = MessageRunnerNotRegisteredException.class)
76      public void testUnregister() {
77          InMemoryMessageRunnerService service = create();
78          MessageRunner runner = mock(MessageRunner.class);
79          MessageRunnerKey key = MessageRunnerKey.of("test");
80  
81          service.registerMessageRunner(key, runner);
82          service.unregisterMessageRunner(key);
83          service.addMessage(Message.create(key, "test-payload"));
84      }
85  
86      @Test
87      public void testUnregisterFailsCleanly() {
88          create().unregisterMessageRunner(MessageRunnerKey.of("test"));
89      }
90  
91      @Test
92      public void testWorkContextEstablished() throws InterruptedException {
93          InMemoryMessageRunnerService service = create();
94          ImmutableWorkContextReference<String> workContext = new ImmutableWorkContextReference<>(() -> "hello");
95          CountDownLatch latch = new CountDownLatch(1);
96          MessageRunner runner = (MessageContext context) -> {
97              workContext.get();
98              latch.countDown();
99          };
100         MessageRunnerKey key = MessageRunnerKey.of("test");
101         service.registerMessageRunner(key, runner);
102 
103         service.addMessage(Message.create(key, "test-payload"));
104 
105         assertThat(latch.await(1, TimeUnit.SECONDS), is(true));
106     }
107 
108     private void verifyPayload(MessageRunner mockRunner, String payload) {
109         ArgumentCaptor<MessageContext> argument = ArgumentCaptor.forClass(MessageContext.class);
110         verify(mockRunner).processMessage(argument.capture());
111         assertEquals(payload, argument.getValue().getPayload().orElse(null));
112     }
113 
114     private InMemoryMessageRunnerService create() {
115         return new InMemoryMessageRunnerService(MoreExecutors.newDirectExecutorService());
116     }
117 }