View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.DirectExternalCache;
6   import com.atlassian.vcache.ExternalCacheException;
7   import com.atlassian.vcache.ExternalCacheSettings;
8   import com.atlassian.vcache.IdentifiedValue;
9   import com.atlassian.vcache.PutPolicy;
10  import com.atlassian.vcache.internal.RequestContext;
11  import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
12  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
13  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
14  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
15  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
16  import com.google.common.annotations.VisibleForTesting;
17  import net.spy.memcached.CASResponse;
18  import net.spy.memcached.CASValue;
19  import net.spy.memcached.MemcachedClientIF;
20  import net.spy.memcached.OperationTimeoutException;
21  import org.slf4j.Logger;
22  import org.slf4j.LoggerFactory;
23  
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Optional;
30  import java.util.Set;
31  import java.util.concurrent.CompletionStage;
32  import java.util.concurrent.Future;
33  import java.util.function.Function;
34  import java.util.function.Supplier;
35  import java.util.stream.Collectors;
36  import java.util.stream.StreamSupport;
37  
38  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
39  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
40  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
41  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
42  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
43  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
44  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
45  import static java.util.Objects.requireNonNull;
46  
47  /**
48   * Implementation of the {@link DirectExternalCache} that uses Memcached.
49   *
50   * @param <V> the value type
51   * @since 1.0
52   */
53  class MemcachedDirectExternalCache<V>
54          extends AbstractExternalCache<V>
55          implements DirectExternalCache<V> {
56      private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
57  
58      private final Supplier<MemcachedClientIF> clientSupplier;
59      private final Supplier<RequestContext> contextSupplier;
60      private final ExternalCacheKeyGenerator keyGenerator;
61      private final MarshallingPair<V> valueMarshalling;
62      private final int ttlSeconds;
63  
64      MemcachedDirectExternalCache(
65              Supplier<MemcachedClientIF> clientSupplier,
66              Supplier<RequestContext> contextSupplier,
67              ExternalCacheKeyGenerator keyGenerator,
68              String name,
69              MarshallingPair<V> valueMarshalling,
70              ExternalCacheSettings settings) {
71          super(name);
72          this.clientSupplier = requireNonNull(clientSupplier);
73          this.contextSupplier = requireNonNull(contextSupplier);
74          this.keyGenerator = requireNonNull(keyGenerator);
75          this.valueMarshalling = requireNonNull(valueMarshalling);
76          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
77      }
78  
79      @Override
80      public CompletionStage<Optional<V>> get(String internalKey) {
81          return perform(() -> {
82              final String externalKey = buildExternalKey(internalKey);
83              return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
84          });
85      }
86  
87      @Override
88      public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
89          return perform(() -> {
90              final String externalKey = buildExternalKey(internalKey);
91              final Optional<V> existingValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
92              if (existingValue.isPresent()) {
93                  return existingValue.get();
94              }
95  
96              log.trace("Cache {}, creating candidate for key {}", name, internalKey);
97              final V candidateValue = requireNonNull(supplier.get());
98              final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
99  
100             // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
101             for (; ; ) {
102                 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
103                 if (addOp.get()) {
104                     // I break here, rather than just return, due to battling with the compiler. Unless written
105                     // this way, the compiler will not allow the lambda structure.
106                     break;
107                 }
108 
109                 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
110                 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
111                 if (otherAddedValue.isPresent()) {
112                     return otherAddedValue.get();
113                 }
114 
115                 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
116             }
117             return candidateValue;
118         });
119     }
120 
121     @Override
122     public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
123         return perform(() -> {
124             final String externalKey = buildExternalKey(internalKey);
125             final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
126             if (casValue == null) {
127                 return Optional.empty();
128             }
129 
130             final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
131             final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
132                     identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) casValue.getValue()));
133             return Optional.of(iv);
134         });
135     }
136 
137     @Override
138     public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
139         return perform(() -> {
140             final String externalKey = buildExternalKey(internalKey);
141             final CASValue<Object> existingCasValue = clientSupplier.get().gets(externalKey);
142 
143             if (existingCasValue != null) {
144                 final CasIdentifier identifier = new MemcachedCasIdentifier(existingCasValue.getCas());
145                 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
146                         identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) existingCasValue.getValue()));
147                 return iv;
148             }
149 
150             log.trace("Cache {}, creating candidate for key {}", name, internalKey);
151             final V candidateValue = requireNonNull(supplier.get());
152             final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
153 
154             // Loop until either able to add the candidate value, or retrieve one that has been added by another thread.
155             for (; ; ) {
156                 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
157                 if (!addOp.get()) {
158                     log.trace("Cache {}, unable to add candidate for key {}", name, internalKey);
159                 }
160 
161                 // Regardless of whether able to add an entry or not, need to retrieve to get the CAS value.
162                 log.trace("Cache {}, retrieving the candidate for key {}", name, internalKey);
163                 final CASValue<Object> otherAddedCasValue = clientSupplier.get().gets(externalKey);
164                 if (otherAddedCasValue != null) {
165                     final CasIdentifier identifier = new MemcachedCasIdentifier(otherAddedCasValue.getCas());
166                     final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
167                             identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) otherAddedCasValue.getValue()));
168                     return iv;
169                 }
170 
171                 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
172             }
173         });
174     }
175 
176     @Override
177     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
178         return perform(() -> {
179             if (isEmpty(internalKeys)) {
180                 return new HashMap<>();
181             }
182 
183             // De-duplicate the keys and calculate the externalKeys
184             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
185 
186             final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
187                     .map(cacheContext::externalEntryKeyFor)
188                     .collect(Collectors.toSet());
189 
190             // Returns map of keys that contain values, so need to handle the missing ones
191             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
192 
193             return externalKeys.stream().collect(Collectors.toMap(
194                     cacheContext::internalEntryKeyFor,
195                     k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
196         });
197     }
198 
199     @Override
200     public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
201         return perform(() -> {
202             if (isEmpty(internalKeys)) {
203                 return new HashMap<>();
204             }
205 
206             // De-duplicate the keys and calculate the externalKeys
207             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
208 
209             final Set<String> externalKeys = Collections.unmodifiableSet(
210                     StreamSupport.stream(internalKeys.spliterator(), false)
211                             .map(cacheContext::externalEntryKeyFor)
212                             .collect(Collectors.toSet()));
213 
214             // Returns map of keys that contain values, so need to calculate the
215             // missing ones
216             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
217             log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
218             final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
219             missingExternalKeys.removeAll(haveValues.keySet());
220 
221             // Add the existing values to the grand result
222             final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
223                     e -> cacheContext.internalEntryKeyFor(e.getKey()),
224                     e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()
225             ));
226 
227             if (!missingExternalKeys.isEmpty()) {
228                 // Okay, need to get the missing values and mapping from externalKeys to internalKeys
229                 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
230                         missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
231                 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
232 
233                 // Okay, got the missing values, now need to add them to Memcached
234                 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
235                         Map.Entry::getKey,
236                         e -> clientSupplier.get().set(
237                                 cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
238                 ));
239 
240                 // Now wait for the outcomes and then add to the grand result
241                 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
242                     e.getValue().get(); // Don't care about the result as it will always be true
243                 }
244 
245                 grandResult.putAll(missingValues);
246             }
247 
248             return grandResult;
249         });
250     }
251 
252     @Override
253     public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
254         return perform(() -> {
255             if (isEmpty(internalKeys)) {
256                 return new HashMap<>();
257             }
258 
259             // There is not equivalent call in Spy Memcached client. So need to do the calls async.
260             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
261 
262             // De-duplicate the keys, create map on internalKey to the future
263             final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
264                     StreamSupport.stream(internalKeys.spliterator(), false)
265                             .distinct()
266                             .collect(Collectors.toMap(
267                                     k -> k,
268                                     k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
269                             ));
270 
271             return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
272                     Map.Entry::getKey,
273                     e -> identifiedValueFrom(e.getValue(), valueMarshalling)
274             ));
275         });
276     }
277 
278     @Override
279     public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
280         return perform(() -> {
281             final String externalKey = buildExternalKey(internalKey);
282             final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(value));
283 
284             final Future<Boolean> putOp =
285                     putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
286 
287             return putOp.get();
288         });
289     }
290 
291     @Override
292     public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
293         return perform(() -> {
294             final String externalKey = buildExternalKey(internalKey);
295             final Future<Boolean> delOp = clientSupplier.get().delete(externalKey, safeExtractId(casId));
296             return delOp.get();
297         });
298     }
299 
300     @Override
301     public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
302         return perform(() -> {
303             final String externalKey = buildExternalKey(internalKey);
304             final CASResponse casOp = clientSupplier.get().cas(
305                     externalKey,
306                     safeExtractId(casId),
307                     expiryTime(ttlSeconds),
308                     valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(newValue)));
309             return casOp == CASResponse.OK;
310         });
311     }
312 
313     @Override
314     public CompletionStage<Void> remove(Iterable<String> internalKeys) {
315         // There is no bulk delete in the api, so need to remove each one async
316         return perform(() -> {
317             if (isEmpty(internalKeys)) {
318                 return null;
319             }
320 
321             // Lodge all the requests for delete
322             final List<Future<Boolean>> deleteOps =
323                     StreamSupport.stream(internalKeys.spliterator(), false)
324                             .map(this::buildExternalKey)
325                             .map(k -> clientSupplier.get().delete(k))
326                             .collect(Collectors.toList());
327 
328             // Now wait for the outcome
329             for (Future<Boolean> delOp : deleteOps) {
330                 delOp.get(); // don't care if succeeded or not
331             }
332 
333             return null;
334         });
335     }
336 
337     @Override
338     public CompletionStage<Void> removeAll() {
339         return perform(() -> {
340             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
341             cacheContext.updateCacheVersion(
342                     MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
343             return null;
344         });
345     }
346 
347     @VisibleForTesting
348     void refreshCacheVersion() {
349         // Refresh the cacheVersion. Useful if want to get the current state of the external cache in testing.
350         final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
351         cacheContext.updateCacheVersion(
352                 MemcachedUtils.obtainCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
353     }
354 
355     private String buildExternalKey(String internalKey) throws OperationTimeoutException {
356         final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
357         return cacheContext.externalEntryKeyFor(internalKey);
358     }
359 
360     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
361         final RequestContext requestContext = contextSupplier.get();
362 
363         return requestContext.computeIfAbsent(this, () -> {
364             // Need to build a new context, which involves getting the current cache version, or setting it if it does
365             // not exist.
366             log.trace("Cache {}: Setting up a new context", name);
367             final VersionedExternalCacheRequestContext<V> newCacheContext =
368                     new VersionedExternalCacheRequestContext<>(keyGenerator, name, requestContext::partitionIdentifier);
369             newCacheContext.updateCacheVersion(
370                     MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
371             return newCacheContext;
372         });
373     }
374 
375     @Override
376     protected Logger getLogger() {
377         return log;
378     }
379 
380     @Override
381     protected ExternalCacheException mapException(Exception ex) {
382         return MemcachedUtils.mapException(ex);
383     }
384 }