View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.PutPolicy;
7   import com.atlassian.vcache.internal.MetricLabel;
8   import com.atlassian.vcache.internal.RequestContext;
9   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.CacheType;
12  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13  import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15  import net.spy.memcached.MemcachedClientIF;
16  import net.spy.memcached.OperationTimeoutException;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  
20  import java.time.Duration;
21  import java.util.Map;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.Future;
26  import java.util.function.Supplier;
27  import java.util.stream.Collectors;
28  import java.util.stream.StreamSupport;
29  
30  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
31  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
32  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
33  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
34  import static java.util.Objects.requireNonNull;
35  
36  /**
37   * Implementation that backs onto Memcached.
38   *
39   * @param <V> the value type
40   * @since 1.0.0
41   */
42  class MemcachedStableReadExternalCache<V>
43          extends AbstractStableReadExternalCache<V> {
44      private static final Logger log = LoggerFactory.getLogger(MemcachedStableReadExternalCache.class);
45  
46      private final Supplier<MemcachedClientIF> clientSupplier;
47      private final Supplier<RequestContext> contextSupplier;
48      private final ExternalCacheKeyGenerator keyGenerator;
49      private final MarshallingPair<V> valueMarshalling;
50      private final int ttlSeconds;
51  
52      MemcachedStableReadExternalCache(
53              Supplier<MemcachedClientIF> clientSupplier,
54              Supplier<RequestContext> contextSupplier,
55              ExternalCacheKeyGenerator keyGenerator,
56              String name,
57              MarshallingPair<V> valueMarshalling,
58              ExternalCacheSettings settings,
59              MetricsRecorder metricsRecorder,
60              Duration lockTimeout) {
61          super(name, metricsRecorder, lockTimeout);
62          this.clientSupplier = requireNonNull(clientSupplier);
63          this.contextSupplier = requireNonNull(contextSupplier);
64          this.keyGenerator = requireNonNull(keyGenerator);
65          this.valueMarshalling = requireNonNull(valueMarshalling);
66          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
67      }
68  
69      @Override
70      public boolean internalPut(String internalKey, V value, PutPolicy policy) {
71          final String externalKey = ensureCacheContext().externalEntryKeyFor(internalKey);
72          final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
73  
74          try {
75              final Future<Boolean> putOp = putOperationForPolicy(
76                      policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
77              return putOp.get();
78          } catch (InterruptedException | ExecutionException e) {
79              throw new ExternalCacheException(ExternalCacheException.Reason.UNCLASSIFIED_FAILURE, e);
80          }
81      }
82  
83      @Override
84      protected void internalRemove(Iterable<String> internalKeys) {
85          // There is no bulk delete in the api, so need to remove each one async
86          if (isEmpty(internalKeys)) {
87              return;
88          }
89  
90          // Lodge all the requests for delete
91          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
92          final Map<String, Future<Boolean>> deleteOps =
93                  StreamSupport.stream(internalKeys.spliterator(), false)
94                          .distinct()
95                          .collect(Collectors.toMap(
96                                  k -> k,
97                                  k -> clientSupplier.get().delete(cacheContext.externalEntryKeyFor(k))
98                          ));
99  
100         // Need to loop verifying the outcome. If an exception was thrown for an operation,
101         // then do not record the removal.
102         Exception failureException = null;
103         for (Map.Entry<String, Future<Boolean>> delOp : deleteOps.entrySet()) {
104             try {
105                 // Do not care whether the delOp returns true or false. Either way, no entry with the specified key
106                 // now exists on the Memcache server.
107                 delOp.getValue().get();
108                 cacheContext.recordValue(delOp.getKey(), Optional.empty());
109             } catch (ExecutionException | InterruptedException ex) {
110                 log.info("Cache {}: unable to remove key {}", name, delOp.getKey(), ex);
111                 failureException = ex;
112             }
113         }
114 
115         // Finally, if there was a failure, then re-throw the last one reported (as good as any!)
116         if (failureException != null) {
117             throw new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, failureException);
118         }
119     }
120 
121     @Override
122     protected void internalRemoveAll() {
123         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
124         cacheContext.updateCacheVersion(
125                 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
126     }
127 
128     @Override
129     protected Logger getLogger() {
130         return log;
131     }
132 
133     @Override
134     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
135         final RequestContext requestContext = contextSupplier.get();
136 
137         return requestContext.computeIfAbsent(this, () -> {
138             // Need to build a new context, which involves getting the current cache version, or setting it if it does
139             // not exist.
140             log.trace("Cache {}: Setting up a new context", name);
141             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
142                     keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
143             newCacheContext.updateCacheVersion(
144                     MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
145             return newCacheContext;
146         });
147     }
148 
149     @Override
150     protected V handleCreation(String internalKey, V candidateValue)
151             throws ExecutionException, InterruptedException {
152         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
153         final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
154         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
155 
156         // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
157         for (; ; ) {
158             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
159             final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
160             if (addOp.get()) {
161                 // I break here, rather than just return, due to battling with the compiler. Unless written
162                 // this way, the compiler will not allow the lambda structure.
163                 break;
164             }
165 
166             getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
167             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
168             final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
169             if (otherAddedValue.isPresent()) {
170                 return otherAddedValue.get();
171             }
172 
173             getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
174         }
175 
176         return candidateValue;
177     }
178 
179     @Override
180     protected final ExternalCacheException mapException(Exception ex) {
181         return MemcachedUtils.mapException(ex);
182     }
183 
184     @Override
185     protected final Optional<V> directGet(String externalKey) {
186         return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
187     }
188 
189     @Override
190     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
191         return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
192     }
193 }