1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheException;
7 import com.atlassian.vcache.ExternalCacheSettings;
8 import com.atlassian.vcache.IdentifiedValue;
9 import com.atlassian.vcache.PutPolicy;
10 import com.atlassian.vcache.internal.RequestContext;
11 import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
12 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
13 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
14 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
15 import com.atlassian.vcache.internal.core.service.FactoryUtils;
16 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
17 import com.google.common.annotations.VisibleForTesting;
18 import net.spy.memcached.CASResponse;
19 import net.spy.memcached.CASValue;
20 import net.spy.memcached.MemcachedClientIF;
21 import net.spy.memcached.OperationTimeoutException;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import java.time.Duration;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.Set;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.Future;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37 import java.util.stream.Collectors;
38 import java.util.stream.StreamSupport;
39
40 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
41 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
42 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
43 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
44 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
45 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
46 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
47 import static java.util.Objects.requireNonNull;
48
49
50
51
52
53
54
55 class MemcachedDirectExternalCache<V>
56 extends AbstractExternalCache<V>
57 implements DirectExternalCache<V> {
58 private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
59
60 private final Supplier<MemcachedClientIF> clientSupplier;
61 private final Supplier<RequestContext> contextSupplier;
62 private final ExternalCacheKeyGenerator keyGenerator;
63 private final MarshallingPair<V> valueMarshalling;
64 private final int ttlSeconds;
65
66 MemcachedDirectExternalCache(
67 Supplier<MemcachedClientIF> clientSupplier,
68 Supplier<RequestContext> contextSupplier,
69 ExternalCacheKeyGenerator keyGenerator,
70 String name,
71 MarshallingPair<V> valueMarshalling,
72 ExternalCacheSettings settings,
73 Duration lockTimeout) {
74 super(name, lockTimeout);
75 this.clientSupplier = requireNonNull(clientSupplier);
76 this.contextSupplier = requireNonNull(contextSupplier);
77 this.keyGenerator = requireNonNull(keyGenerator);
78 this.valueMarshalling = requireNonNull(valueMarshalling);
79 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
80 }
81
82 @Override
83 public CompletionStage<Optional<V>> get(String internalKey) {
84 return perform(() -> {
85 final String externalKey = buildExternalKey(internalKey);
86 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
87 });
88 }
89
90 @Override
91 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
92 return perform(() -> {
93 final String externalKey = buildExternalKey(internalKey);
94 final Optional<V> existingValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
95 if (existingValue.isPresent()) {
96 return existingValue.get();
97 }
98
99 log.trace("Cache {}, creating candidate for key {}", name, internalKey);
100 final V candidateValue = requireNonNull(supplier.get());
101 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
102
103
104 for (; ; ) {
105 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
106 if (addOp.get()) {
107
108
109 break;
110 }
111
112 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
113 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
114 if (otherAddedValue.isPresent()) {
115 return otherAddedValue.get();
116 }
117
118 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
119 }
120 return candidateValue;
121 });
122 }
123
124 @Override
125 public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
126 return perform(() -> {
127 final String externalKey = buildExternalKey(internalKey);
128 final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
129 if (casValue == null) {
130 return Optional.empty();
131 }
132
133 final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
134 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
135 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) casValue.getValue()));
136 return Optional.of(iv);
137 });
138 }
139
140 @Override
141 public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
142 return perform(() -> {
143 final String externalKey = buildExternalKey(internalKey);
144 final CASValue<Object> existingCasValue = clientSupplier.get().gets(externalKey);
145
146 if (existingCasValue != null) {
147 final CasIdentifier identifier = new MemcachedCasIdentifier(existingCasValue.getCas());
148 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
149 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) existingCasValue.getValue()));
150 return iv;
151 }
152
153 log.trace("Cache {}, creating candidate for key {}", name, internalKey);
154 final V candidateValue = requireNonNull(supplier.get());
155 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
156
157
158 for (; ; ) {
159 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
160 if (!addOp.get()) {
161 log.trace("Cache {}, unable to add candidate for key {}", name, internalKey);
162 }
163
164
165 log.trace("Cache {}, retrieving the candidate for key {}", name, internalKey);
166 final CASValue<Object> otherAddedCasValue = clientSupplier.get().gets(externalKey);
167 if (otherAddedCasValue != null) {
168 final CasIdentifier identifier = new MemcachedCasIdentifier(otherAddedCasValue.getCas());
169 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
170 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) otherAddedCasValue.getValue()));
171 return iv;
172 }
173
174 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
175 }
176 });
177 }
178
179 @Override
180 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
181 return perform(() -> {
182 if (isEmpty(internalKeys)) {
183 return new HashMap<>();
184 }
185
186
187 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
188
189 final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
190 .map(cacheContext::externalEntryKeyFor)
191 .collect(Collectors.toSet());
192
193
194 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
195
196 return externalKeys.stream().collect(Collectors.toMap(
197 cacheContext::internalEntryKeyFor,
198 k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
199 });
200 }
201
202 @Override
203 public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
204 return perform(() -> {
205 if (isEmpty(internalKeys)) {
206 return new HashMap<>();
207 }
208
209
210 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
211
212 final Set<String> externalKeys = Collections.unmodifiableSet(
213 StreamSupport.stream(internalKeys.spliterator(), false)
214 .map(cacheContext::externalEntryKeyFor)
215 .collect(Collectors.toSet()));
216
217
218
219 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
220 log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
221 final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
222 missingExternalKeys.removeAll(haveValues.keySet());
223
224
225 final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
226 e -> cacheContext.internalEntryKeyFor(e.getKey()),
227 e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()
228 ));
229
230 if (!missingExternalKeys.isEmpty()) {
231
232 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
233 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
234 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
235 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
236
237
238 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
239 Map.Entry::getKey,
240 e -> clientSupplier.get().set(
241 cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
242 ));
243
244
245 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
246 e.getValue().get();
247 }
248
249 grandResult.putAll(missingValues);
250 }
251
252 return grandResult;
253 });
254 }
255
256 @Override
257 public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
258 return perform(() -> {
259 if (isEmpty(internalKeys)) {
260 return new HashMap<>();
261 }
262
263
264 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
265
266
267 final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
268 StreamSupport.stream(internalKeys.spliterator(), false)
269 .distinct()
270 .collect(Collectors.toMap(
271 k -> k,
272 k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
273 ));
274
275 return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
276 Map.Entry::getKey,
277 e -> identifiedValueFrom(e.getValue(), valueMarshalling)
278 ));
279 });
280 }
281
282 @Override
283 public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
284 return perform(() -> {
285 final String externalKey = buildExternalKey(internalKey);
286 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(value));
287
288 final Future<Boolean> putOp =
289 putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
290
291 return putOp.get();
292 });
293 }
294
295 @Override
296 public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
297 return perform(() -> {
298 final String externalKey = buildExternalKey(internalKey);
299 final Future<Boolean> delOp = clientSupplier.get().delete(externalKey, safeExtractId(casId));
300 return delOp.get();
301 });
302 }
303
304 @Override
305 public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
306 return perform(() -> {
307 final String externalKey = buildExternalKey(internalKey);
308 final CASResponse casOp = clientSupplier.get().cas(
309 externalKey,
310 safeExtractId(casId),
311 expiryTime(ttlSeconds),
312 valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(newValue)));
313 return casOp == CASResponse.OK;
314 });
315 }
316
317 @Override
318 public CompletionStage<Void> remove(Iterable<String> internalKeys) {
319
320 return perform(() -> {
321 if (isEmpty(internalKeys)) {
322 return null;
323 }
324
325
326 final List<Future<Boolean>> deleteOps =
327 StreamSupport.stream(internalKeys.spliterator(), false)
328 .map(this::buildExternalKey)
329 .map(k -> clientSupplier.get().delete(k))
330 .collect(Collectors.toList());
331
332
333 for (Future<Boolean> delOp : deleteOps) {
334 delOp.get();
335 }
336
337 return null;
338 });
339 }
340
341 @Override
342 public CompletionStage<Void> removeAll() {
343 return perform(() -> {
344 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
345 cacheContext.updateCacheVersion(
346 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
347 return null;
348 });
349 }
350
351 @VisibleForTesting
352 void refreshCacheVersion() {
353
354 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
355 cacheContext.updateCacheVersion(
356 MemcachedUtils.obtainCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
357 }
358
359 private String buildExternalKey(String internalKey) throws OperationTimeoutException {
360 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
361 return cacheContext.externalEntryKeyFor(internalKey);
362 }
363
364 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
365 final RequestContext requestContext = contextSupplier.get();
366
367 return requestContext.computeIfAbsent(this, () -> {
368
369
370 log.trace("Cache {}: Setting up a new context", name);
371 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
372 keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
373 newCacheContext.updateCacheVersion(
374 MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
375 return newCacheContext;
376 });
377 }
378
379 @Override
380 protected Logger getLogger() {
381 return log;
382 }
383
384 @Override
385 protected ExternalCacheException mapException(Exception ex) {
386 return MemcachedUtils.mapException(ex);
387 }
388 }