1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.vcache.PutPolicy;
4 import com.atlassian.vcache.TransactionalExternalCache;
5 import com.atlassian.vcache.internal.MetricLabel;
6 import com.atlassian.vcache.internal.RequestContext;
7 import com.atlassian.vcache.internal.core.TransactionControl;
8 import com.atlassian.vcache.internal.core.metrics.CacheType;
9 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
10
11 import java.time.Duration;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Optional;
16 import java.util.Set;
17 import java.util.concurrent.CompletionStage;
18 import java.util.concurrent.ExecutionException;
19 import java.util.function.Function;
20 import java.util.function.Supplier;
21 import java.util.stream.Collectors;
22 import java.util.stream.StreamSupport;
23
24 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
25 import static java.util.Objects.requireNonNull;
26
27
28
29
30
31
32
33 public abstract class AbstractTransactionalExternalCache<V>
34 extends AbstractExternalCache<V>
35 implements TransactionalExternalCache<V>, TransactionControl {
36
37 protected final Supplier<RequestContext> contextSupplier;
38 protected final MetricsRecorder metricsRecorder;
39
40 protected AbstractTransactionalExternalCache(
41 String name,
42 Supplier<RequestContext> contextSupplier,
43 MetricsRecorder metricsRecorder,
44 Duration lockTimeout) {
45 super(name, lockTimeout);
46 this.contextSupplier = requireNonNull(contextSupplier);
47 this.metricsRecorder = requireNonNull(metricsRecorder);
48 }
49
50
51
52
53 protected abstract Optional<V> directGet(String externalKey);
54
55
56
57
58 protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
59
60 @Override
61 public final CompletionStage<Optional<V>> get(String internalKey) {
62 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
63 return perform(() -> {
64
65 final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
66
67 return recordedValue.orElseGet(() -> {
68
69 if (cacheContext.hasRemoveAll()) {
70 return Optional.empty();
71 }
72
73
74 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
75 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
76 final Optional<V> externalValue = directGet(externalKey);
77 cacheContext.recordValue(internalKey, externalValue);
78
79 return externalValue;
80 });
81 });
82 }
83
84 @Override
85 public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
86 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
87
88 return perform(() -> {
89 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
90 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
91
92
93 final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
94 if (recordedValue.isPresent()) {
95 if (recordedValue.get().isPresent()) {
96 return recordedValue.get().get();
97 }
98
99 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
100 return handleCreation(internalKey, supplier);
101 }
102
103
104 if (!cacheContext.hasRemoveAll()) {
105 final Optional<V> result = directGet(externalKey);
106 if (result.isPresent()) {
107
108 return result.get();
109 }
110 }
111 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
112 return handleCreation(internalKey, supplier);
113 });
114 }
115
116 @Override
117 public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
118 return perform(() -> {
119 if (isEmpty(internalKeys)) {
120 return new HashMap<>();
121 }
122
123
124 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
125 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
126
127
128 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
129 .filter(k -> !grandResult.containsKey(k))
130 .map(cacheContext::externalEntryKeyFor)
131 .collect(Collectors.toSet());
132
133 if (missingExternalKeys.isEmpty()) {
134 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
135 return grandResult;
136 }
137 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
138
139
140 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
141 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
142
143 return candidateValues.entrySet().stream().collect(
144 () -> grandResult,
145 (m, e) -> {
146 final Optional<V> result = e.getValue();
147 cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
148 m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
149 },
150 Map::putAll
151 );
152 });
153 }
154
155 @Override
156 public final CompletionStage<Map<String, V>> getBulk(
157 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
158 return perform(() -> {
159 if (isEmpty(internalKeys)) {
160 return new HashMap<>();
161 }
162
163
164 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
165
166 final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
167 .filter(e -> e.getValue().isPresent())
168 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
169
170
171 final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
172 .filter(k -> !grandResult.containsKey(k))
173 .map(cacheContext::externalEntryKeyFor)
174 .collect(Collectors.toSet());
175
176
177 if (candidateMissingExternalKeys.isEmpty()) {
178 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
179 return grandResult;
180 }
181 getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
182 name, candidateMissingExternalKeys.size());
183
184 final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
185 cacheContext.recordValues(missingValues);
186 grandResult.putAll(missingValues);
187
188 return grandResult;
189 });
190 }
191
192 @Override
193 public final void put(String internalKey, V value, PutPolicy policy) {
194 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
195 cacheContext.recordPut(internalKey, value, policy);
196 }
197
198 @Override
199 public final void remove(Iterable<String> internalKeys) {
200 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
201 cacheContext.recordRemove(internalKeys);
202 }
203
204 @Override
205 public final void removeAll() {
206 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
207 cacheContext.recordRemoveAll();
208 }
209
210 @Override
211 public final boolean transactionDiscard() {
212 final RequestContext requestContext = contextSupplier.get();
213 final Optional<AbstractExternalCacheRequestContext<V>> cacheRequestContext = requestContext.get(this);
214
215 if (!cacheRequestContext.isPresent()) {
216
217 return false;
218 }
219
220 final boolean hasPendingOperations = cacheRequestContext.get().hasPendingOperations();
221 cacheRequestContext.get().forgetAll();
222 return hasPendingOperations;
223 }
224
225 private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
226 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
227
228 final Map<String, Optional<V>> result = new HashMap<>();
229
230 internalKeys.forEach(k -> {
231 final Optional<Optional<V>> valueRecorded = cacheContext.getValueRecorded(k);
232 if (valueRecorded.isPresent()) {
233 result.put(k, valueRecorded.get());
234 } else if (cacheContext.hasRemoveAll()) {
235 result.put(k, Optional.empty());
236 }
237 });
238
239 return result;
240 }
241
242 private V handleCreation(String internalKey, Supplier<V> supplier) throws ExecutionException, InterruptedException {
243 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
244 final V suppliedValue = requireNonNull(supplier.get());
245 cacheContext.recordPutPolicy(internalKey, suppliedValue, PutPolicy.ADD_ONLY);
246 cacheContext.recordValue(internalKey, Optional.of(suppliedValue));
247 return suppliedValue;
248 }
249
250 private Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
251 throws ExecutionException, InterruptedException {
252
253 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
254
255
256
257 final Map<String, V> grandResult = new HashMap<>();
258 final Set<String> missingExternalKeys = fillInKnownValuesFromBackingCache(cacheContext, externalKeys, grandResult);
259
260 if (!missingExternalKeys.isEmpty()) {
261 getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
262 name, missingExternalKeys.size());
263
264 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
265 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
266 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
267 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
268
269
270 missingValues.entrySet().forEach(e -> put(e.getKey(), e.getValue(), PutPolicy.ADD_ONLY));
271
272 grandResult.putAll(missingValues);
273 }
274
275 return grandResult;
276 }
277
278 private Set<String> fillInKnownValuesFromBackingCache(
279 AbstractExternalCacheRequestContext<V> cacheContext, Set<String> externalKeys, Map<String, V> grandResult) {
280 final Set<String> missingExternalKeys;
281
282 if (cacheContext.hasRemoveAll()) {
283 missingExternalKeys = externalKeys;
284 } else {
285
286 missingExternalKeys = externalKeys.stream()
287 .filter(k -> {
288 final Optional<Optional<V>> valueRecorded =
289 cacheContext.getValueRecorded(cacheContext.internalEntryKeyFor(k));
290
291 return valueRecorded.isPresent();
292 })
293 .collect(Collectors.toSet());
294
295
296 final Set<String> externalKeysNotRemoved = externalKeys.stream()
297 .filter(k -> !missingExternalKeys.contains(k))
298 .collect(Collectors.toSet());
299
300 if (!externalKeysNotRemoved.isEmpty()) {
301 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
302 final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeysNotRemoved);
303
304 candidateValues.entrySet().forEach(e -> {
305 if (e.getValue().isPresent()) {
306 grandResult.put(
307 cacheContext.internalEntryKeyFor(e.getKey()),
308 e.getValue().get());
309 } else {
310 missingExternalKeys.add(e.getKey());
311 }
312 });
313 }
314 }
315
316 return missingExternalKeys;
317 }
318 }