1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.vcache.PutPolicy;
4 import com.atlassian.vcache.TransactionalExternalCache;
5 import com.atlassian.vcache.internal.ExternalCacheExceptionListener;
6 import com.atlassian.vcache.internal.MetricLabel;
7 import com.atlassian.vcache.internal.RequestContext;
8 import com.atlassian.vcache.internal.core.TransactionControl;
9 import com.atlassian.vcache.internal.core.metrics.CacheType;
10 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11
12 import java.time.Duration;
13 import java.util.Collections;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.Set;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.ExecutionException;
20 import java.util.function.Function;
21 import java.util.function.Supplier;
22 import java.util.stream.Collectors;
23 import java.util.stream.StreamSupport;
24
25 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
26 import static java.util.Objects.requireNonNull;
27
28
29
30
31
32
33
34 public abstract class AbstractTransactionalExternalCache<V>
35 extends AbstractExternalCache<V>
36 implements TransactionalExternalCache<V>, TransactionControl {
37
38 protected final Supplier<RequestContext> contextSupplier;
39 protected final MetricsRecorder metricsRecorder;
40
41 protected AbstractTransactionalExternalCache(
42 String name,
43 Supplier<RequestContext> contextSupplier,
44 MetricsRecorder metricsRecorder,
45 Duration lockTimeout,
46 ExternalCacheExceptionListener externalCacheExceptionListener) {
47 super(name, lockTimeout, externalCacheExceptionListener);
48 this.contextSupplier = requireNonNull(contextSupplier);
49 this.metricsRecorder = requireNonNull(metricsRecorder);
50 }
51
52
53
54
55 protected abstract Optional<V> directGet(String externalKey);
56
57
58
59
60 protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
61
62 @Override
63 public final CompletionStage<Optional<V>> get(String internalKey) {
64 return perform(() -> {
65 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
66
67
68 final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
69
70 return recordedValue.orElseGet(() -> {
71
72 if (cacheContext.hasRemoveAll()) {
73 return Optional.empty();
74 }
75
76
77 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
78 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
79 final Optional<V> externalValue = directGet(externalKey);
80 cacheContext.recordValue(internalKey, externalValue);
81
82 return externalValue;
83 });
84 });
85 }
86
87 @Override
88 public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
89 return perform(() -> {
90 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
91
92 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
93
94
95 final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
96 if (recordedValue.isPresent()) {
97 if (recordedValue.get().isPresent()) {
98 return recordedValue.get().get();
99 }
100
101 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
102 return handleCreation(internalKey, supplier);
103 }
104
105
106 if (!cacheContext.hasRemoveAll()) {
107 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
108 final Optional<V> result = directGet(externalKey);
109 if (result.isPresent()) {
110
111 cacheContext.recordValue(internalKey, result);
112 return result.get();
113 }
114 }
115 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
116 return handleCreation(internalKey, supplier);
117 });
118 }
119
120 @Override
121 public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
122 return perform(() -> {
123 if (isEmpty(internalKeys)) {
124 return new HashMap<>();
125 }
126
127
128 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
129 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
130
131
132 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
133 .filter(k -> !grandResult.containsKey(k))
134 .map(cacheContext::externalEntryKeyFor)
135 .collect(Collectors.toSet());
136
137 if (missingExternalKeys.isEmpty()) {
138 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
139 return grandResult;
140 }
141 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
142
143
144 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
145 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
146
147 return candidateValues.entrySet().stream().collect(
148 () -> grandResult,
149 (m, e) -> {
150 final Optional<V> result = e.getValue();
151 cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
152 m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
153 },
154 Map::putAll
155 );
156 });
157 }
158
159 @Override
160 public final CompletionStage<Map<String, V>> getBulk(
161 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
162 return perform(() -> {
163 if (isEmpty(internalKeys)) {
164 return new HashMap<>();
165 }
166
167
168 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
169
170 final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
171 .filter(e -> e.getValue().isPresent())
172 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
173
174
175 final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
176 .filter(k -> !grandResult.containsKey(k))
177 .map(cacheContext::externalEntryKeyFor)
178 .collect(Collectors.toSet());
179
180
181 if (candidateMissingExternalKeys.isEmpty()) {
182 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
183 return grandResult;
184 }
185 getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
186 name, candidateMissingExternalKeys.size());
187
188 final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
189 cacheContext.recordValues(missingValues);
190 grandResult.putAll(missingValues);
191
192 return grandResult;
193 });
194 }
195
196 @Override
197 public final void put(String internalKey, V value, PutPolicy policy) {
198 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
199 cacheContext.recordPut(internalKey, value, policy);
200 }
201
202 @Override
203 public final void remove(Iterable<String> internalKeys) {
204 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
205 cacheContext.recordRemove(internalKeys);
206 }
207
208 @Override
209 public final void removeAll() {
210 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
211 cacheContext.recordRemoveAll();
212 }
213
214 @Override
215 public final boolean transactionDiscard() {
216 final RequestContext requestContext = contextSupplier.get();
217 final Optional<AbstractExternalCacheRequestContext<V>> cacheRequestContext = requestContext.get(this);
218
219 if (!cacheRequestContext.isPresent()) {
220
221 return false;
222 }
223
224 final boolean hasPendingOperations = cacheRequestContext.get().hasPendingOperations();
225 cacheRequestContext.get().forgetAll();
226 return hasPendingOperations;
227 }
228
229 private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
230 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
231
232 final Map<String, Optional<V>> result = new HashMap<>();
233
234 internalKeys.forEach(k -> {
235 final Optional<Optional<V>> valueRecorded = cacheContext.getValueRecorded(k);
236 if (valueRecorded.isPresent()) {
237 result.put(k, valueRecorded.get());
238 } else if (cacheContext.hasRemoveAll()) {
239 result.put(k, Optional.empty());
240 }
241 });
242
243 return result;
244 }
245
246 private V handleCreation(String internalKey, Supplier<V> supplier) throws ExecutionException, InterruptedException {
247 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
248 final V suppliedValue = requireNonNull(supplier.get());
249 cacheContext.recordPutPolicy(internalKey, suppliedValue, PutPolicy.ADD_ONLY);
250 cacheContext.recordValue(internalKey, Optional.of(suppliedValue));
251 return suppliedValue;
252 }
253
254 private Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
255 throws ExecutionException, InterruptedException {
256
257 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
258
259
260
261 final Map<String, V> grandResult = new HashMap<>();
262 final Set<String> missingExternalKeys = fillInKnownValuesFromBackingCache(cacheContext, externalKeys, grandResult);
263
264 if (!missingExternalKeys.isEmpty()) {
265 getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
266 name, missingExternalKeys.size());
267
268 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
269 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
270 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
271 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
272
273
274 missingValues.entrySet().forEach(e -> put(e.getKey(), e.getValue(), PutPolicy.ADD_ONLY));
275
276 grandResult.putAll(missingValues);
277 }
278
279 return grandResult;
280 }
281
282 private Set<String> fillInKnownValuesFromBackingCache(
283 AbstractExternalCacheRequestContext<V> cacheContext, Set<String> externalKeys, Map<String, V> grandResult) {
284 final Set<String> missingExternalKeys;
285
286 if (cacheContext.hasRemoveAll()) {
287 missingExternalKeys = externalKeys;
288 } else {
289
290 missingExternalKeys = externalKeys.stream()
291 .filter(k -> {
292 final Optional<Optional<V>> valueRecorded =
293 cacheContext.getValueRecorded(cacheContext.internalEntryKeyFor(k));
294
295 return valueRecorded.isPresent();
296 })
297 .collect(Collectors.toSet());
298
299
300 final Set<String> externalKeysNotRemoved = externalKeys.stream()
301 .filter(k -> !missingExternalKeys.contains(k))
302 .collect(Collectors.toSet());
303
304 if (!externalKeysNotRemoved.isEmpty()) {
305 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
306 final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeysNotRemoved);
307
308 candidateValues.entrySet().forEach(e -> {
309 if (e.getValue().isPresent()) {
310 grandResult.put(
311 cacheContext.internalEntryKeyFor(e.getKey()),
312 e.getValue().get());
313 } else {
314 missingExternalKeys.add(e.getKey());
315 }
316 });
317 }
318 }
319
320 return missingExternalKeys;
321 }
322 }