1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.vcache.PutPolicy;
4 import com.atlassian.vcache.StableReadExternalCache;
5 import com.atlassian.vcache.internal.ExternalCacheExceptionListener;
6 import com.atlassian.vcache.internal.MetricLabel;
7 import com.atlassian.vcache.internal.core.metrics.CacheType;
8 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import java.time.Duration;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.Set;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.ExecutionException;
20 import java.util.function.Function;
21 import java.util.function.Supplier;
22 import java.util.stream.Collectors;
23 import java.util.stream.StreamSupport;
24
25 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
26 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
27 import static java.util.Objects.requireNonNull;
28
29
30
31
32
33
34
35
36
37
38
39 public abstract class AbstractStableReadExternalCache<V>
40 extends AbstractExternalCache<V>
41 implements StableReadExternalCache<V> {
42
43 private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCache.class);
44 protected final MetricsRecorder metricsRecorder;
45
46 protected AbstractStableReadExternalCache(
47 String name,
48 MetricsRecorder metricsRecorder,
49 Duration lockTimeout,
50 ExternalCacheExceptionListener externalCacheExceptionListener) {
51 super(name, lockTimeout, externalCacheExceptionListener);
52 this.metricsRecorder = requireNonNull(metricsRecorder);
53 }
54
55 protected abstract boolean internalPut(String internalKey, V value, PutPolicy policy);
56
57 protected abstract void internalRemoveAll();
58
59 protected abstract void internalRemove(Iterable<String> keys);
60
61
62
63
64
65
66
67
68 protected abstract V handleCreation(String internalKey, V candidateValue)
69 throws ExecutionException, InterruptedException;
70
71
72
73
74 protected abstract Optional<V> directGet(String externalKey);
75
76
77
78
79 protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
80
81 @Override
82 public final CompletionStage<Optional<V>> get(String internalKey) {
83 return perform(() -> {
84 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
85
86 return cacheContext.getGlobalLock().withLock(() -> internalGetWithoutLock(internalKey, cacheContext));
87 });
88 }
89
90 @Override
91 public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
92 return perform(() -> {
93 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
94
95
96
97 final Optional<V> existingValue = cacheContext.getGlobalLock().withLock(() -> {
98 final Optional<V> value = internalGetWithoutLock(internalKey, cacheContext);
99
100 if (value.isPresent()) {
101 return value;
102 }
103
104
105 cacheContext.forgetValue(internalKey);
106 return Optional.empty();
107 });
108
109 return existingValue.orElseGet(() -> {
110
111 final V candidateValue = requireNonNull(supplier.get());
112
113
114 return cacheContext.getGlobalLock().withLock(() -> {
115
116 final Optional<Optional<V>> doubleCheck = cacheContext.getValueRecorded(internalKey);
117 if (doubleCheck.isPresent() && doubleCheck.get().isPresent()) {
118 return doubleCheck.get().get();
119 }
120
121
122 try {
123 final V finalValue = handleCreation(internalKey, candidateValue);
124 cacheContext.recordValue(internalKey, Optional.of(finalValue));
125 return finalValue;
126 } catch (final Exception e) {
127
128 cacheContext.recordValue(internalKey, Optional.of(candidateValue));
129 return candidateValue;
130 }
131 });
132 });
133 });
134 }
135
136 private Optional<V> internalGetWithoutLock(String internalKey, AbstractExternalCacheRequestContext<V> cacheContext) {
137
138
139
140
141 final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
142
143 return recordedValue.orElseGet(() -> {
144
145 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
146 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
147 final Optional<V> externalValue = directGet(externalKey);
148 cacheContext.recordValue(internalKey, externalValue);
149
150 return externalValue;
151 });
152 }
153
154
155 @Override
156 public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
157 return perform(() -> {
158 if (isEmpty(internalKeys)) {
159 return new HashMap<>();
160 }
161
162 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
163 return cacheContext.getGlobalLock().withLock(() -> {
164
165 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
166
167
168 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
169 .filter(k -> !grandResult.containsKey(k))
170 .map(cacheContext::externalEntryKeyFor)
171 .collect(Collectors.toSet());
172
173 if (missingExternalKeys.isEmpty()) {
174 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
175 return grandResult;
176 }
177 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
178
179
180 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
181 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
182
183 return candidateValues.entrySet().stream().collect(
184 () -> grandResult,
185 (m, e) -> {
186 final Optional<V> result = e.getValue();
187 cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
188 m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
189 },
190 Map::putAll
191 );
192 });
193 });
194 }
195
196 @Override
197 public final CompletionStage<Map<String, V>> getBulk(
198 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
199 return perform(() -> {
200 if (isEmpty(internalKeys)) {
201 return new HashMap<>();
202 }
203
204 final Map<String, V> grandResult = new HashMap<>();
205 final Set<String> missingInternalKeys = new HashSet<>();
206 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
207
208
209 cacheContext.getGlobalLock().withLock(() -> {
210 final Map<String, Optional<V>> knownState = unsafeJoin(getBulk(internalKeys));
211 knownState.forEach((key, value) -> {
212 if (value.isPresent()) {
213 grandResult.put(key, value.get());
214 } else {
215 missingInternalKeys.add(key);
216 cacheContext.forgetValue(key);
217 }
218 });
219 });
220
221
222 if (missingInternalKeys.isEmpty()) {
223 return grandResult;
224 }
225
226
227 final Map<String, V> candidateValues = factory.apply(missingInternalKeys);
228 FactoryUtils.verifyFactoryResult(candidateValues, missingInternalKeys);
229
230
231 cacheContext.getGlobalLock().withLock(() ->
232 candidateValues.entrySet().forEach(entry -> {
233 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
234 V finalValue;
235 try {
236 final boolean added = unsafeJoin(put(entry.getKey(), entry.getValue(), PutPolicy.ADD_ONLY));
237
238 if (added) {
239 finalValue = entry.getValue();
240 } else {
241 log.trace("Was unable to store the candidate value, so needing to retrieve what's there now");
242 finalValue = unsafeJoin(get(entry.getKey(), entry::getValue));
243 }
244 } catch (final Exception ignore) {
245
246 finalValue = entry.getValue();
247 }
248 grandResult.put(entry.getKey(), finalValue);
249 cacheContext.recordValue(entry.getKey(), Optional.of(finalValue));
250 }));
251
252 return grandResult;
253 });
254 }
255
256 @Override
257 public final CompletionStage<Boolean> put(final String internalKey, final V value, final PutPolicy policy) {
258 return perform(() -> {
259 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
260
261 final boolean successful = cacheContext.getGlobalLock().withLock(() -> internalPut(internalKey, value, policy));
262 if (successful) {
263 cacheContext.recordValue(internalKey, Optional.of(value));
264 } else {
265 cacheContext.forgetValue(internalKey);
266 }
267 return successful;
268 });
269 }
270
271 @Override
272 public final CompletionStage<Void> remove(final Iterable<String> keys) {
273 return perform(() -> {
274 ensureCacheContext().getGlobalLock().withLock(() -> internalRemove(keys));
275 return null;
276 });
277 }
278
279 @Override
280 public final CompletionStage<Void> removeAll() {
281 return perform(() -> {
282 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
283 cacheContext.getGlobalLock().withLock(() -> {
284 internalRemoveAll();
285 cacheContext.forgetAllValues();
286 });
287 return null;
288 });
289 }
290
291 private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
292 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
293
294 return StreamSupport.stream(internalKeys.spliterator(), false)
295 .filter(k -> cacheContext.getValueRecorded(k).isPresent())
296 .distinct()
297 .collect(Collectors.toMap(
298 k -> k,
299 k -> cacheContext.getValueRecorded(k).get())
300 );
301 }
302 }