View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.DirectExternalCache;
6   import com.atlassian.vcache.ExternalCacheException;
7   import com.atlassian.vcache.ExternalCacheSettings;
8   import com.atlassian.vcache.IdentifiedValue;
9   import com.atlassian.vcache.PutPolicy;
10  import com.atlassian.vcache.internal.RequestContext;
11  import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
12  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
13  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
14  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
15  import com.atlassian.vcache.internal.core.service.FactoryUtils;
16  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
17  import com.google.common.annotations.VisibleForTesting;
18  import net.spy.memcached.CASResponse;
19  import net.spy.memcached.CASValue;
20  import net.spy.memcached.MemcachedClientIF;
21  import net.spy.memcached.OperationTimeoutException;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Optional;
31  import java.util.Set;
32  import java.util.concurrent.CompletionStage;
33  import java.util.concurrent.Future;
34  import java.util.function.Function;
35  import java.util.function.Supplier;
36  import java.util.stream.Collectors;
37  import java.util.stream.StreamSupport;
38  
39  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
40  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
41  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
42  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
43  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
44  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
45  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
46  import static java.util.Objects.requireNonNull;
47  
48  /**
49   * Implementation of the {@link DirectExternalCache} that uses Memcached.
50   *
51   * @param <V> the value type
52   * @since 1.0
53   */
54  class MemcachedDirectExternalCache<V>
55          extends AbstractExternalCache<V>
56          implements DirectExternalCache<V> {
57      private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
58  
59      private final Supplier<MemcachedClientIF> clientSupplier;
60      private final Supplier<RequestContext> contextSupplier;
61      private final ExternalCacheKeyGenerator keyGenerator;
62      private final MarshallingPair<V> valueMarshalling;
63      private final int ttlSeconds;
64  
65      MemcachedDirectExternalCache(
66              MemcachedVCacheServiceSettings serviceSettings,
67              Supplier<RequestContext> contextSupplier,
68              ExternalCacheKeyGenerator keyGenerator,
69              String name,
70              MarshallingPair<V> valueMarshalling,
71              ExternalCacheSettings settings) {
72          super(name, serviceSettings.getLockTimeout(), serviceSettings.getExternalCacheExceptionListener());
73          this.clientSupplier = requireNonNull(serviceSettings.getClientSupplier());
74          this.contextSupplier = requireNonNull(contextSupplier);
75          this.keyGenerator = requireNonNull(keyGenerator);
76          this.valueMarshalling = requireNonNull(valueMarshalling);
77          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
78      }
79  
80      @Override
81      public CompletionStage<Optional<V>> get(String internalKey) {
82          return perform(() -> {
83              final String externalKey = buildExternalKey(internalKey);
84              return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
85          });
86      }
87  
88      @Override
89      public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
90          return perform(() -> {
91              final String externalKey = buildExternalKey(internalKey);
92              final Optional<V> existingValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
93              if (existingValue.isPresent()) {
94                  return existingValue.get();
95              }
96  
97              log.trace("Cache {}, creating candidate for key {}", name, internalKey);
98              final V candidateValue = requireNonNull(supplier.get());
99              final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
100 
101             // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
102             for (; ; ) {
103                 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
104                 if (addOp.get()) {
105                     // I break here, rather than just return, due to battling with the compiler. Unless written
106                     // this way, the compiler will not allow the lambda structure.
107                     break;
108                 }
109 
110                 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
111                 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
112                 if (otherAddedValue.isPresent()) {
113                     return otherAddedValue.get();
114                 }
115 
116                 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
117             }
118             return candidateValue;
119         });
120     }
121 
122     @Override
123     public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
124         return perform(() -> {
125             final String externalKey = buildExternalKey(internalKey);
126             final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
127             if (casValue == null) {
128                 return Optional.empty();
129             }
130 
131             final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
132             final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
133                     identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) casValue.getValue()));
134             return Optional.of(iv);
135         });
136     }
137 
138     @Override
139     public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
140         return perform(() -> {
141             final String externalKey = buildExternalKey(internalKey);
142             final CASValue<Object> existingCasValue = clientSupplier.get().gets(externalKey);
143 
144             if (existingCasValue != null) {
145                 final CasIdentifier identifier = new MemcachedCasIdentifier(existingCasValue.getCas());
146                 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
147                         identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) existingCasValue.getValue()));
148                 return iv;
149             }
150 
151             log.trace("Cache {}, creating candidate for key {}", name, internalKey);
152             final V candidateValue = requireNonNull(supplier.get());
153             final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
154 
155             // Loop until either able to add the candidate value, or retrieve one that has been added by another thread.
156             for (; ; ) {
157                 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
158                 if (!addOp.get()) {
159                     log.trace("Cache {}, unable to add candidate for key {}", name, internalKey);
160                 }
161 
162                 // Regardless of whether able to add an entry or not, need to retrieve to get the CAS value.
163                 log.trace("Cache {}, retrieving the candidate for key {}", name, internalKey);
164                 final CASValue<Object> otherAddedCasValue = clientSupplier.get().gets(externalKey);
165                 if (otherAddedCasValue != null) {
166                     final CasIdentifier identifier = new MemcachedCasIdentifier(otherAddedCasValue.getCas());
167                     final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
168                             identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) otherAddedCasValue.getValue()));
169                     return iv;
170                 }
171 
172                 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
173             }
174         });
175     }
176 
177     @Override
178     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
179         return perform(() -> {
180             if (isEmpty(internalKeys)) {
181                 return new HashMap<>();
182             }
183 
184             // De-duplicate the keys and calculate the externalKeys
185             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
186 
187             final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
188                     .map(cacheContext::externalEntryKeyFor)
189                     .collect(Collectors.toSet());
190 
191             // Returns map of keys that contain values, so need to handle the missing ones
192             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
193 
194             return externalKeys.stream().collect(Collectors.toMap(
195                     cacheContext::internalEntryKeyFor,
196                     k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
197         });
198     }
199 
200     @Override
201     public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
202         return perform(() -> {
203             if (isEmpty(internalKeys)) {
204                 return new HashMap<>();
205             }
206 
207             // De-duplicate the keys and calculate the externalKeys
208             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
209 
210             final Set<String> externalKeys = Collections.unmodifiableSet(
211                     StreamSupport.stream(internalKeys.spliterator(), false)
212                             .map(cacheContext::externalEntryKeyFor)
213                             .collect(Collectors.toSet()));
214 
215             // Returns map of keys that contain values, so need to calculate the
216             // missing ones
217             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
218             log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
219             final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
220             missingExternalKeys.removeAll(haveValues.keySet());
221 
222             // Add the existing values to the grand result
223             final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
224                     e -> cacheContext.internalEntryKeyFor(e.getKey()),
225                     e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()
226             ));
227 
228             if (!missingExternalKeys.isEmpty()) {
229                 // Okay, need to get the missing values and mapping from externalKeys to internalKeys
230                 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
231                         missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
232                 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
233                 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
234 
235                 // Okay, got the missing values, now need to add them to Memcached
236                 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
237                         Map.Entry::getKey,
238                         e -> clientSupplier.get().set(
239                                 cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
240                 ));
241 
242                 // Now wait for the outcomes and then add to the grand result
243                 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
244                     e.getValue().get(); // Don't care about the result as it will always be true
245                 }
246 
247                 grandResult.putAll(missingValues);
248             }
249 
250             return grandResult;
251         });
252     }
253 
254     @Override
255     public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
256         return perform(() -> {
257             if (isEmpty(internalKeys)) {
258                 return new HashMap<>();
259             }
260 
261             // There is not equivalent call in Spy Memcached client. So need to do the calls async.
262             final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
263 
264             // De-duplicate the keys, create map on internalKey to the future
265             final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
266                     StreamSupport.stream(internalKeys.spliterator(), false)
267                             .distinct()
268                             .collect(Collectors.toMap(
269                                     k -> k,
270                                     k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
271                             ));
272 
273             return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
274                     Map.Entry::getKey,
275                     e -> identifiedValueFrom(e.getValue(), valueMarshalling)
276             ));
277         });
278     }
279 
280     @Override
281     public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
282         return perform(() -> {
283             final String externalKey = buildExternalKey(internalKey);
284             final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(value));
285 
286             final Future<Boolean> putOp =
287                     putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
288 
289             return putOp.get();
290         });
291     }
292 
293     @Override
294     public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
295         return perform(() -> {
296             final String externalKey = buildExternalKey(internalKey);
297             final Future<Boolean> delOp = clientSupplier.get().delete(externalKey, safeExtractId(casId));
298             return delOp.get();
299         });
300     }
301 
302     @Override
303     public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
304         return perform(() -> {
305             final String externalKey = buildExternalKey(internalKey);
306             final CASResponse casOp = clientSupplier.get().cas(
307                     externalKey,
308                     safeExtractId(casId),
309                     expiryTime(ttlSeconds),
310                     valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(newValue)));
311             return casOp == CASResponse.OK;
312         });
313     }
314 
315     @Override
316     public CompletionStage<Void> remove(Iterable<String> internalKeys) {
317         // There is no bulk delete in the api, so need to remove each one async
318         return perform(() -> {
319             if (isEmpty(internalKeys)) {
320                 return null;
321             }
322 
323             // Lodge all the requests for delete
324             final List<Future<Boolean>> deleteOps =
325                     StreamSupport.stream(internalKeys.spliterator(), false)
326                             .map(this::buildExternalKey)
327                             .map(k -> clientSupplier.get().delete(k))
328                             .collect(Collectors.toList());
329 
330             // Now wait for the outcome
331             for (Future<Boolean> delOp : deleteOps) {
332                 delOp.get(); // don't care if succeeded or not
333             }
334 
335             return null;
336         });
337     }
338 
339     @Override
340     public CompletionStage<Void> removeAll() {
341         return perform(() -> {
342             ensureCacheContext().updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
343             return null;
344         });
345     }
346 
347     @VisibleForTesting
348     void refreshCacheVersion() {
349         // Refresh the cacheVersion. Useful if want to get the current state of the external cache in testing.
350         ensureCacheContext().updateCacheVersion(MemcachedUtils.cacheVersionSupplier(clientSupplier));
351     }
352 
353     private String buildExternalKey(String internalKey) throws OperationTimeoutException {
354         final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
355         return cacheContext.externalEntryKeyFor(internalKey);
356     }
357 
358     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
359         final RequestContext requestContext = contextSupplier.get();
360 
361         return requestContext.computeIfAbsent(this, () -> {
362             // Need to build a new context, which involves getting the current cache version, or setting it if it does
363             // not exist.
364             log.trace("Cache {}: Setting up a new context", name);
365             return new VersionedExternalCacheRequestContext<>(
366                     keyGenerator, name, requestContext::partitionIdentifier,
367                     MemcachedUtils.cacheVersionSupplier(clientSupplier),
368                     lockTimeout);
369         });
370     }
371 
372     @Override
373     protected Logger getLogger() {
374         return log;
375     }
376 
377     @Override
378     protected ExternalCacheException mapException(Exception ex) {
379         return MemcachedUtils.mapException(ex);
380     }
381 }