1 package com.atlassian.messagequeue.internal.core;
2
3 import com.atlassian.messagequeue.MessageRunnerKey;
4 import com.atlassian.messagequeue.registry.MessageContext;
5 import com.atlassian.messagequeue.registry.MessageRunner;
6 import com.atlassian.tenant.impl.TenantIdSetter;
7 import com.atlassian.workcontext.api.WorkContextDoorway;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.slf4j.MDC;
11
12 import java.util.Optional;
13 import java.util.concurrent.TimeUnit;
14
15 import static java.util.Objects.requireNonNull;
16
17
18
19
20 public class NestedMessageConsumer {
21 public static final String MDC_MESSAGE_RUNNER_KEY = "AMQ-messageRunnerKey";
22
23 private static final Logger log = LoggerFactory.getLogger(NestedMessageConsumer.class);
24
25
26
27
28 private static final String MDC_MESSAGE_PROCESSING_TIMESTAMP = "AMQ-messageProcessingTimestamp";
29 private static final String MDC_MESSAGE_PROCESSING_DURATION_MILLIS = "AMQ-messageProcessingDurationMillis";
30
31 private final MessageRunnerRegistryHelper messageRunnerRegistryHelper;
32 private final TenantIdSetter tenantIdSetter;
33
34
35
36
37
38
39
40 public NestedMessageConsumer(MessageRunnerRegistryHelper messageRunnerRegistryHelper, TenantIdSetter tenantIdSetter) {
41 this.messageRunnerRegistryHelper = requireNonNull(messageRunnerRegistryHelper);
42 this.tenantIdSetter = requireNonNull(tenantIdSetter);
43 }
44
45
46
47
48
49
50
51 public void consume(NestedMessage nestedMessage, MessageContext messageContext) {
52 final String messageId = messageContext.getMessageId().orElse("");
53 final String tenantId = nestedMessage.getAttribute(NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME);
54 if (tenantId == null) {
55 log.error("Message received that does not contain required '{}' attribute. Message will be acknowledged (messageID: {})", NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME, messageId);
56 messageContext.acknowledge();
57 return;
58 }
59
60 final String messageRunnerKey = nestedMessage.getAttribute(NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
61 if (messageRunnerKey == null) {
62 log.error("Message received that does not contain required '{}' attribute. Message will be acknowledged (messageID: {})", NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, messageId);
63 messageContext.acknowledge();
64 return;
65 }
66
67 final Optional<MessageRunner> messageRunner = messageRunnerRegistryHelper.getMessageRunner(MessageRunnerKey.of(messageRunnerKey));
68 if (!messageRunner.isPresent()) {
69 log.error("No MessageRunner found to process message (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
70 return;
71 }
72
73 try (WorkContextDoorway workContextDoorway = new WorkContextDoorway().open();
74 MDC.MDCCloseable mdcMessageRunnerKey = MDC.putCloseable(MDC_MESSAGE_RUNNER_KEY, messageRunnerKey)) {
75 tenantIdSetter.setTenantId(tenantId);
76
77 long startTimeNanos = 0;
78 if (log.isInfoEnabled()) {
79 log.info("Message processing started (messageId: {}, messageRunnerKey: {})", messageId, messageRunnerKey);
80 startTimeNanos = System.nanoTime();
81 }
82
83 messageRunner.get().processMessage(messageContext);
84
85 if (log.isInfoEnabled()) {
86 final long messageProcessingCompleteTimestamp = System.currentTimeMillis();
87 final long messageProcessingDurationMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS);
88 try (MDC.MDCCloseable mdcMessageProcessingTimestamp = MDC.putCloseable(MDC_MESSAGE_PROCESSING_TIMESTAMP, String.valueOf(messageProcessingCompleteTimestamp));
89 MDC.MDCCloseable mdcMessageProcessingDurationMillis = MDC.putCloseable(MDC_MESSAGE_PROCESSING_DURATION_MILLIS, String.valueOf(messageProcessingDurationMillis))) {
90 log.info("Message processing completed in {}ms (messageProcessingCompletedTimestamp: {}, messageId: {}, messageRunnerKey: {})",
91 messageProcessingDurationMillis, messageProcessingCompleteTimestamp, messageId, messageRunnerKey);
92 }
93 }
94
95 if (messageContext.shouldAutoAcknowledgeMessage()) {
96 messageContext.acknowledge();
97 }
98 } catch (Exception e) {
99 log.error("Invoking MessageRunner for message failed with an exception (messageID: {}, messageRunnerKey: {})", messageId, messageRunnerKey, e);
100 }
101 }
102 }