View Javadoc

1   package com.atlassian.messagequeue.internal.inmemory;
2   
3   import com.atlassian.messagequeue.Message;
4   import com.atlassian.messagequeue.MessageRunnerNotRegisteredException;
5   import com.atlassian.messagequeue.MessageRunnerKey;
6   import com.atlassian.messagequeue.MessageRunnerService;
7   import com.atlassian.messagequeue.internal.core.MessageRunnerRegistryHelper;
8   import com.atlassian.messagequeue.registry.MessageRunnerRegistryService;
9   import com.atlassian.messagequeue.registry.MessageRunner;
10  import com.atlassian.workcontext.api.WorkContextDoorway;
11  import org.slf4j.Logger;
12  import org.slf4j.LoggerFactory;
13  
14  import java.util.Optional;
15  import java.util.concurrent.ConcurrentHashMap;
16  import java.util.concurrent.ExecutorService;
17  
18  import static java.util.Objects.requireNonNull;
19  
20  /**
21   * Implementation of both {@link MessageRunnerService} and {@link MessageRunnerRegistryService}. Messages are stored in-memory.
22   */
23  public class InMemoryMessageRunnerService implements MessageRunnerService, MessageRunnerRegistryService, MessageRunnerRegistryHelper {
24  
25      private static final Logger log = LoggerFactory.getLogger(InMemoryMessageRunnerService.class);
26  
27      private final ConcurrentHashMap<MessageRunnerKey, MessageRunner> runners = new ConcurrentHashMap<>();
28  
29      private final ExecutorService executorService;
30  
31      public InMemoryMessageRunnerService(ExecutorService executorService) {
32          this.executorService = requireNonNull(executorService);
33      }
34  
35      @Override
36      public void registerMessageRunner(MessageRunnerKey messageRunnerKey, MessageRunner messageRunner) {
37          runners.put(messageRunnerKey, messageRunner);
38      }
39  
40      @Override
41      public void unregisterMessageRunner(MessageRunnerKey messageRunnerKey) {
42          final MessageRunner prev = runners.remove(messageRunnerKey);
43          if (null == prev) {
44              log.debug("Attempting to remove unregistered runner {}", messageRunnerKey);
45          }
46      }
47  
48      @Override
49      public void addMessage(Message message) {
50          final Optional<String> payload = message.getPayload();
51          final MessageRunnerKey runnerKey = message.getRunnerKey();
52          final MessageRunner messageRunner = runners.get(runnerKey);
53          if (null == messageRunner) {
54              throw new MessageRunnerNotRegisteredException(runnerKey);
55          }
56          executorService.submit(() -> {
57              try (WorkContextDoorway ignored = new WorkContextDoorway().open()) {
58                  messageRunner.processMessage(new InMemoryMessageContext(payload.orElse(null)));
59              } catch (Exception e) {
60                  log.error("failed to process message for key: " + runnerKey, e);
61              }
62          });
63      }
64  
65      @Override
66      public Optional<MessageRunner> getMessageRunner(MessageRunnerKey messageRunnerKey) {
67          return Optional.ofNullable(runners.get(messageRunnerKey));
68      }
69  }