View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingException;
4   import com.atlassian.marshalling.api.MarshallingPair;
5   import com.atlassian.vcache.ExternalCacheException;
6   import com.atlassian.vcache.ExternalCacheSettings;
7   import com.atlassian.vcache.internal.RequestContext;
8   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9   import com.atlassian.vcache.internal.core.TransactionControlManager;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12  import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
13  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
14  import net.spy.memcached.MemcachedClientIF;
15  import net.spy.memcached.OperationTimeoutException;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  import java.util.Optional;
22  import java.util.Set;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.Future;
25  import java.util.function.Supplier;
26  
27  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
28  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
29  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
30  import static java.util.Objects.requireNonNull;
31  
32  /**
33   * Implementation that backs onto Memcached.
34   *
35   * @param <V> the value type
36   * @since 1.0.0
37   */
38  class MemcachedTransactionalExternalCache<V>
39          extends AbstractTransactionalExternalCache<V> {
40      private static final Logger log = LoggerFactory.getLogger(MemcachedTransactionalExternalCache.class);
41  
42      private final Supplier<MemcachedClientIF> clientSupplier;
43      private final ExternalCacheKeyGenerator keyGenerator;
44      private final MarshallingPair<V> valueMarshalling;
45      private final int ttlSeconds;
46      private final TransactionControlManager transactionControlManager;
47  
48      MemcachedTransactionalExternalCache(
49              Supplier<MemcachedClientIF> clientSupplier,
50              Supplier<RequestContext> contextSupplier,
51              ExternalCacheKeyGenerator keyGenerator,
52              String name,
53              MarshallingPair<V> valueMarshalling,
54              ExternalCacheSettings settings,
55              TransactionControlManager transactionControlManager,
56              MetricsRecorder metricsRecorder) {
57          super(name, contextSupplier, metricsRecorder);
58          this.clientSupplier = requireNonNull(clientSupplier);
59          this.keyGenerator = requireNonNull(keyGenerator);
60          this.valueMarshalling = requireNonNull(valueMarshalling);
61          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
62          this.transactionControlManager = requireNonNull(transactionControlManager);
63      }
64  
65      @Override
66      public void transactionSync() {
67          log.trace("Cache {}: synchronising operations", name);
68          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
69  
70          if (cacheContext.hasRemoveAll()) {
71              cacheContext.updateCacheVersion(
72                      MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
73          }
74  
75          performKeyedOperations(cacheContext);
76          cacheContext.forgetAll();
77      }
78  
79      private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
80          // The approach used to perform the operations is:
81          // - schedule all the operations
82          // - check the result of all the operations and log a warning is necessary
83          // - if any operation failed, wipe the cache
84  
85          // Right then, lets submit all the operations.
86          final Map<Future<Boolean>, String> futureToFailureMessageMap = new HashMap<>();
87          cacheContext.getKeyedOperations().forEach(entry -> {
88              final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
89  
90              if (entry.getValue().isRemove()) {
91                  log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
92                  futureToFailureMessageMap.put(
93                          clientSupplier.get().delete(externalKey),
94                          "remove entry " + entry.getKey());
95              } else {
96                  log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
97                  try {
98                      final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(
99                              requireNonNull(entry.getValue().getValue()));
100                     final Future<Boolean> putOp = putOperationForPolicy(
101                             entry.getValue().getPolicy(),
102                             clientSupplier.get(),
103                             externalKey,
104                             expiryTime(ttlSeconds),
105                             valueBytes);
106                     futureToFailureMessageMap.put(
107                             putOp,
108                             "put using policy " + entry.getValue().getPolicy() + " on entry "
109                                     + entry.getKey());
110                 } catch (MarshallingException mex) {
111                     log.warn("Cache {}: Unable to marshall value to perform put operation on entry {}",
112                             name, entry.getKey(), mex);
113                 }
114             }
115         });
116 
117         // Cool, now lets check the result of each operation
118         final boolean[] anOperationFailed = new boolean[1];
119         futureToFailureMessageMap.entrySet().forEach(entry -> {
120             try {
121                 final Boolean outcome = entry.getKey().get();
122                 if (outcome) {
123                     log.trace("Cache {}: successful deferred operation for {}", name, entry.getValue());
124                 } else {
125                     anOperationFailed[0] = true;
126                     log.trace("Cache {}: failed deferred operation for {}", name, entry.getValue());
127                 }
128             } catch (InterruptedException | ExecutionException ex) {
129                 anOperationFailed[0] = true;
130                 log.error("Cache {}: had failure getting result for deferred operation {}",
131                         name, entry.getValue(), ex);
132             }
133         });
134 
135         // Finally, lets see if an operation failed
136         if (anOperationFailed[0]) {
137             log.warn("Cache {}: an operation failed in transaction sync, so clearing the cache", name);
138             cacheContext.updateCacheVersion(
139                     MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
140         }
141     }
142 
143     @Override
144     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
145         final RequestContext requestContext = contextSupplier.get();
146 
147         transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
148 
149         return requestContext.computeIfAbsent(this, () -> {
150             // Need to build a new context, which involves getting the current cache version, or setting it if it does
151             // not exist.
152             log.trace("Cache {}: Setting up a new context", name);
153             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
154                     keyGenerator, name, requestContext::partitionIdentifier);
155             newCacheContext.updateCacheVersion(
156                     MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
157             return newCacheContext;
158         });
159     }
160 
161     @Override
162     protected Logger getLogger() {
163         return log;
164     }
165 
166     @Override
167     protected final ExternalCacheException mapException(Exception ex) {
168         return MemcachedUtils.mapException(ex);
169     }
170 
171     @Override
172     protected final Optional<V> directGet(String externalKey) {
173         return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
174     }
175 
176     @Override
177     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
178         return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
179     }
180 }