View Javadoc

1   package com.atlassian.messagequeue.internal.sqs.yaml;
2   
3   import com.atlassian.messagequeue.MessageRunnerKey;
4   import com.atlassian.messagequeue.internal.sqs.DefaultSQSConsumerQueueConfig;
5   import com.atlassian.messagequeue.internal.sqs.DefaultSQSProducerQueueConfig;
6   import com.atlassian.messagequeue.internal.sqs.SQSConfig;
7   import com.atlassian.messagequeue.internal.sqs.SQSConsumerQueueConfig;
8   import com.atlassian.messagequeue.internal.sqs.SQSProducerQueueConfig;
9   import org.yaml.snakeyaml.Yaml;
10  
11  import java.util.HashMap;
12  import java.util.Map;
13  import java.util.Set;
14  import java.util.function.Function;
15  import java.util.function.Predicate;
16  import java.util.function.Supplier;
17  import java.util.stream.Collectors;
18  
19  import static java.util.stream.Collectors.toSet;
20  
21  /**
22   * Default Yaml config parser for sqs related config.
23   */
24  public class SQSYamlConfigParser {
25  
26      /**
27       * Parse yaml config
28       *
29       * @param yamlConfigSupplier        content of Yaml config
30       * @param queueNameValidator        queue name validator which validates every string queue name
31       * @param queueNameToQueueUrlMapper queue name to queue url mapper.
32       * @return {@link SQSConfig}
33       */
34      public SQSConfig parse(Supplier<String> yamlConfigSupplier, Predicate<String> queueNameValidator, Function<String, String> queueNameToQueueUrlMapper) {
35          SQSProducerQueueConfig defaultSQSProducerQueueConfig = null;
36          final Map<MessageRunnerKey, SQSProducerQueueConfig> messageRunnerKeyToQueueMappings = new HashMap<>();
37          final Map<String, Set<SQSConsumerQueueConfig>> workerGroupToQueueMappings = new HashMap<>();
38  
39          final YamlConfig yamlConfig = new Yaml().loadAs(yamlConfigSupplier.get(), YamlConfig.class);
40  
41          validate(yamlConfig, queueNameValidator);
42  
43          for (YamlQueueConfig queue : yamlConfig.getQueues()) {
44              final DefaultSQSProducerQueueConfig sqsProducerQueueConfig = new DefaultSQSProducerQueueConfig(queue.getQueueName(), queueNameToQueueUrlMapper.apply(queue.getQueueName()));
45              if (queue.isDefault()) {
46                  defaultSQSProducerQueueConfig = sqsProducerQueueConfig;
47              }
48  
49              for (YamlMessageRunnerConfig messageRunner : queue.getMessageRunners()) {
50                  messageRunnerKeyToQueueMappings.put(MessageRunnerKey.of(messageRunner.getKey()), sqsProducerQueueConfig);
51              }
52          }
53  
54          // build workerGroupToQueueMappings
55          workerGroupToQueueMappings.clear();
56          yamlConfig.getWorkerGroups().stream()
57                  .forEach(workerGroup -> workerGroupToQueueMappings.put(workerGroup.getName(),
58                          workerGroup.getConsumerQueues().stream()
59                                  .map(config -> new DefaultSQSConsumerQueueConfig(config.getCorePoolSize(), config.getMaxPoolSize(),
60                                          config.getVisibilityTimeoutExtensionPeriod(), config.getQueueName(), queueNameToQueueUrlMapper.apply(config.getQueueName())))
61                                  .collect(toSet())));
62  
63          return new SQSConfig(defaultSQSProducerQueueConfig, messageRunnerKeyToQueueMappings, workerGroupToQueueMappings);
64      }
65  
66      /**
67       * Validate yaml config
68       *
69       * @param yamlConfig         loaded config
70       * @param queueNameValidator validate queue name
71       */
72      private void validate(YamlConfig yamlConfig, Predicate<String> queueNameValidator) {
73          if (yamlConfig.getQueues().size() == 0) {
74              throw new RuntimeException("Invalid SQS config: there must be at least 1 queue");
75          }
76  
77          for (YamlQueueConfig queue : yamlConfig.getQueues()) {
78              if (!queueNameValidator.test(queue.getQueueName())) {
79                  throw new RuntimeException("Invalid SQS queue name: " + queue.getQueueName());
80              }
81          }
82  
83          if (yamlConfig.getQueues().stream().filter(YamlQueueConfig::isDefault).count() != 1) {
84              throw new RuntimeException("Invalid SQS config: there must be exactly 1 default queue");
85          }
86  
87          if (yamlConfig.getWorkerGroups().size() == 0) {
88              throw new RuntimeException("Invalid SQS config: there must be at least 1 worker group");
89          }
90  
91          for (YamlWorkerGroupConfig workerGroupConfig : yamlConfig.getWorkerGroups()) {
92              validateWorkerGroupConfig(workerGroupConfig);
93          }
94  
95          final Set<String> allWorkerGroupQueues = yamlConfig.getWorkerGroups().stream()
96                  .flatMap(workerGroup -> workerGroup.getConsumerQueues().stream())
97                  .map(YamlConsumerQueueConfig::getQueueName)
98                  .collect(toSet());
99  
100         final Set<YamlQueueConfig> queuesNotBelongToWorkerGroups = yamlConfig.getQueues().stream()
101                 .filter(queue -> !allWorkerGroupQueues.contains(queue.getQueueName()))
102                 .collect(toSet());
103 
104         if (queuesNotBelongToWorkerGroups.size() > 0) {
105             final String queueNamesNotBelongToWorkerGroups = queuesNotBelongToWorkerGroups.stream()
106                     .map(YamlQueueConfig::getQueueName)
107                     .sorted()
108                     .collect(Collectors.joining(", "));
109 
110             throw new RuntimeException("Invalid SQS config: queues(" + queueNamesNotBelongToWorkerGroups + ") must belong to at least 1 worker group");
111         }
112     }
113 
114     private void validateWorkerGroupConfig(YamlWorkerGroupConfig groupConfig) {
115         for (YamlConsumerQueueConfig consumerQueueConfig : groupConfig.getConsumerQueues()) {
116             if (consumerQueueConfig.getCorePoolSize() <= 0) {
117                 throw new RuntimeException("Invalid SQS consumer queue config: corePoolSize must be greater than 0");
118             }
119 
120             if (consumerQueueConfig.getMaxPoolSize() <= 0) {
121                 throw new RuntimeException("Invalid SQS consumer queue config: maxPoolSize must be greater than 0");
122             }
123 
124             if (consumerQueueConfig.getVisibilityTimeoutExtensionPeriod() <= 0) {
125                 throw new RuntimeException("Invalid SQS consumer queue config: visibilityTimeoutExtensionPeriod must be greater than 0");
126             }
127         }
128     }
129 }