View Javadoc

1   package com.atlassian.messagequeue.internal.core;
2   
3   import com.atlassian.messagequeue.Message;
4   import com.atlassian.messagequeue.MessageInformationService;
5   import com.atlassian.messagequeue.MessageRunnerKey;
6   import com.atlassian.messagequeue.TenantDataIdSupplier;
7   import com.atlassian.tenant.api.TenantContext;
8   import com.atlassian.tenant.api.TenantContextProvider;
9   
10  import static com.atlassian.messagequeue.internal.core.NestedMessageConstants.MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME;
11  import static com.atlassian.messagequeue.internal.core.NestedMessageConstants.TENANT_DATA_ID_ATTRIBUTE_NAME;
12  import static com.atlassian.messagequeue.internal.core.NestedMessageConstants.TENANT_ID_ATTRIBUTE_NAME;
13  import static java.util.Objects.requireNonNull;
14  
15  /**
16   * Message information service.
17   */
18  public class DefaultMessageInformationService implements MessageInformationService {
19      private final TenantContextProvider tenantContextProvider;
20      private final String queueUrl;
21      private final NestedMessageSerializer nestedMessageSerializer;
22      private final TenantDataIdSupplier tenantDataIdSupplier;
23  
24      /**
25       * @param queueUrl The URL that atlassian-messagequeue will be producing to and consuming from.
26       * @param tenantContextProvider the tenant context provider
27       * @param nestedMessageSerializer the nested message serializer
28       * @param tenantDataIdSupplier the tenant data id supplier
29       */
30      public DefaultMessageInformationService(String queueUrl,
31                                              TenantContextProvider tenantContextProvider,
32                                              NestedMessageSerializer nestedMessageSerializer,
33                                              TenantDataIdSupplier tenantDataIdSupplier) {
34          this.queueUrl = requireNonNull(queueUrl, "queueUrl");
35          this.tenantContextProvider = requireNonNull(tenantContextProvider, "tenantContextProvider");
36          this.nestedMessageSerializer = requireNonNull(nestedMessageSerializer, "nestedMessageSerializer");
37          this.tenantDataIdSupplier = requireNonNull(tenantDataIdSupplier);
38      }
39  
40      @Override
41      public String getQueueUrl(MessageRunnerKey messageRunnerKey) {
42          return queueUrl;
43      }
44  
45      @Override
46      public String toPayload(Message message) {
47          final NestedMessage nestedMessage = new NestedMessage();
48  
49          final TenantContext tenantContext = tenantContextProvider.getTenantContext();
50          if (tenantContext == null) {
51              throw new IllegalStateException("No tenant context available from which to derive a tenant ID");
52          } else if (tenantContext.getTenantId() == null || "".equals(tenantContext.getTenantId())) {
53              throw new IllegalStateException("Null or empty tenantId found in tenant context");
54          }
55  
56          nestedMessage
57                  .addAttribute(TENANT_ID_ATTRIBUTE_NAME, requireNonNull(tenantContext.getTenantId(), "tenantId"))
58                  .addAttribute(MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME, requireNonNull(message.getRunnerKey(), "messageRunnerKey").toString())
59                  .addAttribute(TENANT_DATA_ID_ATTRIBUTE_NAME, tenantDataIdSupplier.get())
60                  .setPayload(message.getPayload().orElse(null));
61  
62          return nestedMessageSerializer.serialize(nestedMessage);
63      }
64  
65      @Override
66      public Message fromPayload(String payload) {
67          if (payload == null || "".equals(payload)) {
68              throw new IllegalArgumentException("payload not be null or empty");
69          }
70  
71          final NestedMessage nestedMessage = nestedMessageSerializer.deserialize(payload);
72          final String messageRunnerKey = nestedMessage.getAttribute(MESSAGE_RUNNER_KEY_ATTRIBUTE_NAME);
73  
74          return Message.create(MessageRunnerKey.of(messageRunnerKey), nestedMessage.getPayload());
75      }
76  }