View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import java.util.Collections;
4   import java.util.HashMap;
5   import java.util.HashSet;
6   import java.util.List;
7   import java.util.Map;
8   import java.util.Optional;
9   import java.util.Set;
10  import java.util.concurrent.CompletableFuture;
11  import java.util.concurrent.Future;
12  import java.util.function.Function;
13  import java.util.function.Supplier;
14  import java.util.stream.Collectors;
15  import java.util.stream.StreamSupport;
16  import javax.annotation.Nonnull;
17  
18  import com.atlassian.vcache.CasIdentifier;
19  import com.atlassian.vcache.DirectExternalCache;
20  import com.atlassian.vcache.ExternalCacheException;
21  import com.atlassian.vcache.ExternalCacheSettings;
22  import com.atlassian.vcache.IdentifiedValue;
23  import com.atlassian.vcache.Marshaller;
24  import com.atlassian.vcache.PutPolicy;
25  import com.atlassian.vcache.internal.RequestContext;
26  import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
27  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
28  import com.atlassian.vcache.internal.core.VCacheUtils;
29  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
30  
31  import com.google.common.annotations.VisibleForTesting;
32  import net.spy.memcached.CASResponse;
33  import net.spy.memcached.CASValue;
34  import net.spy.memcached.MemcachedClientIF;
35  import net.spy.memcached.OperationTimeoutException;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import static com.atlassian.vcache.internal.core.VCacheUtils.isEmpty;
40  import static com.atlassian.vcache.internal.core.VCacheUtils.marshall;
41  import static com.atlassian.vcache.internal.core.VCacheUtils.unmarshall;
42  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
43  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
44  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
45  import static java.util.Objects.requireNonNull;
46  
47  /**
48   * Implementation of the {@link DirectExternalCache} that uses Memcached.
49   *
50   * @param <V> the value type
51   *
52   * @since 1.0
53   */
54  class MemcachedDirectExternalCache<V>
55          extends AbstractExternalCache<V>
56          implements DirectExternalCache<V>
57  {
58      private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
59  
60      private final Supplier<MemcachedClientIF> clientSupplier;
61      private final Supplier<RequestContext> contextSupplier;
62      private final ExternalCacheKeyGenerator keyGenerator;
63      private final Marshaller<V> valueMarshaller;
64      private final ExternalCacheSettings settings;
65      private final int defaultTtl;
66  
67      MemcachedDirectExternalCache(
68              Supplier<MemcachedClientIF> clientSupplier,
69              Supplier<RequestContext> contextSupplier,
70              ExternalCacheKeyGenerator keyGenerator,
71              String name,
72              Marshaller<V> valueMarshaller,
73              ExternalCacheSettings settings)
74      {
75          super(name);
76          this.clientSupplier = requireNonNull(clientSupplier);
77          this.contextSupplier = requireNonNull(contextSupplier);
78          this.keyGenerator = requireNonNull(keyGenerator);
79          this.valueMarshaller = requireNonNull(valueMarshaller);
80          this.settings = requireNonNull(settings);
81          this.defaultTtl = VCacheUtils.roundUpToSeconds(settings.getDefaultTtl().get());
82      }
83  
84      @Nonnull
85      @Override
86      public CompletableFuture<Optional<V>> get(String internalKey)
87      {
88          return perform(() -> {
89              final String externalKey = buildExternalKey(internalKey);
90              return unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
91          });
92      }
93  
94      @Nonnull
95      @Override
96      public CompletableFuture<V> get(String internalKey, Supplier<V> supplier)
97      {
98          return perform(() -> {
99              final String externalKey = buildExternalKey(internalKey);
100             final Optional<V> existingValue = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
101             if (existingValue.isPresent())
102             {
103                 return existingValue.get();
104             }
105 
106             log.trace("Cache {}, creating candidate for key {}", name, internalKey);
107             final V candidateValue = requireNonNull(supplier.get());
108             final byte[] candidateValueBytes = valueMarshaller.marshall(candidateValue);
109 
110             // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
111             for (; ; )
112             {
113                 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, defaultTtl, candidateValueBytes);
114                 if (addOp.get())
115                 {
116                     // I break here, rather than just return, due to battling with the compiler. Unless written
117                     // this way, the compiler will not allow the lambda structure.
118                     break;
119                 }
120 
121                 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
122                 final Optional<V> otherAddedValue = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
123                 if (otherAddedValue.isPresent())
124                 {
125                     return otherAddedValue.get();
126                 }
127 
128                 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
129             }
130             return candidateValue;
131         });
132     }
133 
134     @Nonnull
135     @Override
136     public CompletableFuture<Optional<IdentifiedValue<V>>> getIdentified(String internalKey)
137     {
138         return perform(() -> {
139             final String externalKey = buildExternalKey(internalKey);
140             final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
141             if (casValue == null)
142             {
143                 return Optional.empty();
144             }
145 
146             final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
147             final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
148                     identifier, valueMarshaller.unmarshall((byte[]) casValue.getValue()));
149             return Optional.of(iv);
150         });
151     }
152 
153     @Nonnull
154     @Override
155     public CompletableFuture<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys)
156     {
157         return perform(() -> {
158             if (isEmpty(internalKeys))
159             {
160                 return new HashMap<>();
161             }
162 
163             // De-duplicate the keys and calculate the externalKeys
164             final ExternalCacheRequestContext cacheContext = ensureCacheContext();
165 
166             final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
167                     .map(cacheContext::externalEntryKeyFor)
168                     .collect(Collectors.toSet());
169 
170             // Returns map of keys that contain values, so need to handle the missing ones
171             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
172 
173             return externalKeys.stream().collect(Collectors.toMap(
174                     cacheContext::internalEntryKeyFor,
175                     k -> unmarshall(haveValues.get(k), valueMarshaller)));
176         });
177     }
178 
179     @Nonnull
180     @Override
181     public CompletableFuture<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys)
182     {
183         return perform(() -> {
184             if (isEmpty(internalKeys))
185             {
186                 return new HashMap<>();
187             }
188 
189             // De-duplicate the keys and calculate the externalKeys
190             final ExternalCacheRequestContext cacheContext = ensureCacheContext();
191 
192             final Set<String> externalKeys = Collections.unmodifiableSet(
193                     StreamSupport.stream(internalKeys.spliterator(), false)
194                             .map(cacheContext::externalEntryKeyFor)
195                             .collect(Collectors.toSet()));
196 
197             // Returns map of keys that contain values, so need to calculate the
198             // missing ones
199             final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
200             log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
201             final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
202             missingExternalKeys.removeAll(haveValues.keySet());
203 
204             // Add the existing values to the grand result
205             final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
206                     e -> cacheContext.internalEntryKeyFor(e.getKey()),
207                     e -> unmarshall(e.getValue(), valueMarshaller).get()
208             ));
209 
210             if (!missingExternalKeys.isEmpty())
211             {
212                 // Okay, need to get the missing values and mapping from externalKeys to internalKeys
213                 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
214                         missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
215                 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
216 
217                 // Okay, got the missing values, now need to add them to Memcached
218                 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
219                         Map.Entry::getKey,
220                         e -> clientSupplier.get().set(
221                                 cacheContext.externalEntryKeyFor(e.getKey()), defaultTtl, marshall(e.getValue(), valueMarshaller))
222                 ));
223 
224                 // Now wait for the outcomes and then add to the grand result
225                 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet())
226                 {
227                     e.getValue().get(); // Don't care about the result as it will always be true
228                 }
229 
230                 grandResult.putAll(missingValues);
231             }
232 
233             return grandResult;
234         });
235     }
236 
237     @Nonnull
238     @Override
239     public CompletableFuture<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys)
240     {
241         return perform(() -> {
242             if (isEmpty(internalKeys))
243             {
244                 return new HashMap<>();
245             }
246 
247             // There is not equivalent call in Spy Memcached client. So need to do the calls async.
248             final ExternalCacheRequestContext cacheContext = ensureCacheContext();
249 
250             // De-duplicate the keys, create map on internalKey to the future
251             final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
252                     StreamSupport.stream(internalKeys.spliterator(), false)
253                             .distinct()
254                             .collect(Collectors.toMap(
255                                     k -> k,
256                                     k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
257                             ));
258 
259             return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
260                     Map.Entry::getKey,
261                     e -> identifiedValueFrom(e.getValue(), valueMarshaller)
262             ));
263         });
264     }
265 
266     @Nonnull
267     @Override
268     public CompletableFuture<Boolean> put(String internalKey, V value, PutPolicy policy)
269     {
270         return perform(() -> {
271             final String externalKey = buildExternalKey(internalKey);
272             final byte[] valueBytes = valueMarshaller.marshall(value);
273 
274             final Future<Boolean> putOp =
275                     putOperationForPolicy(policy, clientSupplier.get(), externalKey, defaultTtl, valueBytes);
276 
277             return putOp.get();
278         });
279     }
280 
281     @Nonnull
282     @Override
283     public CompletableFuture<Boolean> removeIf(String internalKey, CasIdentifier casId)
284     {
285         return perform(() -> {
286             final String partitionedKey = buildExternalKey(internalKey);
287             final Future<Boolean> delOp = clientSupplier.get().delete(partitionedKey, safeExtractId(casId));
288             return delOp.get();
289         });
290     }
291 
292     @Nonnull
293     @Override
294     public CompletableFuture<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue)
295     {
296         return perform(() -> {
297             final String externalKey = buildExternalKey(internalKey);
298             final CASResponse casOp = clientSupplier.get().cas(
299                     externalKey, safeExtractId(casId), defaultTtl, valueMarshaller.marshall(newValue));
300             return casOp == CASResponse.OK;
301         });
302     }
303 
304     @Nonnull
305     @Override
306     public CompletableFuture<Void> remove(Iterable<String> internalKeys)
307     {
308         // There is no bulk delete in the api, so need to remove each one async
309         return perform(() -> {
310             if (isEmpty(internalKeys))
311             {
312                 return null;
313             }
314 
315             // Lodge all the requests for delete
316             final List<Future<Boolean>> deleteOps =
317                     StreamSupport.stream(internalKeys.spliterator(), false)
318                             .map(this::buildExternalKey)
319                             .map(k -> clientSupplier.get().delete(k))
320                             .collect(Collectors.toList());
321 
322             // Now wait for the outcome
323             for (Future<Boolean> delOp : deleteOps)
324             {
325                 delOp.get(); // don't care if succeeded or not
326             }
327 
328             return null;
329         });
330     }
331 
332     @Nonnull
333     @Override
334     public CompletableFuture<Void> removeAll()
335     {
336         return perform(() -> {
337             final ExternalCacheRequestContext cacheContext = ensureCacheContext();
338             cacheContext.updateCacheVersion(
339                     clientSupplier.get().incr(cacheContext.externalCacheVersionKey(), 1, 1));
340             return null;
341         });
342     }
343 
344     @VisibleForTesting
345     void refreshCacheVersion()
346     {
347         // Refresh the cacheVersion. Useful if want to get the current state of the external cache in testing.
348         final ExternalCacheRequestContext cacheContext = ensureCacheContext();
349         cacheContext.updateCacheVersion(
350                 clientSupplier.get().incr(cacheContext.externalCacheVersionKey(), 0, 1));
351     }
352 
353     private String buildExternalKey(String internalKey) throws OperationTimeoutException
354     {
355         final ExternalCacheRequestContext cacheContext = ensureCacheContext();
356         return cacheContext.externalEntryKeyFor(internalKey);
357     }
358 
359     @Nonnull
360     protected ExternalCacheRequestContext ensureCacheContext() throws OperationTimeoutException
361     {
362         final RequestContext requestContext = contextSupplier.get();
363 
364         return requestContext.computeIfAbsent(this, () -> {
365             // Need to build a new context, which involves getting the current cache version, or setting it if it does
366             // not exist.
367             log.trace("Cache {}: Setting up a new context", name);
368             final ExternalCacheRequestContext newCacheContext =
369                     new ExternalCacheRequestContext(keyGenerator, name, requestContext::partitionIdentifier);
370             newCacheContext.updateCacheVersion(
371                     clientSupplier.get().incr(newCacheContext.externalCacheVersionKey(), 0, 1));
372             return newCacheContext;
373         });
374     }
375 
376     @Nonnull
377     @Override
378     protected Logger getLogger()
379     {
380         return log;
381     }
382 
383     @Nonnull
384     @Override
385     protected ExternalCacheException mapException(Exception ex)
386     {
387         return MemcachedUtils.mapException(ex);
388     }
389 }