View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.PutPolicy;
7   import com.atlassian.vcache.internal.MetricLabel;
8   import com.atlassian.vcache.internal.RequestContext;
9   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.CacheType;
12  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13  import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15  import net.spy.memcached.MemcachedClientIF;
16  import net.spy.memcached.OperationTimeoutException;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  
20  import java.util.Collections;
21  import java.util.HashSet;
22  import java.util.Map;
23  import java.util.Optional;
24  import java.util.Set;
25  import java.util.concurrent.CompletableFuture;
26  import java.util.concurrent.CompletionStage;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.Future;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.function.Function;
31  import java.util.function.Supplier;
32  import java.util.stream.Collectors;
33  import java.util.stream.StreamSupport;
34  
35  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
36  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
37  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
38  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
39  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
40  import static java.util.Objects.requireNonNull;
41  
42  /**
43   * Implementation that backs onto Memcached.
44   *
45   * @param <V> the value type
46   * @since 1.0.0
47   */
48  class MemcachedStableReadExternalCache<V>
49          extends AbstractStableReadExternalCache<V> {
50      private static final Logger log = LoggerFactory.getLogger(MemcachedStableReadExternalCache.class);
51  
52      private final Supplier<MemcachedClientIF> clientSupplier;
53      private final Supplier<RequestContext> contextSupplier;
54      private final ExternalCacheKeyGenerator keyGenerator;
55      private final MarshallingPair<V> valueMarshalling;
56      private final int ttlSeconds;
57  
58      MemcachedStableReadExternalCache(
59              Supplier<MemcachedClientIF> clientSupplier,
60              Supplier<RequestContext> contextSupplier,
61              ExternalCacheKeyGenerator keyGenerator,
62              String name,
63              MarshallingPair<V> valueMarshalling,
64              ExternalCacheSettings settings,
65              MetricsRecorder metricsRecorder) {
66          super(name, metricsRecorder);
67          this.clientSupplier = requireNonNull(clientSupplier);
68          this.contextSupplier = requireNonNull(contextSupplier);
69          this.keyGenerator = requireNonNull(keyGenerator);
70          this.valueMarshalling = requireNonNull(valueMarshalling);
71          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
72      }
73  
74      @Override
75      public CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
76          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
77          return perform(() -> {
78                      // This stores the result of the remote operation. If it fails then we can roll back. We use Atomic because in the
79                      // future we may be async.
80                      final AtomicBoolean remotePutResult = new AtomicBoolean(true);
81                      cacheContext.computeValue(internalKey, (key, oldValue) -> {
82  
83                          // computeValue allows us to get the current CompletableFuture (if there is one) and chain the next
84                          // put() onto the old future. If the old future is still running then we preserve
85                          // ordering (and so consistency). Also any blocked gets will get the new value when the chain
86                          // completes. The map locks the update to the future so updates are nicely serialized.
87                          if (oldValue != null) {
88                              return oldValue.thenApply((oldV) -> {
89                                  remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
90                                  return Optional.of(value);
91                              });
92                          } else {
93                              remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
94                              return CompletableFuture.completedFuture(Optional.of(value));
95                          }
96                      });
97                      return remotePutResult.get();
98                  },
99                  (result) -> {
100                     if (!result) {
101                         cacheContext.forgetValue(internalKey);
102                     }
103                 });
104     }
105 
106     private boolean remotePut(String externalKey, V value, PutPolicy policy) {
107 
108         final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
109 
110         try {
111             final Future<Boolean> putOp = putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
112             return putOp.get();
113         } catch (InterruptedException | ExecutionException e) {
114             throw new ExternalCacheException(ExternalCacheException.Reason.UNCLASSIFIED_FAILURE, e);
115         }
116     }
117 
118     @Override
119     protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
120         // There is no bulk delete in the api, so need to remove each one async
121         return perform(() -> {
122             if (isEmpty(internalKeys)) {
123                 return null;
124             }
125 
126             // Lodge all the requests for delete
127             final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
128             final Map<String, Future<Boolean>> deleteOps =
129                     StreamSupport.stream(internalKeys.spliterator(), false)
130                             .distinct()
131                             .collect(Collectors.toMap(
132                                     k -> k,
133                                     k -> clientSupplier.get().delete(cacheContext.externalEntryKeyFor(k))
134                             ));
135 
136             // Need to loop verifying the outcome. If an exception was thrown for an operation,
137             // then do not record the removal.
138             Exception failureException = null;
139             for (Map.Entry<String, Future<Boolean>> delOp : deleteOps.entrySet()) {
140                 try {
141                     delOp.getValue().get();
142                     cacheContext.recordValue(delOp.getKey(), Optional.empty());
143                 } catch (ExecutionException | InterruptedException ex) {
144                     log.info("Cache {}: unable to remove key {}", name, delOp.getKey(), ex);
145                     failureException = ex;
146                 }
147             }
148 
149             // Finally, if there was a failure, then re-throw the last one reported (as good as any!)
150             if (failureException != null) {
151                 if (failureException instanceof ExecutionException) {
152                     throw (ExecutionException) failureException;
153                 }
154                 throw (InterruptedException) failureException;
155             }
156 
157             return null;
158         });
159     }
160 
161     @Override
162     protected CompletionStage<Void> internalRemoveAll() {
163         return perform(() -> {
164             final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
165             cacheContext.updateCacheVersion(
166                     MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
167             cacheContext.forgetAllValues();
168             return null;
169         });
170     }
171 
172     @Override
173     protected Logger getLogger() {
174         return log;
175     }
176 
177     @Override
178     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
179         final RequestContext requestContext = contextSupplier.get();
180 
181         return requestContext.computeIfAbsent(this, () -> {
182             // Need to build a new context, which involves getting the current cache version, or setting it if it does
183             // not exist.
184             log.trace("Cache {}: Setting up a new context", name);
185             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
186                     keyGenerator, name, requestContext::partitionIdentifier);
187             newCacheContext.updateCacheVersion(
188                     MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
189             return newCacheContext;
190         });
191     }
192 
193     @Override
194     protected V handleCreation(String internalKey, Supplier<V> supplier)
195             throws ExecutionException, InterruptedException {
196         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
197         final V candidateValue = requireNonNull(supplier.get());
198         final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
199         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
200 
201         // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
202         for (; ; ) {
203             final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
204             if (addOp.get()) {
205                 // I break here, rather than just return, due to battling with the compiler. Unless written
206                 // this way, the compiler will not allow the lambda structure.
207                 break;
208             }
209 
210             getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
211             final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
212             if (otherAddedValue.isPresent()) {
213                 return otherAddedValue.get();
214             }
215 
216             getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
217         }
218 
219         return candidateValue;
220     }
221 
222     @Override
223     protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
224             throws ExecutionException, InterruptedException {
225         // Get the missing values from the external cache.
226         // Returns map of keys that contain values, so need to handle the missing ones
227         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
228         metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
229         final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
230         getLogger().trace("Cache {}: getBulk(Function): {} of {} entries have values",
231                 name, haveValues.size(), externalKeys.size());
232         final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
233         missingExternalKeys.removeAll(haveValues.keySet());
234 
235         // Add the retrieved values to the grand result
236         final Map<String, V> grandResult = haveValues.entrySet().stream()
237                 .collect(Collectors.toMap(
238                         e -> cacheContext.internalEntryKeyFor(e.getKey()),
239                         e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()));
240 
241         if (!missingExternalKeys.isEmpty()) {
242             getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
243                     name, missingExternalKeys.size());
244             // Okay, need to get the missing values and mapping from externalKeys to internalKeys
245             final Set<String> missingInternalKeys = Collections.unmodifiableSet(
246                     missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
247             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
248             if (missingInternalKeys.size() != missingValues.size()) {
249                 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
250                         name, missingInternalKeys.size() + " but got " + missingValues.size());
251                 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
252             }
253 
254             // Okay, got the missing values, now need to add them to Memcached
255             final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
256                     Map.Entry::getKey,
257                     e -> clientSupplier.get().set(
258                             cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
259             ));
260 
261             // Now wait for the outcomes and then add to the grand result
262             for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
263                 e.getValue().get(); // Don't care about the result as it will always be true
264             }
265 
266             grandResult.putAll(missingValues);
267         }
268 
269         return grandResult;
270     }
271 
272     @Override
273     protected final ExternalCacheException mapException(Exception ex) {
274         return MemcachedUtils.mapException(ex);
275     }
276 
277     @Override
278     protected final Optional<V> directGet(String externalKey) {
279         return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
280     }
281 
282     @Override
283     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
284         return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
285     }
286 }