View Javadoc

1   package com.atlassian.messagequeue.internal.core;
2   
3   import com.atlassian.messagequeue.MessageRunnerKey;
4   import com.atlassian.messagequeue.registry.MessageContext;
5   import com.atlassian.messagequeue.registry.MessageRunner;
6   import com.atlassian.tenant.impl.TenantIdSetter;
7   import com.atlassian.workcontext.api.WorkContextDoorway;
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  import org.slf4j.MDC;
11  
12  import java.util.Optional;
13  import java.util.concurrent.TimeUnit;
14  
15  import static java.util.Objects.requireNonNull;
16  
17  /**
18   * Consumes a {@code NestedMessage}.
19   */
20  public class NestedMessageConsumer {
21      public static final String MDC_MESSAGE_RUNNER_KEY = "AMQ-messageRunnerKey";
22  
23      private static final Logger log = LoggerFactory.getLogger(NestedMessageConsumer.class);
24  
25      /**
26       * The point in time that message processing completed (without error) expressed as the number of milliseconds since epoch.
27       */
28      private static final String MDC_MESSAGE_PROCESSING_TIMESTAMP = "AMQ-messageProcessingTimestamp";
29      private static final String MDC_MESSAGE_PROCESSING_DURATION_MILLIS = "AMQ-messageProcessingDurationMillis";
30  
31      private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
32      private final TenantIdSetter tenantIdSetter;
33  
34      /**
35       * Constructs an instance of {@code NestedMessageConsumer}.
36       *
37       * @param messageRunnerRegistryHelper the message runner registry helper
38       * @param tenantIdSetter the tenant ID setter
39       */
40      public NestedMessageConsumer(MessageRunnerRegistryHelper messageRunnerRegistryHelper, TenantIdSetter tenantIdSetter) {
41          this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper);
42          this.tenantIdSetter = requireNonNull(tenantIdSetter);
43      }
44  
45      /**
46       * Consumes a {@link NestedMessage}.
47       *
48       * @param nestedMessage the nested message
49       * @param messageContext the message context
50       */
51      public void consume(NestedMessage nestedMessage, MessageContext messageContext) {
52          final String messageId = messageContext.getMessageId().orElse("");
53          final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
54          if (tenantId == null) {
55              log.error("Message received that does not contain required '{}' attribute. Message will be acknowledged (messageID: {})", NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME, messageId);
56              messageContext.acknowledge();
57              return;
58          }
59  
60          final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
61          if (messageRunnerKey == null) {
62              log.error("Message received that does not contain required '{}' attribute. Message will be acknowledged (messageID: {})", NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
63              messageContext.acknowledge();
64              return;
65          }
66  
67          final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
68          if (!messageRunner.isPresent()) {
69              log.error("No MessageRunner found to process message (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
70              return;
71          }
72  
73          try (WorkContextDoorway workContextDoorway = new WorkContextDoorway().open();
74               MDC.MDCCloseable mdcMessageRunnerKey = MDC.putCloseable(MDC_MESSAGE_RUNNER_KEY, messageRunnerKey)) {
75              tenantIdSetter.setTenantId(tenantId);
76  
77              long startTimeNanos = 0;
78              if (log.isInfoEnabled()) {
79                  log.info("Message processing started (messageId: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
80                  startTimeNanos = System.nanoTime();
81              }
82  
83              messageRunner.get().processMessage(messageContext);
84  
85              if (log.isInfoEnabled()) {
86                  final long messageProcessingCompleteTimestamp = System.currentTimeMillis();
87                  final long messageProcessingDurationMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS);
88                  try (MDC.MDCCloseable mdcMessageProcessingTimestamp = MDC.putCloseable(MDC_MESSAGE_PROCESSING_TIMESTAMP, String.valueOf(messageProcessingCompleteTimestamp));
89                       MDC.MDCCloseable mdcMessageProcessingDurationMillis = MDC.putCloseable(MDC_MESSAGE_PROCESSING_DURATION_MILLIS, String.valueOf(messageProcessingDurationMillis))) {
90                      log.info("Message processing completed in {}ms (messageProcessingCompletedTimestamp: {}, messageId: {}, messageRunnerKey: {})",
91                              messageProcessingDurationMillis, messageProcessingCompleteTimestamp, messageId, messageRunnerKey);
92                  }
93              }
94  
95              if (messageContext.shouldAutoAcknowledgeMessage()) {
96                  messageContext.acknowledge();
97              }
98          } catch (Exception e) {
99              log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
100         }
101     }
102 }