1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.marshalling.api.MarshallingException;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.VCacheUtils;
6 import com.atlassian.vcache.internal.MetricLabel;
7 import com.atlassian.vcache.internal.core.metrics.CacheType;
8 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
9
10 import java.util.HashMap;
11 import java.util.Map;
12 import java.util.Optional;
13 import java.util.Set;
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.CompletionStage;
16 import java.util.concurrent.ExecutionException;
17 import java.util.function.Function;
18 import java.util.function.Supplier;
19 import java.util.stream.Collectors;
20 import java.util.stream.StreamSupport;
21
22 import static com.atlassian.vcache.ExternalCacheException.Reason.MARSHALLER_FAILURE;
23 import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
24 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.failed;
25 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
26 import static java.util.Objects.requireNonNull;
27
28
29
30
31
32
33
34
35 public abstract class AbstractNonDirectExternalCache<V>
36 extends AbstractExternalCache<V> {
37
38 protected final MetricsRecorder metricsRecorder;
39
    /**
     * Creates an instance.
     *
     * @param name the name of the cache; passed through to the superclass constructor
     * @param metricsRecorder the recorder used by this class to record cache metrics
     * @throws NullPointerException if {@code metricsRecorder} is {@code null}
     */
    protected AbstractNonDirectExternalCache(String name, MetricsRecorder metricsRecorder) {
        super(name);
        this.metricsRecorder = requireNonNull(metricsRecorder);
    }
44
45
46
47
48
49
50
51
    /**
     * Produces the value for a single entry for which no usable value was found.
     *
     * <p>Within this class it is invoked from {@code getNewValue}, which wraps the result with
     * {@link Optional#of(Object)}; implementations must therefore never return {@code null}.
     *
     * @param internalKey the internal key of the entry
     * @param supplier the supplier given by the caller of {@code get(String, Supplier)}
     *                 (presumably used to generate the missing value - confirm against implementations)
     * @return the value for the entry, never {@code null}
     * @throws ExecutionException reported by {@code getNewValue} as an {@code UNCLASSIFIED_FAILURE}
     * @throws InterruptedException reported by {@code getNewValue} as an {@code UNCLASSIFIED_FAILURE}
     */
    protected abstract V handleCreation(String internalKey, Supplier<V> supplier)
            throws ExecutionException, InterruptedException;
54
55
56
57
58
59
60
61
    /**
     * Produces the values for a set of entries that have no present value recorded in the
     * request context.
     *
     * <p>Within this class it is invoked from {@code getBulk(Function, Iterable)}. The returned map is
     * recorded against the request context and merged into the result handed back to the caller.
     * NOTE(review): because the returned map is merged directly into a result keyed by internal key,
     * it is presumably keyed by internal key as well - confirm against implementations.
     *
     * @param factory the factory given by the caller of {@code getBulk(Function, Iterable)}
     * @param externalKeys the external keys of the entries that need values
     * @return the values obtained for the requested entries
     * @throws ExecutionException if obtaining the values fails
     * @throws InterruptedException if the calling thread is interrupted while obtaining the values
     */
    protected abstract Map<String, V> handleCreation(
            Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
            throws ExecutionException, InterruptedException;
65
66
67
68
    /**
     * Returns the values that are already known for the supplied internal keys.
     *
     * <p>{@code getBulk(Iterable)} treats every key contained in the returned map as resolved and
     * adds the remotely fetched entries to that same map, so the returned map must be mutable.
     * {@code getBulk(Function, Iterable)} only treats keys mapped to a present value as resolved.
     *
     * @param internalKeys the internal keys being requested
     * @return a mutable map, keyed by internal key, of the values already known
     */
    protected abstract Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys);
70
71
72
73
    /**
     * Fetches the value of a single entry, addressed by its external key.
     *
     * <p>The {@code get} methods of this class call this when the request context holds no usable
     * value for the entry, and count the call as a {@code NUMBER_OF_REMOTE_GET}.
     *
     * @param externalKey the external key of the entry
     * @return the value found, or {@link Optional#empty()} if there is none
     */
    protected abstract Optional<V> directGet(String externalKey);
75
76
77
78
    /**
     * Fetches the values of several entries, addressed by their external keys.
     *
     * <p>{@code getBulk(Iterable)} converts each key of the returned map back to an internal key,
     * so the returned map is expected to be keyed by external key.
     *
     * @param externalKeys the external keys of the entries to fetch
     * @return a map, keyed by external key, of the values found
     */
    protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
80
81 @Override
82 public CompletionStage<Optional<V>> get(String internalKey) {
83 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
84
85 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
86
87 final CompletionStage<Optional<V>> value = cacheContext.computeValue(internalKey,
88 (key, oldValue) -> {
89
90 if (oldValue != null && !oldValue.toCompletableFuture().isCompletedExceptionally()) {
91 return oldValue;
92 }
93
94
95 if (cacheContext.hasRemoveAll()) {
96 return CompletableFuture.completedFuture(Optional.empty());
97 }
98
99
100
101 return CompletableFuture.<Optional<V>>completedFuture(Optional.empty())
102 .thenApply((ignore) -> {
103 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
104 return directGet(externalKey);
105 });
106 });
107
108
109
110 return CompletableFuture.completedFuture(VCacheUtils.fold(value, v -> v, e -> Optional.empty()));
111 }
112
113 @Override
114 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
115 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
116
117 return cacheContext.computeValue(internalKey,
118 (key, oldValue) -> {
119 if (oldValue != null && !oldValue.toCompletableFuture().isCompletedExceptionally()) {
120 return oldValue.thenCompose(value -> {
121 if (value.isPresent()) {
122 return CompletableFuture.completedFuture(value);
123 } else {
124 return getNewValue(internalKey, supplier);
125 }
126 });
127 } else {
128 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
129
130 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
131 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
132
133
134
135 if (!cacheContext.hasRemoveAll()) {
136 final Optional<V> result = directGet(externalKey);
137 if (result.isPresent()) {
138
139 return CompletableFuture.completedFuture(result);
140 }
141 }
142 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
143 return getNewValue(internalKey, supplier);
144 }
145 })
146
147 .thenCompose(value -> CompletableFuture.completedFuture(value.get()));
148 }
149
150 @Override
151 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
152 return perform(() -> {
153 if (isEmpty(internalKeys)) {
154 return new HashMap<>();
155 }
156
157
158 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
159 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
160
161
162 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
163 .filter(k -> !grandResult.containsKey(k))
164 .map(cacheContext::externalEntryKeyFor)
165 .collect(Collectors.toSet());
166
167 if (missingExternalKeys.isEmpty()) {
168 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
169 return grandResult;
170 }
171 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
172
173
174 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
175 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
176
177 return candidateValues.entrySet().stream().collect(
178 () -> grandResult,
179 (m, e) -> {
180 final Optional<V> result = e.getValue();
181 cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
182 m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
183 },
184 Map::putAll
185 );
186 });
187 }
188
189 @Override
190 public CompletionStage<Map<String, V>> getBulk(
191 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
192 return perform(() -> {
193 if (isEmpty(internalKeys)) {
194 return new HashMap<>();
195 }
196
197
198 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
199 final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
200 .filter(e -> e.getValue().isPresent())
201 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
202
203
204 final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
205 .filter(k -> !grandResult.containsKey(k))
206 .map(cacheContext::externalEntryKeyFor)
207 .collect(Collectors.toSet());
208
209
210 if (candidateMissingExternalKeys.isEmpty()) {
211 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
212 return grandResult;
213 }
214 getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
215 name, candidateMissingExternalKeys.size());
216
217 final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
218 cacheContext.recordValues(missingValues);
219 grandResult.putAll(missingValues);
220
221 return grandResult;
222 });
223 }
224
225 private CompletionStage<Optional<V>> getNewValue(String internalKey, Supplier<V> supplier) {
226 try {
227 return CompletableFuture.completedFuture(Optional.of(handleCreation(internalKey, supplier)));
228 } catch (MarshallingException ex) {
229 return failed(new CompletableFuture<Optional<V>>(), new ExternalCacheException(MARSHALLER_FAILURE, ex));
230 } catch (ExecutionException | InterruptedException ex) {
231 return failed(new CompletableFuture<Optional<V>>(), new ExternalCacheException(UNCLASSIFIED_FAILURE, ex));
232 } catch (ExternalCacheException ece) {
233 return failed(new CompletableFuture<Optional<V>>(), ece);
234 } catch (Exception ex) {
235 return failed(new CompletableFuture<Optional<V>>(), mapException(ex));
236 }
237 }
238 }