View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingException;
4   import com.atlassian.marshalling.api.MarshallingPair;
5   import com.atlassian.vcache.ExternalCacheException;
6   import com.atlassian.vcache.ExternalCacheSettings;
7   import com.atlassian.vcache.internal.RequestContext;
8   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9   import com.atlassian.vcache.internal.core.TransactionControlManager;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12  import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
13  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
14  import net.spy.memcached.MemcachedClientIF;
15  import net.spy.memcached.OperationTimeoutException;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.time.Duration;
20  import java.util.HashMap;
21  import java.util.Map;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.Future;
26  import java.util.function.Supplier;
27  
28  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
29  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
30  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
31  import static java.util.Objects.requireNonNull;
32  
33  /**
34   * Implementation that backs onto Memcached.
35   *
36   * @param <V> the value type
37   * @since 1.0.0
38   */
39  class MemcachedTransactionalExternalCache<V>
40          extends AbstractTransactionalExternalCache<V> {
41      private static final Logger log = LoggerFactory.getLogger(MemcachedTransactionalExternalCache.class);
42  
43      private final Supplier<MemcachedClientIF> clientSupplier;
44      private final ExternalCacheKeyGenerator keyGenerator;
45      private final MarshallingPair<V> valueMarshalling;
46      private final int ttlSeconds;
47      private final TransactionControlManager transactionControlManager;
48  
49      MemcachedTransactionalExternalCache(
50              Supplier<MemcachedClientIF> clientSupplier,
51              Supplier<RequestContext> contextSupplier,
52              ExternalCacheKeyGenerator keyGenerator,
53              String name,
54              MarshallingPair<V> valueMarshalling,
55              ExternalCacheSettings settings,
56              TransactionControlManager transactionControlManager,
57              MetricsRecorder metricsRecorder,
58              Duration lockTimeout) {
59          super(name, contextSupplier, metricsRecorder, lockTimeout);
60          this.clientSupplier = requireNonNull(clientSupplier);
61          this.keyGenerator = requireNonNull(keyGenerator);
62          this.valueMarshalling = requireNonNull(valueMarshalling);
63          this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
64          this.transactionControlManager = requireNonNull(transactionControlManager);
65      }
66  
67      @Override
68      public void transactionSync() {
69          log.trace("Cache {}: synchronising operations", name);
70          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
71  
72          if (cacheContext.hasRemoveAll()) {
73              cacheContext.updateCacheVersion(
74                      MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
75          }
76  
77          performKeyedOperations(cacheContext);
78          cacheContext.forgetAll();
79      }
80  
81      private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
82          // The approach used to perform the operations is:
83          // - schedule all the operations
84          // - check the result of all the operations and log a warning is necessary
85          // - if any operation failed, wipe the cache
86  
87          // Right then, lets submit all the operations.
88          final Map<Future<Boolean>, String> futureToFailureMessageMap = new HashMap<>();
89          cacheContext.getKeyedOperations().forEach(entry -> {
90              final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
91  
92              if (entry.getValue().isRemove()) {
93                  log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
94                  futureToFailureMessageMap.put(
95                          clientSupplier.get().delete(externalKey),
96                          "remove entry " + entry.getKey());
97              } else {
98                  log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
99                  try {
100                     final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(
101                             requireNonNull(entry.getValue().getValue()));
102                     final Future<Boolean> putOp = putOperationForPolicy(
103                             entry.getValue().getPolicy(),
104                             clientSupplier.get(),
105                             externalKey,
106                             expiryTime(ttlSeconds),
107                             valueBytes);
108                     futureToFailureMessageMap.put(
109                             putOp,
110                             "put using policy " + entry.getValue().getPolicy() + " on entry "
111                                     + entry.getKey());
112                 } catch (MarshallingException mex) {
113                     log.warn("Cache {}: Unable to marshall value to perform put operation on entry {}",
114                             name, entry.getKey(), mex);
115                 }
116             }
117         });
118 
119         // Cool, now lets check the result of each operation
120         final boolean[] anOperationFailed = new boolean[1];
121         futureToFailureMessageMap.entrySet().forEach(entry -> {
122             try {
123                 final Boolean outcome = entry.getKey().get();
124                 if (outcome) {
125                     log.trace("Cache {}: successful deferred operation for {}", name, entry.getValue());
126                 } else {
127                     anOperationFailed[0] = true;
128                     log.trace("Cache {}: failed deferred operation for {}", name, entry.getValue());
129                 }
130             } catch (InterruptedException | ExecutionException ex) {
131                 anOperationFailed[0] = true;
132                 log.error("Cache {}: had failure getting result for deferred operation {}",
133                         name, entry.getValue(), ex);
134             }
135         });
136 
137         // Finally, lets see if an operation failed
138         if (anOperationFailed[0]) {
139             log.warn("Cache {}: an operation failed in transaction sync, so clearing the cache", name);
140             cacheContext.updateCacheVersion(
141                     MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
142         }
143     }
144 
145     @Override
146     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
147         final RequestContext requestContext = contextSupplier.get();
148 
149         transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
150 
151         return requestContext.computeIfAbsent(this, () -> {
152             // Need to build a new context, which involves getting the current cache version, or setting it if it does
153             // not exist.
154             log.trace("Cache {}: Setting up a new context", name);
155             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
156                     keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
157             newCacheContext.updateCacheVersion(
158                     MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
159             return newCacheContext;
160         });
161     }
162 
163     @Override
164     protected Logger getLogger() {
165         return log;
166     }
167 
168     @Override
169     protected final ExternalCacheException mapException(Exception ex) {
170         return MemcachedUtils.mapException(ex);
171     }
172 
173     @Override
174     protected final Optional<V> directGet(String externalKey) {
175         return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
176     }
177 
178     @Override
179     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
180         return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
181     }
182 }