View Javadoc

1   package com.atlassian.vcache.internal.guava;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.internal.RequestContext;
6   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
7   import com.atlassian.vcache.internal.core.TransactionControlManager;
8   import com.atlassian.vcache.internal.core.cas.IdentifiedData;
9   import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
10  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
11  import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12  import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
13  import com.google.common.cache.Cache;
14  import org.slf4j.Logger;
15  import org.slf4j.LoggerFactory;
16  
17  import java.time.Duration;
18  import java.util.Map;
19  import java.util.Optional;
20  import java.util.Set;
21  import java.util.function.Supplier;
22  
23  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
24  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
25  import static java.util.Objects.requireNonNull;
26  
27  /**
28   * Guava implementation of a {@link com.atlassian.vcache.TransactionalExternalCache}.
29   *
30   * @param <V> the value type.
31   * @since 1.0.0
32   */
33  public class GuavaTransactionalExternalCache<V>
34          extends AbstractTransactionalExternalCache<V> {
35      private static final Logger log = LoggerFactory.getLogger(GuavaTransactionalExternalCache.class);
36  
37      private final Cache<String, IdentifiedData> delegate;
38      private final ExternalCacheKeyGenerator keyGenerator;
39      private final Optional<MarshallingPair<V>> valueMarshalling;
40      private final TransactionControlManager transactionControlManager;
41  
42      public GuavaTransactionalExternalCache(
43              String name,
44              Cache<String, IdentifiedData> delegate,
45              Supplier<RequestContext> contextSupplier,
46              ExternalCacheKeyGenerator keyGenerator,
47              Optional<MarshallingPair<V>> valueMarshalling,
48              TransactionControlManager transactionControlManager,
49              MetricsRecorder metricsRecorder,
50              Duration lockTimeout) {
51          super(name, contextSupplier, metricsRecorder, lockTimeout, (n, ex) -> {});
52          this.delegate = requireNonNull(delegate);
53          this.keyGenerator = requireNonNull(keyGenerator);
54          this.valueMarshalling = requireNonNull(valueMarshalling);
55          this.transactionControlManager = requireNonNull(transactionControlManager);
56      }
57  
58      @Override
59      public void transactionSync() {
60          log.trace("Cache {}: synchronising operations", name);
61          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
62  
63          if (cacheContext.hasRemoveAll()) {
64              delegate.asMap().clear();
65          }
66  
67          performKeyedOperations(cacheContext);
68          cacheContext.forgetAll();
69      }
70  
71      @Override
72      protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
73          final RequestContext requestContext = contextSupplier.get();
74  
75          transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
76  
77          return requestContext.computeIfAbsent(this, () -> {
78              // Need to build a new context, which involves getting the current cache version, or setting it if it does
79              // not exist.
80              log.trace("Cache {}: Setting up a new context", name);
81              return new UnversionedExternalCacheRequestContext<>(
82                      keyGenerator, getName(), requestContext::partitionIdentifier, lockTimeout);
83          });
84      }
85  
86      @Override
87      protected Logger getLogger() {
88          return log;
89      }
90  
91      @Override
92      protected final ExternalCacheException mapException(Exception ex) {
93          return GuavaUtils.mapException(ex);
94      }
95  
96      @Override
97      protected final Optional<V> directGet(String externalKey) {
98          return unmarshall(delegate.getIfPresent(externalKey), valueMarshalling);
99      }
100 
101     @Override
102     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
103         return GuavaUtils.directGetBulk(externalKeys, delegate, valueMarshalling);
104     }
105 
106     private void performKeyedOperations(AbstractExternalCacheRequestContext<V> cacheContext) {
107         try {
108             for (Map.Entry<String, AbstractExternalCacheRequestContext.DeferredOperation<V>> entry
109                     : cacheContext.getKeyedOperations()) {
110                 final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
111 
112                 if (entry.getValue().isRemove()) {
113                     log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
114                     delegate.asMap().remove(externalKey);
115                 } else {
116                     log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
117                     final IdentifiedData identifiedData = marshall(entry.getValue().getValue(), valueMarshalling);
118 
119                     final boolean putOutcome =
120                             GuavaUtils.directPut(
121                                     externalKey,
122                                     identifiedData,
123                                     entry.getValue().getPolicy(),
124                                     delegate);
125 
126                     if (!putOutcome) {
127                         log.debug("Cache {}: Unable to perform put() operation {} on entry {}",
128                                 name, entry.getValue().getPolicy(), entry.getKey());
129 
130                         delegate.asMap().clear();
131                         break;
132                     }
133                 }
134             }
135         } catch (ExternalCacheException bugger) {
136             log.error("Cache {}: an operation failed in transaction sync, so clearing the cache", name);
137             delegate.asMap().clear();
138         }
139     }
140 
141 }