1 package com.atlassian.messagequeue.internal.sqs.yaml;
2
3 import com.atlassian.messagequeue.MessageRunnerKey;
4 import com.atlassian.messagequeue.internal.sqs.DefaultSQSConsumerQueueConfig;
5 import com.atlassian.messagequeue.internal.sqs.DefaultSQSProducerQueueConfig;
6 import com.atlassian.messagequeue.internal.sqs.SQSConfig;
7 import com.atlassian.messagequeue.internal.sqs.SQSConsumerQueueConfig;
8 import com.atlassian.messagequeue.internal.sqs.SQSProducerQueueConfig;
9 import org.yaml.snakeyaml.Yaml;
10
11 import java.util.HashMap;
12 import java.util.Map;
13 import java.util.Set;
14 import java.util.function.Function;
15 import java.util.function.Predicate;
16 import java.util.function.Supplier;
17 import java.util.stream.Collectors;
18
19 import static java.util.stream.Collectors.toSet;
20
21
22
23
24 public class SQSYamlConfigParser {
25
26
27
28
29
30
31
32
33
34 public SQSConfig parse(Supplier<String> yamlConfigSupplier, Predicate<String> queueNameValidator, Function<String, String> queueNameToQueueUrlMapper) {
35 SQSProducerQueueConfig defaultSQSProducerQueueConfig = null;
36 final Map<MessageRunnerKey, SQSProducerQueueConfig> messageRunnerKeyToQueueMappings = new HashMap<>();
37 final Map<String, Set<SQSConsumerQueueConfig>> workerGroupToQueueMappings = new HashMap<>();
38
39 final YamlConfig yamlConfig = new Yaml().loadAs(yamlConfigSupplier.get(), YamlConfig.class);
40
41 validate(yamlConfig, queueNameValidator);
42
43 for (YamlQueueConfig queue : yamlConfig.getQueues()) {
44 final DefaultSQSProducerQueueConfig sqsProducerQueueConfig = new DefaultSQSProducerQueueConfig(queue.getQueueName(), queueNameToQueueUrlMapper.apply(queue.getQueueName()));
45 if (queue.isDefault()) {
46 defaultSQSProducerQueueConfig = sqsProducerQueueConfig;
47 }
48
49 for (YamlMessageRunnerConfig messageRunner : queue.getMessageRunners()) {
50 messageRunnerKeyToQueueMappings.put(MessageRunnerKey.of(messageRunner.getKey()), sqsProducerQueueConfig);
51 }
52 }
53
54
55 workerGroupToQueueMappings.clear();
56 yamlConfig.getWorkerGroups().stream()
57 .forEach(workerGroup -> workerGroupToQueueMappings.put(workerGroup.getName(),
58 workerGroup.getConsumerQueues().stream()
59 .map(config -> new DefaultSQSConsumerQueueConfig(config.getCorePoolSize(), config.getMaxPoolSize(),
60 config.getVisibilityTimeoutExtensionPeriod(), config.getQueueName(), queueNameToQueueUrlMapper.apply(config.getQueueName())))
61 .collect(toSet())));
62
63 return new SQSConfig(defaultSQSProducerQueueConfig, messageRunnerKeyToQueueMappings, workerGroupToQueueMappings);
64 }
65
66
67
68
69
70
71
72 private void validate(YamlConfig yamlConfig, Predicate<String> queueNameValidator) {
73 if (yamlConfig.getQueues().size() == 0) {
74 throw new RuntimeException("Invalid SQS config: there must be at least 1 queue");
75 }
76
77 for (YamlQueueConfig queue : yamlConfig.getQueues()) {
78 if (!queueNameValidator.test(queue.getQueueName())) {
79 throw new RuntimeException("Invalid SQS queue name: " + queue.getQueueName());
80 }
81 }
82
83 if (yamlConfig.getQueues().stream().filter(YamlQueueConfig::isDefault).count() != 1) {
84 throw new RuntimeException("Invalid SQS config: there must be exactly 1 default queue");
85 }
86
87 if (yamlConfig.getWorkerGroups().size() == 0) {
88 throw new RuntimeException("Invalid SQS config: there must be at least 1 worker group");
89 }
90
91 for (YamlWorkerGroupConfig workerGroupConfig : yamlConfig.getWorkerGroups()) {
92 validateWorkerGroupConfig(workerGroupConfig);
93 }
94
95 final Set<String> allWorkerGroupQueues = yamlConfig.getWorkerGroups().stream()
96 .flatMap(workerGroup -> workerGroup.getConsumerQueues().stream())
97 .map(YamlConsumerQueueConfig::getQueueName)
98 .collect(toSet());
99
100 final Set<YamlQueueConfig> queuesNotBelongToWorkerGroups = yamlConfig.getQueues().stream()
101 .filter(queue -> !allWorkerGroupQueues.contains(queue.getQueueName()))
102 .collect(toSet());
103
104 if (queuesNotBelongToWorkerGroups.size() > 0) {
105 final String queueNamesNotBelongToWorkerGroups = queuesNotBelongToWorkerGroups.stream()
106 .map(YamlQueueConfig::getQueueName)
107 .sorted()
108 .collect(Collectors.joining(", "));
109
110 throw new RuntimeException("Invalid SQS config: queues(" + queueNamesNotBelongToWorkerGroups + ") must belong to at least 1 worker group");
111 }
112 }
113
114 private void validateWorkerGroupConfig(YamlWorkerGroupConfig groupConfig) {
115 for (YamlConsumerQueueConfig consumerQueueConfig : groupConfig.getConsumerQueues()) {
116 if (consumerQueueConfig.getCorePoolSize() <= 0) {
117 throw new RuntimeException("Invalid SQS consumer queue config: corePoolSize must be greater than 0");
118 }
119
120 if (consumerQueueConfig.getMaxPoolSize() <= 0) {
121 throw new RuntimeException("Invalid SQS consumer queue config: maxPoolSize must be greater than 0");
122 }
123
124 if (consumerQueueConfig.getVisibilityTimeoutExtensionPeriod() <= 0) {
125 throw new RuntimeException("Invalid SQS consumer queue config: visibilityTimeoutExtensionPeriod must be greater than 0");
126 }
127 }
128 }
129 }