View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.PutPolicy;
7   import com.atlassian.vcache.internal.MetricLabel;
8   import com.atlassian.vcache.internal.RequestContext;
9   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.CacheType;
12  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13  import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15  import net.spy.memcached.MemcachedClientIF;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.util.Map;
20  import java.util.Optional;
21  import java.util.Set;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.Future;
24  import java.util.function.Supplier;
25  import java.util.stream.Collectors;
26  import java.util.stream.StreamSupport;
27  
28  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
29  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
30  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
31  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
32  import static java.util.Objects.requireNonNull;
33  
34  /**
35   * Implementation that backs onto Memcached.
36   *
37   * @param <V> the value type
38   * @since 1.0.0
39   */
40  class MemcachedStableReadExternalCache<V>
41          extends AbstractStableReadExternalCache<V> {
42      private static final Logger log = LoggerFactory.getLogger(MemcachedStableReadExternalCache.class);
43  
44      private final Supplier<MemcachedClientIF> clientSupplier;
45      private final Supplier<RequestContext> contextSupplier;
46      private final ExternalCacheKeyGenerator keyGenerator;
47      private final MarshallingPair<V> valueMarshalling;
48      private final int ttlSeconds;
49  
50      MemcachedStableReadExternalCache(
51              MemcachedVCacheServiceSettings serviceSettings,
52              Supplier<RequestContext> contextSupplier,
53              ExternalCacheKeyGenerator keyGenerator,
54              String name,
55              MarshallingPair<V> valueMarshalling,
56              ExternalCacheSettings settings,
57              MetricsRecorder metricsRecorder) {
58          super(name, metricsRecorder, serviceSettings.getLockTimeout(), serviceSettings.getExternalCacheExceptionListener());
59          this.clientSupplier = requireNonNull(serviceSettings.getClientSupplier());
60          this.contextSupplier = requireNonNull(contextSupplier);
61          this.keyGenerator = requireNonNull(keyGenerator);
62          this.valueMarshalling = requireNonNull(valueMarshalling);
63          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
64      }
65  
66      @Override
67      public boolean internalPut(String internalKey, V value, PutPolicy policy) {
68          final String externalKey = ensureCacheContext().externalEntryKeyFor(internalKey);
69          final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
70  
71          try {
72              final Future<Boolean> putOp = putOperationForPolicy(
73                      policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
74              return putOp.get();
75          } catch (InterruptedException | ExecutionException e) {
76              throw new ExternalCacheException(ExternalCacheException.Reason.UNCLASSIFIED_FAILURE, e);
77          }
78      }
79  
80      @Override
81      protected void internalRemove(Iterable<String> internalKeys) {
82          // There is no bulk delete in the api, so need to remove each one async
83          if (isEmpty(internalKeys)) {
84              return;
85          }
86  
87          // Lodge all the requests for delete
88          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
89          final Map<String, Future<Boolean>> deleteOps =
90                  StreamSupport.stream(internalKeys.spliterator(), false)
91                          .distinct()
92                          .collect(Collectors.toMap(
93                                  k -> k,
94                                  k -> clientSupplier.get().delete(cacheContext.externalEntryKeyFor(k))
95                          ));
96  
97          // Need to loop verifying the outcome. If an exception was thrown for an operation,
98          // then do not record the removal.
99          Exception failureException = null;
100         for (Map.Entry<String, Future<Boolean>> delOp : deleteOps.entrySet()) {
101             try {
102                 // Do not care whether the delOp returns true or false. Either way, no entry with the specified key
103                 // now exists on the Memcache server.
104                 delOp.getValue().get();
105                 cacheContext.recordValue(delOp.getKey(), Optional.empty());
106             } catch (ExecutionException | InterruptedException ex) {
107                 log.info("Cache {}: unable to remove key {}", name, delOp.getKey(), ex);
108                 failureException = ex;
109             }
110         }
111 
112         // Finally, if there was a failure, then re-throw the last one reported (as good as any!)
113         if (failureException != null) {
114             throw new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, failureException);
115         }
116     }
117 
118     @Override
119     protected void internalRemoveAll() {
120         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
121         cacheContext.updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
122     }
123 
124     @Override
125     protected Logger getLogger() {
126         return log;
127     }
128 
129     @Override
130     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
131         final RequestContext requestContext = contextSupplier.get();
132 
133         return requestContext.computeIfAbsent(this, () -> {
134             // Need to build a new context, which involves getting the current cache version, or setting it if it does
135             // not exist.
136             log.trace("Cache {}: Setting up a new context", name);
137             return new VersionedExternalCacheRequestContext<>(
138                     keyGenerator, name, requestContext::partitionIdentifier,
139                     MemcachedUtils.cacheVersionSupplier(clientSupplier),
140                     lockTimeout);
141         });
142     }
143 
144     @Override
145     protected V handleCreation(String internalKey, V candidateValue)
146             throws ExecutionException, InterruptedException {
147         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
148         final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
149         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
150 
151         // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
152         for (; ; ) {
153             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
154             final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
155             if (addOp.get()) {
156                 // I break here, rather than just return, due to battling with the compiler. Unless written
157                 // this way, the compiler will not allow the lambda structure.
158                 break;
159             }
160 
161             getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
162             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
163             final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
164             if (otherAddedValue.isPresent()) {
165                 return otherAddedValue.get();
166             }
167 
168             getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
169         }
170 
171         return candidateValue;
172     }
173 
174     @Override
175     protected final ExternalCacheException mapException(Exception ex) {
176         return MemcachedUtils.mapException(ex);
177     }
178 
179     @Override
180     protected final Optional<V> directGet(String externalKey) {
181         return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
182     }
183 
184     @Override
185     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
186         return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
187     }
188 }