1 package com.atlassian.messagequeue.internal.inmemory;
2
3 import com.atlassian.messagequeue.Message;
4 import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
5 import com.atlassian.messagequeue.MessageRunnerKey;
6 import com.atlassian.messagequeue.MessageRunnerService;
7 import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
8 import com.atlassian.messagequeue.registry.MessageRunnerRegistryService;
9 import com.atlassian.messagequeue.registry.MessageRunner;
10 import com.atlassian.workcontext.api.WorkContextDoorway;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17
18 import static java.util.Objects.requireNonNull;
19
20
21
22
23 public class InMemoryMessageRunnerService implements MessageRunnerService, MessageRunnerRegistryService, MessageRunnerRegistryHelper {
24
25 private static final Logger log = LoggerFactory.getLogger(InMemoryMessageRunnerService.class);
26
27 private final ConcurrentHashMap<MessageRunnerKey, MessageRunner> runners = new ConcurrentHashMap<>();
28
29 private final ExecutorService executorService;
30
31 public InMemoryMessageRunnerService(ExecutorService executorService) {
32 this.executorService = requireNonNull(executorService);
33 }
34
35 @Override
36 public void registerMessageRunner(MessageRunnerKey messageRunnerKey, MessageRunner messageRunner) {
37 runners.put(messageRunnerKey, messageRunner);
38 }
39
40 @Override
41 public void unregisterMessageRunner(MessageRunnerKey messageRunnerKey) {
42 final MessageRunner prev = runners.remove(messageRunnerKey);
43 if (null == prev) {
44 log.debug("Attempting to remove unregistered runner {}", messageRunnerKey);
45 }
46 }
47
48 @Override
49 public void addMessage(Message message) {
50 final Optional<String> payload = message.getPayload();
51 final MessageRunnerKey runnerKey = message.getRunnerKey();
52 final MessageRunner messageRunner = runners.get(runnerKey);
53 if (null == messageRunner) {
54 throw new MessageRunnerNotRegisteredException(runnerKey);
55 }
56 executorService.submit(() -> {
57 try (WorkContextDoorway ignored = new WorkContextDoorway().open()) {
58 messageRunner.processMessage(new InMemoryMessageContext(payload.orElse(null)));
59 } catch (Exception e) {
60 log.error("failed to process message for key: " + runnerKey, e);
61 }
62 });
63 }
64
65 @Override
66 public Optional<MessageRunner> getMessageRunner(MessageRunnerKey messageRunnerKey) {
67 return Optional.ofNullable(runners.get(messageRunnerKey));
68 }
69 }