View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingException;
4   import com.atlassian.marshalling.api.MarshallingPair;
5   import com.atlassian.vcache.ExternalCacheException;
6   import com.atlassian.vcache.ExternalCacheSettings;
7   import com.atlassian.vcache.internal.RequestContext;
8   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9   import com.atlassian.vcache.internal.core.TransactionControlManager;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext.DeferredOperation;
13  import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
14  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15  import net.spy.memcached.MemcachedClientIF;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  import java.util.Map.Entry;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.Future;
26  import java.util.function.Supplier;
27  
28  import static com.atlassian.vcache.ExternalCacheException.Reason.TRANSACTION_FAILURE;
29  import static com.atlassian.vcache.VCacheUtils.fold;
30  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
31  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
32  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
33  import static java.util.Objects.requireNonNull;
34  
35  /**
36   * Implementation that backs onto Memcached.
37   *
38   * @param <V> the value type
39   * @since 1.0.0
40   */
41  class MemcachedTransactionalExternalCache<V>
42          extends AbstractTransactionalExternalCache<V> {
43      private static final Logger log = LoggerFactory.getLogger(MemcachedTransactionalExternalCache.class);
44  
45      private final Supplier<MemcachedClientIF> clientSupplier;
46      private final ExternalCacheKeyGenerator keyGenerator;
47      private final MarshallingPair<V> valueMarshalling;
48      private final int ttlSeconds;
49      private final TransactionControlManager transactionControlManager;
50  
51      MemcachedTransactionalExternalCache(
52              MemcachedVCacheServiceSettings serviceSettings,
53              Supplier<RequestContext> contextSupplier,
54              ExternalCacheKeyGenerator keyGenerator,
55              String name,
56              MarshallingPair<V> valueMarshalling,
57              ExternalCacheSettings settings,
58              TransactionControlManager transactionControlManager,
59              MetricsRecorder metricsRecorder) {
60          super(name, contextSupplier, metricsRecorder, serviceSettings.getLockTimeout(), serviceSettings.getExternalCacheExceptionListener());
61          this.clientSupplier = requireNonNull(serviceSettings.getClientSupplier());
62          this.keyGenerator = requireNonNull(keyGenerator);
63          this.valueMarshalling = requireNonNull(valueMarshalling);
64          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
65          this.transactionControlManager = requireNonNull(transactionControlManager);
66      }
67  
68      @Override
69      public void transactionSync() {
70          log.trace("Cache {}: synchronising operations", name);
71          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
72  
73          fold(perform(() -> {
74                      if (cacheContext.hasRemoveAll()) {
75                          cacheContext.updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
76                      }
77  
78                      performKeyedOperations(cacheContext);
79  
80                      return null;
81                  }),
82                  v -> null,
83                  err -> {
84                      log.warn("Cache {}: an operation failed during transaction sync ({}). Clearing cache to remove stale entries.", name, err.getMessage());
85                      try {
86                          cacheContext.updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
87                      } catch (RuntimeException e) {
88                          log.error("Cache {}: failed to clear the cache: {}", name, e.getMessage());
89                      }
90  
91                      return null;
92                  });
93  
94          cacheContext.forgetAll();
95      }
96  
97      private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
98          // The approach used to perform the operations is:
99          // - schedule all the operations
100         // - check the result of all the operations and log a warning is necessary
101         // - if any operation failed, wipe the cache
102 
103         // Right then, lets submit all the operations.
104         final Map<Future<Boolean>, String> futureToFailureMessageMap = new HashMap<>();
105         final boolean[] anOperationFailed = new boolean[1];
106 
107         for (Entry<String, DeferredOperation<V>> entry : cacheContext.getKeyedOperations()) {
108             final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
109 
110             if (entry.getValue().isRemove()) {
111                 log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
112                 futureToFailureMessageMap.put(
113                         clientSupplier.get().delete(externalKey),
114                         "remove entry " + entry.getKey());
115             } else {
116                 log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
117                 try {
118                     final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(
119                             requireNonNull(entry.getValue().getValue()));
120                     final Future<Boolean> putOp = putOperationForPolicy(
121                             entry.getValue().getPolicy(),
122                             clientSupplier.get(),
123                             externalKey,
124                             expiryTime(ttlSeconds),
125                             valueBytes);
126                     futureToFailureMessageMap.put(
127                             putOp,
128                             "put using policy " + entry.getValue().getPolicy() + " on entry "
129                                     + entry.getKey());
130                 } catch (MarshallingException mex) {
131                     log.warn("Cache {}: Unable to marshall value to perform put operation on entry {}",
132                             name, entry.getKey(), mex);
133                     anOperationFailed[0] = true;
134                     break;
135                 }
136             }
137         }
138 
139         // Cool, now lets check the result of each operation
140         futureToFailureMessageMap.entrySet().forEach(entry -> {
141             try {
142                 if (anOperationFailed[0]) {
143                     // We can safely cancel while running - see javadoc for net.spy.memcached.MemcachedClient
144                     entry.getKey().cancel(true);
145                 } else {
146                     final Boolean outcome = entry.getKey().get();
147                     if (outcome) {
148                         log.trace("Cache {}: successful deferred operation for {}", name, entry.getValue());
149                     } else {
150                         anOperationFailed[0] = true;
151                         log.warn("Cache {}: failed deferred operation for {}", name, entry.getValue());
152                     }
153                 }
154             } catch (InterruptedException | ExecutionException ex) {
155                 if (ex instanceof InterruptedException) {
156                     // re-interrupt the thread if we've swallowed an interrupt - https://www.ibm.com/developerworks/library/j-jtp05236/
157                     Thread.currentThread().interrupt();
158                 }
159                 anOperationFailed[0] = true;
160                 log.error("Cache {}: had failure getting result for deferred operation {}",
161                         name, entry.getValue(), ex);
162             }
163         });
164 
165         // Finally, lets see if an operation failed
166         if (anOperationFailed[0]) {
167             throw new ExternalCacheException(TRANSACTION_FAILURE);
168         }
169     }
170 
171     @Override
172     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
173         final RequestContext requestContext = contextSupplier.get();
174 
175         transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
176 
177         return requestContext.computeIfAbsent(this, () -> {
178             // Need to build a new context, which involves getting the current cache version, or setting it if it does
179             // not exist.
180             log.trace("Cache {}: Setting up a new context", name);
181             return new VersionedExternalCacheRequestContext<>(
182                     keyGenerator, name, requestContext::partitionIdentifier,
183                     MemcachedUtils.cacheVersionSupplier(clientSupplier),
184                     lockTimeout);
185         });
186     }
187 
188     @Override
189     protected Logger getLogger() {
190         return log;
191     }
192 
193     @Override
194     protected final ExternalCacheException mapException(Exception ex) {
195         return MemcachedUtils.mapException(ex);
196     }
197 
198     @Override
199     protected final Optional<V> directGet(String externalKey) {
200         return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
201     }
202 
203     @Override
204     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
205         return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
206     }
207 }