View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.DirectExternalCache;
6   import com.atlassian.vcache.ExternalCacheException;
7   import com.atlassian.vcache.ExternalCacheSettings;
8   import com.atlassian.vcache.IdentifiedValue;
9   import com.atlassian.vcache.PutPolicy;
10  import com.atlassian.vcache.internal.RequestContext;
11  import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
12  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
13  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
14  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
15  import com.atlassian.vcache.internal.core.service.FactoryUtils;
16  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
17  import com.google.common.annotations.VisibleForTesting;
18  import net.spy.memcached.CASResponse;
19  import net.spy.memcached.CASValue;
20  import net.spy.memcached.MemcachedClientIF;
21  import net.spy.memcached.OperationTimeoutException;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  import java.time.Duration;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Optional;
32  import java.util.Set;
33  import java.util.concurrent.CompletionStage;
34  import java.util.concurrent.Future;
35  import java.util.function.Function;
36  import java.util.function.Supplier;
37  import java.util.stream.Collectors;
38  import java.util.stream.StreamSupport;
39  
40  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
41  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
42  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
43  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
44  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
45  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
46  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
47  import static java.util.Objects.requireNonNull;
48  
49  /**
50   * Implementation of the {@link DirectExternalCache} that uses Memcached.
51   *
52   * @param <V> the value type
53   * @since 1.0
54   */
55  class MemcachedDirectExternalCache<V>
56          extends AbstractExternalCache<V>
57          implements DirectExternalCache<V> {
58      private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
59  
60      private final Supplier<MemcachedClientIF> clientSupplier;
61      private final Supplier<RequestContext> contextSupplier;
62      private final ExternalCacheKeyGenerator keyGenerator;
63      private final MarshallingPair<V> valueMarshalling;
64      private final int ttlSeconds;
65  
66      MemcachedDirectExternalCache(
67              Supplier<MemcachedClientIF> clientSupplier,
68              Supplier<RequestContext> contextSupplier,
69              ExternalCacheKeyGenerator keyGenerator,
70              String name,
71              MarshallingPair<V> valueMarshalling,
72              ExternalCacheSettings settings,
73              Duration lockTimeout) {
74          super(name, lockTimeout);
75          this.clientSupplier = requireNonNull(clientSupplier);
76          this.contextSupplier = requireNonNull(contextSupplier);
77          this.keyGenerator = requireNonNull(keyGenerator);
78          this.valueMarshalling = requireNonNull(valueMarshalling);
79          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
80      }
81  
82      @Override
83      public CompletionStage<Optional<V>> get(String internalKey) {
84          return perform(() -> {
85              final String externalKey = buildExternalKey(internalKey);
86              return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
87          });
88      }
89  
90      @Override
91      public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
92          return perform(() -> {
93              final String externalKey = buildExternalKey(internalKey);
94              final Optional<V> existingValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
95              if (existingValue.isPresent()) {
96                  return existingValue.get();
97              }
98  
99              log.trace("Cache {}, creating candidate for key {}", name, internalKey);
100             final V candidateValue = requireNonNull(supplier.get());
101             final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
102 
103             // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
104             for (; ; ) {
105                 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
106                 if (addOp.get()) {
107                     // I break here, rather than just return, due to battling with the compiler. Unless written
108                     // this way, the compiler will not allow the lambda structure.
109                     break;
110                 }
111 
112                 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
113                 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
114                 if (otherAddedValue.isPresent()) {
115                     return otherAddedValue.get();
116                 }
117 
118                 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
119             }
120             return candidateValue;
121         });
122     }
123 
124     @Override
125     public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
126         return perform(() -> {
127             final String externalKey = buildExternalKey(internalKey);
128             final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
129             if (casValue == null) {
130                 return Optional.empty();
131             }
132 
133             final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
134             final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
135                     identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) casValue.getValue()));
136             return Optional.of(iv);
137         });
138     }
139 
140     @Override
141     public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
142         return perform(() -> {
143             final String externalKey = buildExternalKey(internalKey);
144             final CASValue<Object> existingCasValue = clientSupplier.get().gets(externalKey);
145 
146             if (existingCasValue != null) {
147                 final CasIdentifier identifier = new MemcachedCasIdentifier(existingCasValue.getCas());
148                 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
149                         identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) existingCasValue.getValue()));
150                 return iv;
151             }
152 
153             log.trace("Cache {}, creating candidate for key {}", name, internalKey);
154             final V candidateValue = requireNonNull(supplier.get());
155             final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
156 
157             // Loop until either able to add the candidate value, or retrieve one that has been added by another thread.
158             for (; ; ) {
159                 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
160                 if (!addOp.get()) {
161                     log.trace("Cache {}, unable to add candidate for key {}", name, internalKey);
162                 }
163 
164                 // Regardless of whether able to add an entry or not, need to retrieve to get the CAS value.
165                 log.trace("Cache {}, retrieving the candidate for key {}", name, internalKey);
166                 final CASValue<Object> otherAddedCasValue = clientSupplier.get().gets(externalKey);
167                 if (otherAddedCasValue != null) {
168                     final CasIdentifier identifier = new MemcachedCasIdentifier(otherAddedCasValue.getCas());
169                     final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
170                             identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) otherAddedCasValue.getValue()));
171                     return iv;
172                 }
173 
174                 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
175             }
176         });
177     }
178 
179     @Override
180     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
181         return perform(() -> {
182             if (isEmpty(internalKeys)) {
183                 return new HashMap<>();
184             }
185 
186             // De-duplicate the keys and calculate the externalKeys
187             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
188 
189             final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
190                     .map(cacheContext::externalEntryKeyFor)
191                     .collect(Collectors.toSet());
192 
193             // Returns map of keys that contain values, so need to handle the missing ones
194             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
195 
196             return externalKeys.stream().collect(Collectors.toMap(
197                     cacheContext::internalEntryKeyFor,
198                     k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
199         });
200     }
201 
202     @Override
203     public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
204         return perform(() -> {
205             if (isEmpty(internalKeys)) {
206                 return new HashMap<>();
207             }
208 
209             // De-duplicate the keys and calculate the externalKeys
210             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
211 
212             final Set<String> externalKeys = Collections.unmodifiableSet(
213                     StreamSupport.stream(internalKeys.spliterator(), false)
214                             .map(cacheContext::externalEntryKeyFor)
215                             .collect(Collectors.toSet()));
216 
217             // Returns map of keys that contain values, so need to calculate the
218             // missing ones
219             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
220             log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
221             final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
222             missingExternalKeys.removeAll(haveValues.keySet());
223 
224             // Add the existing values to the grand result
225             final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
226                     e -> cacheContext.internalEntryKeyFor(e.getKey()),
227                     e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()
228             ));
229 
230             if (!missingExternalKeys.isEmpty()) {
231                 // Okay, need to get the missing values and mapping from externalKeys to internalKeys
232                 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
233                         missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
234                 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
235                 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
236 
237                 // Okay, got the missing values, now need to add them to Memcached
238                 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
239                         Map.Entry::getKey,
240                         e -> clientSupplier.get().set(
241                                 cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
242                 ));
243 
244                 // Now wait for the outcomes and then add to the grand result
245                 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
246                     e.getValue().get(); // Don't care about the result as it will always be true
247                 }
248 
249                 grandResult.putAll(missingValues);
250             }
251 
252             return grandResult;
253         });
254     }
255 
256     @Override
257     public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
258         return perform(() -> {
259             if (isEmpty(internalKeys)) {
260                 return new HashMap<>();
261             }
262 
263             // There is not equivalent call in Spy Memcached client. So need to do the calls async.
264             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
265 
266             // De-duplicate the keys, create map on internalKey to the future
267             final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
268                     StreamSupport.stream(internalKeys.spliterator(), false)
269                             .distinct()
270                             .collect(Collectors.toMap(
271                                     k -> k,
272                                     k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
273                             ));
274 
275             return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
276                     Map.Entry::getKey,
277                     e -> identifiedValueFrom(e.getValue(), valueMarshalling)
278             ));
279         });
280     }
281 
282     @Override
283     public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
284         return perform(() -> {
285             final String externalKey = buildExternalKey(internalKey);
286             final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(value));
287 
288             final Future<Boolean> putOp =
289                     putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
290 
291             return putOp.get();
292         });
293     }
294 
295     @Override
296     public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
297         return perform(() -> {
298             final String externalKey = buildExternalKey(internalKey);
299             final Future<Boolean> delOp = clientSupplier.get().delete(externalKey, safeExtractId(casId));
300             return delOp.get();
301         });
302     }
303 
304     @Override
305     public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
306         return perform(() -> {
307             final String externalKey = buildExternalKey(internalKey);
308             final CASResponse casOp = clientSupplier.get().cas(
309                     externalKey,
310                     safeExtractId(casId),
311                     expiryTime(ttlSeconds),
312                     valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(newValue)));
313             return casOp == CASResponse.OK;
314         });
315     }
316 
317     @Override
318     public CompletionStage<Void> remove(Iterable<String> internalKeys) {
319         // There is no bulk delete in the api, so need to remove each one async
320         return perform(() -> {
321             if (isEmpty(internalKeys)) {
322                 return null;
323             }
324 
325             // Lodge all the requests for delete
326             final List<Future<Boolean>> deleteOps =
327                     StreamSupport.stream(internalKeys.spliterator(), false)
328                             .map(this::buildExternalKey)
329                             .map(k -> clientSupplier.get().delete(k))
330                             .collect(Collectors.toList());
331 
332             // Now wait for the outcome
333             for (Future<Boolean> delOp : deleteOps) {
334                 delOp.get(); // don't care if succeeded or not
335             }
336 
337             return null;
338         });
339     }
340 
341     @Override
342     public CompletionStage<Void> removeAll() {
343         return perform(() -> {
344             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
345             cacheContext.updateCacheVersion(
346                     MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
347             return null;
348         });
349     }
350 
351     @VisibleForTesting
352     void refreshCacheVersion() {
353         // Refresh the cacheVersion. Useful if want to get the current state of the external cache in testing.
354         final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
355         cacheContext.updateCacheVersion(
356                 MemcachedUtils.obtainCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
357     }
358 
359     private String buildExternalKey(String internalKey) throws OperationTimeoutException {
360         final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
361         return cacheContext.externalEntryKeyFor(internalKey);
362     }
363 
364     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
365         final RequestContext requestContext = contextSupplier.get();
366 
367         return requestContext.computeIfAbsent(this, () -> {
368             // Need to build a new context, which involves getting the current cache version, or setting it if it does
369             // not exist.
370             log.trace("Cache {}: Setting up a new context", name);
371             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
372                     keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
373             newCacheContext.updateCacheVersion(
374                     MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
375             return newCacheContext;
376         });
377     }
378 
379     @Override
380     protected Logger getLogger() {
381         return log;
382     }
383 
384     @Override
385     protected ExternalCacheException mapException(Exception ex) {
386         return MemcachedUtils.mapException(ex);
387     }
388 }