1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheException;
7 import com.atlassian.vcache.ExternalCacheSettings;
8 import com.atlassian.vcache.IdentifiedValue;
9 import com.atlassian.vcache.PutPolicy;
10 import com.atlassian.vcache.internal.RequestContext;
11 import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
12 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
13 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
14 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
15 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
16 import com.google.common.annotations.VisibleForTesting;
17 import net.spy.memcached.CASResponse;
18 import net.spy.memcached.CASValue;
19 import net.spy.memcached.MemcachedClientIF;
20 import net.spy.memcached.OperationTimeoutException;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Optional;
30 import java.util.Set;
31 import java.util.concurrent.CompletionStage;
32 import java.util.concurrent.Future;
33 import java.util.function.Function;
34 import java.util.function.Supplier;
35 import java.util.stream.Collectors;
36 import java.util.stream.StreamSupport;
37
38 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
39 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
40 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
41 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
42 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
43 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
44 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
45 import static java.util.Objects.requireNonNull;
46
47
48
49
50
51
52
53 class MemcachedDirectExternalCache<V>
54 extends AbstractExternalCache<V>
55 implements DirectExternalCache<V> {
56 private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
57
58 private final Supplier<MemcachedClientIF> clientSupplier;
59 private final Supplier<RequestContext> contextSupplier;
60 private final ExternalCacheKeyGenerator keyGenerator;
61 private final MarshallingPair<V> valueMarshalling;
62 private final int ttlSeconds;
63
64 MemcachedDirectExternalCache(
65 Supplier<MemcachedClientIF> clientSupplier,
66 Supplier<RequestContext> contextSupplier,
67 ExternalCacheKeyGenerator keyGenerator,
68 String name,
69 MarshallingPair<V> valueMarshalling,
70 ExternalCacheSettings settings) {
71 super(name);
72 this.clientSupplier = requireNonNull(clientSupplier);
73 this.contextSupplier = requireNonNull(contextSupplier);
74 this.keyGenerator = requireNonNull(keyGenerator);
75 this.valueMarshalling = requireNonNull(valueMarshalling);
76 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
77 }
78
79 @Override
80 public CompletionStage<Optional<V>> get(String internalKey) {
81 return perform(() -> {
82 final String externalKey = buildExternalKey(internalKey);
83 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
84 });
85 }
86
87 @Override
88 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
89 return perform(() -> {
90 final String externalKey = buildExternalKey(internalKey);
91 final Optional<V> existingValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
92 if (existingValue.isPresent()) {
93 return existingValue.get();
94 }
95
96 log.trace("Cache {}, creating candidate for key {}", name, internalKey);
97 final V candidateValue = requireNonNull(supplier.get());
98 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
99
100
101 for (; ; ) {
102 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
103 if (addOp.get()) {
104
105
106 break;
107 }
108
109 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
110 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
111 if (otherAddedValue.isPresent()) {
112 return otherAddedValue.get();
113 }
114
115 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
116 }
117 return candidateValue;
118 });
119 }
120
121 @Override
122 public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
123 return perform(() -> {
124 final String externalKey = buildExternalKey(internalKey);
125 final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
126 if (casValue == null) {
127 return Optional.empty();
128 }
129
130 final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
131 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
132 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) casValue.getValue()));
133 return Optional.of(iv);
134 });
135 }
136
137 @Override
138 public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
139 return perform(() -> {
140 final String externalKey = buildExternalKey(internalKey);
141 final CASValue<Object> existingCasValue = clientSupplier.get().gets(externalKey);
142
143 if (existingCasValue != null) {
144 final CasIdentifier identifier = new MemcachedCasIdentifier(existingCasValue.getCas());
145 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
146 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) existingCasValue.getValue()));
147 return iv;
148 }
149
150 log.trace("Cache {}, creating candidate for key {}", name, internalKey);
151 final V candidateValue = requireNonNull(supplier.get());
152 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
153
154
155 for (; ; ) {
156 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
157 if (!addOp.get()) {
158 log.trace("Cache {}, unable to add candidate for key {}", name, internalKey);
159 }
160
161
162 log.trace("Cache {}, retrieving the candidate for key {}", name, internalKey);
163 final CASValue<Object> otherAddedCasValue = clientSupplier.get().gets(externalKey);
164 if (otherAddedCasValue != null) {
165 final CasIdentifier identifier = new MemcachedCasIdentifier(otherAddedCasValue.getCas());
166 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
167 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) otherAddedCasValue.getValue()));
168 return iv;
169 }
170
171 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
172 }
173 });
174 }
175
176 @Override
177 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
178 return perform(() -> {
179 if (isEmpty(internalKeys)) {
180 return new HashMap<>();
181 }
182
183
184 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
185
186 final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
187 .map(cacheContext::externalEntryKeyFor)
188 .collect(Collectors.toSet());
189
190
191 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
192
193 return externalKeys.stream().collect(Collectors.toMap(
194 cacheContext::internalEntryKeyFor,
195 k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
196 });
197 }
198
199 @Override
200 public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
201 return perform(() -> {
202 if (isEmpty(internalKeys)) {
203 return new HashMap<>();
204 }
205
206
207 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
208
209 final Set<String> externalKeys = Collections.unmodifiableSet(
210 StreamSupport.stream(internalKeys.spliterator(), false)
211 .map(cacheContext::externalEntryKeyFor)
212 .collect(Collectors.toSet()));
213
214
215
216 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
217 log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
218 final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
219 missingExternalKeys.removeAll(haveValues.keySet());
220
221
222 final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
223 e -> cacheContext.internalEntryKeyFor(e.getKey()),
224 e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()
225 ));
226
227 if (!missingExternalKeys.isEmpty()) {
228
229 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
230 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
231 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
232
233
234 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
235 Map.Entry::getKey,
236 e -> clientSupplier.get().set(
237 cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
238 ));
239
240
241 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
242 e.getValue().get();
243 }
244
245 grandResult.putAll(missingValues);
246 }
247
248 return grandResult;
249 });
250 }
251
252 @Override
253 public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
254 return perform(() -> {
255 if (isEmpty(internalKeys)) {
256 return new HashMap<>();
257 }
258
259
260 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
261
262
263 final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
264 StreamSupport.stream(internalKeys.spliterator(), false)
265 .distinct()
266 .collect(Collectors.toMap(
267 k -> k,
268 k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
269 ));
270
271 return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
272 Map.Entry::getKey,
273 e -> identifiedValueFrom(e.getValue(), valueMarshalling)
274 ));
275 });
276 }
277
278 @Override
279 public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
280 return perform(() -> {
281 final String externalKey = buildExternalKey(internalKey);
282 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(value));
283
284 final Future<Boolean> putOp =
285 putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
286
287 return putOp.get();
288 });
289 }
290
291 @Override
292 public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
293 return perform(() -> {
294 final String externalKey = buildExternalKey(internalKey);
295 final Future<Boolean> delOp = clientSupplier.get().delete(externalKey, safeExtractId(casId));
296 return delOp.get();
297 });
298 }
299
300 @Override
301 public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
302 return perform(() -> {
303 final String externalKey = buildExternalKey(internalKey);
304 final CASResponse casOp = clientSupplier.get().cas(
305 externalKey,
306 safeExtractId(casId),
307 expiryTime(ttlSeconds),
308 valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(newValue)));
309 return casOp == CASResponse.OK;
310 });
311 }
312
313 @Override
314 public CompletionStage<Void> remove(Iterable<String> internalKeys) {
315
316 return perform(() -> {
317 if (isEmpty(internalKeys)) {
318 return null;
319 }
320
321
322 final List<Future<Boolean>> deleteOps =
323 StreamSupport.stream(internalKeys.spliterator(), false)
324 .map(this::buildExternalKey)
325 .map(k -> clientSupplier.get().delete(k))
326 .collect(Collectors.toList());
327
328
329 for (Future<Boolean> delOp : deleteOps) {
330 delOp.get();
331 }
332
333 return null;
334 });
335 }
336
337 @Override
338 public CompletionStage<Void> removeAll() {
339 return perform(() -> {
340 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
341 cacheContext.updateCacheVersion(
342 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
343 return null;
344 });
345 }
346
347 @VisibleForTesting
348 void refreshCacheVersion() {
349
350 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
351 cacheContext.updateCacheVersion(
352 MemcachedUtils.obtainCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
353 }
354
355 private String buildExternalKey(String internalKey) throws OperationTimeoutException {
356 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
357 return cacheContext.externalEntryKeyFor(internalKey);
358 }
359
360 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
361 final RequestContext requestContext = contextSupplier.get();
362
363 return requestContext.computeIfAbsent(this, () -> {
364
365
366 log.trace("Cache {}: Setting up a new context", name);
367 final VersionedExternalCacheRequestContext<V> newCacheContext =
368 new VersionedExternalCacheRequestContext<>(keyGenerator, name, requestContext::partitionIdentifier);
369 newCacheContext.updateCacheVersion(
370 MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
371 return newCacheContext;
372 });
373 }
374
375 @Override
376 protected Logger getLogger() {
377 return log;
378 }
379
380 @Override
381 protected ExternalCacheException mapException(Exception ex) {
382 return MemcachedUtils.mapException(ex);
383 }
384 }