View Javadoc

1   package com.atlassian.messagequeue.internal.core;
2   
3   import cloud.atlassian.external.container.DefaultProductContainerProxy;
4   import com.atlassian.messagequeue.MessageRunnerKey;
5   import com.atlassian.messagequeue.internal.core.messagevalidators.MessageRunnerKeyValidator;
6   import com.atlassian.messagequeue.internal.core.messagevalidators.TenantIdKeyValidator;
7   import com.atlassian.messagequeue.registry.MessageContext;
8   import com.atlassian.messagequeue.registry.MessageRunner;
9   import com.atlassian.messagequeue.registry.MessageValidator;
10  import com.atlassian.tenant.impl.TenantIdSetter;
11  import com.atlassian.workcontext.api.WorkContextDoorway;
12  import com.timgroup.statsd.StatsDClient;
13  import org.slf4j.Logger;
14  import org.slf4j.LoggerFactory;
15  import org.slf4j.MDC;
16  
17  import java.util.Arrays;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.Map;
21  import java.util.Optional;
22  
23  import static java.util.Objects.requireNonNull;
24  import static java.util.concurrent.TimeUnit.MILLISECONDS;
25  import static java.util.concurrent.TimeUnit.NANOSECONDS;
26  
27  /**
28   * Consumes a {@code NestedMessage}.
29   */
30  public class NestedMessageConsumer {
31      public static final String MDC_MESSAGE_RUNNER_KEY = "amq.messageRunnerKey";
32  
33      static final String STATSD_MESSAGE_PROCESSING_COUNT = "amq.message.processing.count";
34      static final String STATSD_MESSAGE_PROCESSING_DURATION = "amq.message.processing.duration";
35      static final String STATSD_MESSAGE_PROCESSING_TENANTID_SET = "amq.message.processing.tenantid.set";
36      static final String STATSD_MSG_RUNNER_TAG_PREFIX = "msg_runner_key:";
37  
38      private static final Logger log = LoggerFactory.getLogger(NestedMessageConsumer.class);
39  
40      /**
41       * The point in time that message processing completed (without error) expressed as the number of milliseconds since epoch.
42       */
43      private static final String MDC_MESSAGE_PROCESSING_COMPLETED_TIMESTAMP = "amq.messageProcessingCompletedTimestamp";
44      private static final String MDC_MESSAGE_PROCESSING_DURATION_MILLIS = "amq.messageProcessingDurationMillis";
45  
46  
47      private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
48      private final TenantIdSetter tenantIdSetter;
49      private final MessageValidatorRegistryHelper messageValidatorRegistryHelper;
50  
51      /**
52       * Constructs an instance of {@code NestedMessageConsumer}.
53       *
54       * @param messageRunnerRegistryHelper    the message runner registry helper
55       * @param tenantIdSetter                 the tenant ID setter
56       * @param messageValidatorRegistryHelper a helper to get all message validators
57       */
58      public NestedMessageConsumer(MessageRunnerRegistryHelper messageRunnerRegistryHelper,
59                                   TenantIdSetter tenantIdSetter,
60                                   MessageValidatorRegistryHelper messageValidatorRegistryHelper) {
61          this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper);
62          this.tenantIdSetter = requireNonNull(tenantIdSetter);
63          this.messageValidatorRegistryHelper = requireNonNull(messageValidatorRegistryHelper);
64      }
65  
66      private Optional<StatsDClient> getStatsDClient() {
67          return DefaultProductContainerProxy.getInstance().getComponentSafely(StatsDClient.class);
68      }
69  
70      /**
71       * Consumes a {@link NestedMessage}.
72       *
73       * @param nestedMessage  the nested message
74       * @param messageContext the message context
75       */
76      public void consume(NestedMessage nestedMessage, MessageContext messageContext) {
77          final String messageId = messageContext.getMessageId().orElse("");
78          final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
79  
80          final List<MessageValidator> defaultValidators = Arrays.asList(
81                  new MessageRunnerKeyValidator(),
82                  new TenantIdKeyValidator());
83  
84          final Map<String, String> messageAttributes = nestedMessage.getAttributesClone();
85  
86          for (MessageValidator validator : defaultValidators) {
87              if (!validator.isValid(messageContext, messageAttributes)) {
88                  validator.handleInvalidMessage(messageContext, messageAttributes);
89                  return;
90              }
91          }
92  
93          try (WorkContextDoorway ignored = new WorkContextDoorway().open()) {
94              tenantIdSetter.setTenantId(tenantId);
95  
96              final Collection<MessageValidator> validators = messageValidatorRegistryHelper.getAllValidators();
97  
98              for (MessageValidator validator : validators) {
99                  if (!validator.isValid(messageContext, messageAttributes)) {
100                     validator.handleInvalidMessage(messageContext, messageAttributes);
101                     return;
102                 }
103             }
104 
105             final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
106             final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
107             if (!messageRunner.isPresent()) {
108                 log.error("No MessageRunner found to process message (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
109                 return;
110             }
111 
112             MDC.put(MDC_MESSAGE_RUNNER_KEY, messageRunnerKey);
113 
114 
115             long startTimeNanos = 0;
116             if (log.isInfoEnabled()) {
117                 log.info("Message processing started (messageId: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
118                 startTimeNanos = System.nanoTime();
119             }
120 
121             try {
122                 /*
123                  Copy the MDC context so we can restore it after message processing.
124                  This is a defensive measure against a MessageRunner clearing the MDC context
125                  */
126                 final Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
127                 try {
128                     messageRunner.get().processMessage(messageContext);
129                 } finally {
130                     MDC.setContextMap(copyOfContextMap);
131                 }
132 
133                 final long messageProcessingCompleteTimestamp = System.currentTimeMillis();
134                 final long messageProcessingDurationMillis = MILLISECONDS.convert(System.nanoTime() - startTimeNanos, NANOSECONDS);
135 
136                 if (log.isInfoEnabled()) {
137                     MDC.put(MDC_MESSAGE_PROCESSING_COMPLETED_TIMESTAMP, String.valueOf(messageProcessingCompleteTimestamp));
138                     MDC.put(MDC_MESSAGE_PROCESSING_DURATION_MILLIS, String.valueOf(messageProcessingDurationMillis));
139                     try {
140                         log.info("Message processing completed in {}ms (messageProcessingCompletedTimestamp: {}, messageId: {}, messageRunnerKey: {})",
141                                 messageProcessingDurationMillis, messageProcessingCompleteTimestamp, messageId, messageRunnerKey);
142                     } finally {
143                         MDC.remove(MDC_MESSAGE_PROCESSING_COMPLETED_TIMESTAMP);
144                         MDC.remove(MDC_MESSAGE_PROCESSING_DURATION_MILLIS);
145                     }
146                 }
147 
148                 if (messageContext.shouldAutoAcknowledgeMessage()) {
149                     messageContext.acknowledge();
150                 }
151 
152                 postStatsDMetrics(tenantId, messageProcessingDurationMillis, STATSD_MSG_RUNNER_TAG_PREFIX + messageRunnerKey);
153             } catch (Exception e) {
154                 log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
155             }
156         } finally {
157             MDC.remove(MDC_MESSAGE_RUNNER_KEY);
158         }
159     }
160 
161     private void postStatsDMetrics(
162             final String tenantId,
163             final long messageProcessingDurationMillis,
164             final String... extraStatsDTags) {
165         getStatsDClient().ifPresent(statsDClient -> {
166             statsDClient.increment(STATSD_MESSAGE_PROCESSING_COUNT, extraStatsDTags);
167             statsDClient.recordExecutionTime(STATSD_MESSAGE_PROCESSING_DURATION, messageProcessingDurationMillis, extraStatsDTags);
168             statsDClient.recordSetValue(STATSD_MESSAGE_PROCESSING_TENANTID_SET, tenantId, extraStatsDTags);
169         });
170     }
171 }