1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.vcache.PutPolicy;
4 import com.atlassian.vcache.StableReadExternalCache;
5 import com.atlassian.vcache.VCacheException;
6 import com.atlassian.vcache.internal.MetricLabel;
7 import com.atlassian.vcache.internal.core.metrics.CacheType;
8 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import java.time.Duration;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.Set;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.ExecutionException;
20 import java.util.function.Function;
21 import java.util.function.Supplier;
22 import java.util.stream.Collectors;
23 import java.util.stream.StreamSupport;
24
25 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
26 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
27 import static java.util.Objects.requireNonNull;
28
29
30
31
32
33
34
35
36
37
38
39 public abstract class AbstractStableReadExternalCache<V>
40 extends AbstractExternalCache<V>
41 implements StableReadExternalCache<V> {
42
43 private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCache.class);
44 protected final MetricsRecorder metricsRecorder;
45
46 protected AbstractStableReadExternalCache(String name, MetricsRecorder metricsRecorder, Duration lockTimeout) {
47 super(name, lockTimeout);
48 this.metricsRecorder = requireNonNull(metricsRecorder);
49 }
50
51 protected abstract boolean internalPut(String internalKey, V value, PutPolicy policy);
52
53 protected abstract void internalRemoveAll();
54
55 protected abstract void internalRemove(Iterable<String> keys);
56
57
58
59
60
61
62
63
64 protected abstract V handleCreation(String internalKey, V candidateValue)
65 throws ExecutionException, InterruptedException;
66
67
68
69
70 protected abstract Optional<V> directGet(String externalKey);
71
72
73
74
75 protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
76
77 @Override
78 public final CompletionStage<Optional<V>> get(String internalKey) {
79 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
80 return perform(() ->
81 cacheContext.getGlobalLock().withLock(() -> {
82
83 final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
84
85 return recordedValue.orElseGet(() -> {
86
87 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
88 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
89 final Optional<V> externalValue = directGet(externalKey);
90 cacheContext.recordValue(internalKey, externalValue);
91
92 return externalValue;
93 });
94 }));
95 }
96
97 @Override
98 public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
99 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
100 return perform(() -> {
101
102
103 final Optional<V> existingValue = cacheContext.getGlobalLock().withLock(() -> {
104 final Optional<V> value = unsafeJoin(get(internalKey));
105 if (value.isPresent()) {
106 return value;
107 }
108
109
110 cacheContext.forgetValue(internalKey);
111 return Optional.empty();
112 });
113
114 return existingValue.orElseGet(() -> {
115
116 final V candidateValue = requireNonNull(supplier.get());
117
118
119 return cacheContext.getGlobalLock().withLock(() -> {
120
121 final Optional<Optional<V>> doubleCheck = cacheContext.getValueRecorded(internalKey);
122 if (doubleCheck.isPresent() && doubleCheck.get().isPresent()) {
123
124 return doubleCheck.get().get();
125 }
126
127
128 try {
129 final V finalValue = handleCreation(internalKey, candidateValue);
130 cacheContext.recordValue(internalKey, Optional.of(finalValue));
131 return finalValue;
132 } catch (ExecutionException | InterruptedException e) {
133 throw new VCacheException("Update failure", e);
134 }
135 });
136 });
137 });
138 }
139
140 @Override
141 public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
142 return perform(() -> {
143 if (isEmpty(internalKeys)) {
144 return new HashMap<>();
145 }
146
147 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
148 return cacheContext.getGlobalLock().withLock(() -> {
149
150 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
151
152
153 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
154 .filter(k -> !grandResult.containsKey(k))
155 .map(cacheContext::externalEntryKeyFor)
156 .collect(Collectors.toSet());
157
158 if (missingExternalKeys.isEmpty()) {
159 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
160 return grandResult;
161 }
162 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
163
164
165 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
166 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
167
168 return candidateValues.entrySet().stream().collect(
169 () -> grandResult,
170 (m, e) -> {
171 final Optional<V> result = e.getValue();
172 cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
173 m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
174 },
175 Map::putAll
176 );
177 });
178 });
179 }
180
181 @Override
182 public final CompletionStage<Map<String, V>> getBulk(
183 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
184 return perform(() -> {
185 if (isEmpty(internalKeys)) {
186 return new HashMap<>();
187 }
188
189 final Map<String, V> grandResult = new HashMap<>();
190 final Set<String> missingInternalKeys = new HashSet<>();
191 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
192
193
194 cacheContext.getGlobalLock().withLock(() -> {
195 final Map<String, Optional<V>> knownState = unsafeJoin(getBulk(internalKeys));
196 knownState.entrySet().forEach(entry -> {
197 if (entry.getValue().isPresent()) {
198
199 grandResult.put(entry.getKey(), entry.getValue().get());
200 } else {
201 missingInternalKeys.add(entry.getKey());
202 cacheContext.forgetValue(entry.getKey());
203 }
204 });
205 });
206
207
208 if (missingInternalKeys.isEmpty()) {
209 return grandResult;
210 }
211
212
213 final Map<String, V> candidateValues = factory.apply(missingInternalKeys);
214 FactoryUtils.verifyFactoryResult(candidateValues, missingInternalKeys);
215
216
217 cacheContext.getGlobalLock().withLock(() ->
218 candidateValues.entrySet().forEach(entry -> {
219 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
220 final boolean added = unsafeJoin(put(entry.getKey(), entry.getValue(), PutPolicy.ADD_ONLY));
221 final V finalValue;
222 if (added) {
223 finalValue = entry.getValue();
224 } else {
225 log.trace("Was unable to store the candidate value, so needing to retrieve what's there now");
226 finalValue = unsafeJoin(get(entry.getKey(), entry::getValue));
227 }
228
229 grandResult.put(entry.getKey(), finalValue);
230 cacheContext.recordValue(entry.getKey(), Optional.of(finalValue));
231 }));
232
233 return grandResult;
234 });
235 }
236
237 @Override
238 public final CompletionStage<Boolean> put(final String internalKey, final V value, final PutPolicy policy) {
239 return perform(() -> {
240 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
241
242 final boolean successful = cacheContext.getGlobalLock().withLock(() -> internalPut(internalKey, value, policy));
243 if (successful) {
244 cacheContext.recordValue(internalKey, Optional.of(value));
245 } else {
246 cacheContext.forgetValue(internalKey);
247 }
248 return successful;
249 });
250 }
251
252 @Override
253 public final CompletionStage<Void> remove(final Iterable<String> keys) {
254 return perform(() -> {
255 ensureCacheContext().getGlobalLock().withLock(() -> internalRemove(keys));
256 return null;
257 });
258 }
259
260 @Override
261 public final CompletionStage<Void> removeAll() {
262 return perform(() -> {
263 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
264 cacheContext.getGlobalLock().withLock(() -> {
265 internalRemoveAll();
266 cacheContext.forgetAllValues();
267 });
268 return null;
269 });
270 }
271
272 private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
273 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
274
275 return StreamSupport.stream(internalKeys.spliterator(), false)
276 .filter(k -> cacheContext.getValueRecorded(k).isPresent())
277 .distinct()
278 .collect(Collectors.toMap(
279 k -> k,
280 k -> cacheContext.getValueRecorded(k).get())
281 );
282 }
283 }