1 package com.atlassian.messagequeue.internal.core;
2
3 import cloud.atlassian.external.container.DefaultProductContainerProxy;
4 import com.atlassian.messagequeue.MessageRunnerKey;
5 import com.atlassian.messagequeue.internal.core.messagevalidators.MessageRunnerKeyValidator;
6 import com.atlassian.messagequeue.internal.core.messagevalidators.TenantIdKeyValidator;
7 import com.atlassian.messagequeue.registry.MessageContext;
8 import com.atlassian.messagequeue.registry.MessageRunner;
9 import com.atlassian.messagequeue.registry.MessageValidator;
10 import com.atlassian.tenant.impl.TenantIdSetter;
11 import com.atlassian.workcontext.api.WorkContextDoorway;
12 import com.timgroup.statsd.StatsDClient;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.slf4j.MDC;
16
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Optional;
22
23 import static java.util.Objects.requireNonNull;
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.concurrent.TimeUnit.NANOSECONDS;
26
27
28
29
30 public class NestedMessageConsumer {
31 public static final String MDC_MESSAGE_RUNNER_KEY = "amq.messageRunnerKey";
32
33 static final String STATSD_MESSAGE_PROCESSING_COUNT = "amq.message.processing.count";
34 static final String STATSD_MESSAGE_PROCESSING_DURATION = "amq.message.processing.duration";
35 static final String STATSD_MESSAGE_PROCESSING_TENANTID_SET = "amq.message.processing.tenantid.set";
36 static final String STATSD_MSG_RUNNER_TAG_PREFIX = "msg_runner_key:";
37
38 private static final Logger log = LoggerFactory.getLogger(NestedMessageConsumer.class);
39
40
41
42
43 private static final String MDC_MESSAGE_PROCESSING_COMPLETED_TIMESTAMP = "amq.messageProcessingCompletedTimestamp";
44 private static final String MDC_MESSAGE_PROCESSING_DURATION_MILLIS = "amq.messageProcessingDurationMillis";
45
46
47 private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
48 private final TenantIdSetter tenantIdSetter;
49 private final MessageValidatorRegistryHelper messageValidatorRegistryHelper;
50
51
52
53
54
55
56
57
58 public NestedMessageConsumer(MessageRunnerRegistryHelper messageRunnerRegistryHelper,
59 TenantIdSetter tenantIdSetter,
60 MessageValidatorRegistryHelper messageValidatorRegistryHelper) {
61 this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper);
62 this.tenantIdSetter = requireNonNull(tenantIdSetter);
63 this.messageValidatorRegistryHelper = requireNonNull(messageValidatorRegistryHelper);
64 }
65
66 private Optional<StatsDClient> getStatsDClient() {
67 return DefaultProductContainerProxy.getInstance().getComponentSafely(StatsDClient.class);
68 }
69
70
71
72
73
74
75
76 public void consume(NestedMessage nestedMessage, MessageContext messageContext) {
77 final String messageId = messageContext.getMessageId().orElse("");
78 final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
79
80 final List<MessageValidator> defaultValidators = Arrays.asList(
81 new MessageRunnerKeyValidator(),
82 new TenantIdKeyValidator());
83
84 final Map<String, String> messageAttributes = nestedMessage.getAttributesClone();
85
86 for (MessageValidator validator : defaultValidators) {
87 if (!validator.isValid(messageContext, messageAttributes)) {
88 validator.handleInvalidMessage(messageContext, messageAttributes);
89 return;
90 }
91 }
92
93 try (WorkContextDoorway ignored = new WorkContextDoorway().open()) {
94 tenantIdSetter.setTenantId(tenantId);
95
96 final Collection<MessageValidator> validators = messageValidatorRegistryHelper.getAllValidators();
97
98 for (MessageValidator validator : validators) {
99 if (!validator.isValid(messageContext, messageAttributes)) {
100 validator.handleInvalidMessage(messageContext, messageAttributes);
101 return;
102 }
103 }
104
105 final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
106 final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
107 if (!messageRunner.isPresent()) {
108 log.error("No MessageRunner found to process message (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
109 return;
110 }
111
112 MDC.put(MDC_MESSAGE_RUNNER_KEY, messageRunnerKey);
113
114
115 long startTimeNanos = 0;
116 if (log.isInfoEnabled()) {
117 log.info("Message processing started (messageId: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
118 startTimeNanos = System.nanoTime();
119 }
120
121 try {
122
123
124
125
126 final Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
127 try {
128 messageRunner.get().processMessage(messageContext);
129 } finally {
130 MDC.setContextMap(copyOfContextMap);
131 }
132
133 final long messageProcessingCompleteTimestamp = System.currentTimeMillis();
134 final long messageProcessingDurationMillis = MILLISECONDS.convert(System.nanoTime() - startTimeNanos, NANOSECONDS);
135
136 if (log.isInfoEnabled()) {
137 MDC.put(MDC_MESSAGE_PROCESSING_COMPLETED_TIMESTAMP, String.valueOf(messageProcessingCompleteTimestamp));
138 MDC.put(MDC_MESSAGE_PROCESSING_DURATION_MILLIS, String.valueOf(messageProcessingDurationMillis));
139 try {
140 log.info("Message processing completed in {}ms (messageProcessingCompletedTimestamp: {}, messageId: {}, messageRunnerKey: {})",
141 messageProcessingDurationMillis, messageProcessingCompleteTimestamp, messageId, messageRunnerKey);
142 } finally {
143 MDC.remove(MDC_MESSAGE_PROCESSING_COMPLETED_TIMESTAMP);
144 MDC.remove(MDC_MESSAGE_PROCESSING_DURATION_MILLIS);
145 }
146 }
147
148 if (messageContext.shouldAutoAcknowledgeMessage()) {
149 messageContext.acknowledge();
150 }
151
152 postStatsDMetrics(tenantId, messageProcessingDurationMillis, STATSD_MSG_RUNNER_TAG_PREFIX + messageRunnerKey);
153 } catch (Exception e) {
154 log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
155 }
156 } finally {
157 MDC.remove(MDC_MESSAGE_RUNNER_KEY);
158 }
159 }
160
161 private void postStatsDMetrics(
162 final String tenantId,
163 final long messageProcessingDurationMillis,
164 final String... extraStatsDTags) {
165 getStatsDClient().ifPresent(statsDClient -> {
166 statsDClient.increment(STATSD_MESSAGE_PROCESSING_COUNT, extraStatsDTags);
167 statsDClient.recordExecutionTime(STATSD_MESSAGE_PROCESSING_DURATION, messageProcessingDurationMillis, extraStatsDTags);
168 statsDClient.recordSetValue(STATSD_MESSAGE_PROCESSING_TENANTID_SET, tenantId, extraStatsDTags);
169 });
170 }
171 }