1 package com.atlassian.vcache.internal.guava;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.internal.RequestContext;
6 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
7 import com.atlassian.vcache.internal.core.TransactionControlManager;
8 import com.atlassian.vcache.internal.core.cas.IdentifiedData;
9 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
10 import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
11 import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12 import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
13 import com.google.common.cache.Cache;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import java.time.Duration;
18 import java.util.Map;
19 import java.util.Optional;
20 import java.util.Set;
21 import java.util.function.Supplier;
22
23 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
24 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
25 import static java.util.Objects.requireNonNull;
26
27
28
29
30
31
32
33 public class GuavaTransactionalExternalCache<V>
34 extends AbstractTransactionalExternalCache<V> {
35 private static final Logger log = LoggerFactory.getLogger(GuavaTransactionalExternalCache.class);
36
37 private final Cache<String, IdentifiedData> delegate;
38 private final ExternalCacheKeyGenerator keyGenerator;
39 private final Optional<MarshallingPair<V>> valueMarshalling;
40 private final TransactionControlManager transactionControlManager;
41
42 public GuavaTransactionalExternalCache(
43 String name,
44 Cache<String, IdentifiedData> delegate,
45 Supplier<RequestContext> contextSupplier,
46 ExternalCacheKeyGenerator keyGenerator,
47 Optional<MarshallingPair<V>> valueMarshalling,
48 TransactionControlManager transactionControlManager,
49 MetricsRecorder metricsRecorder,
50 Duration lockTimeout) {
51 super(name, contextSupplier, metricsRecorder, lockTimeout, (n, ex) -> {});
52 this.delegate = requireNonNull(delegate);
53 this.keyGenerator = requireNonNull(keyGenerator);
54 this.valueMarshalling = requireNonNull(valueMarshalling);
55 this.transactionControlManager = requireNonNull(transactionControlManager);
56 }
57
58 @Override
59 public void transactionSync() {
60 log.trace("Cache {}: synchronising operations", name);
61 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
62
63 if (cacheContext.hasRemoveAll()) {
64 delegate.asMap().clear();
65 }
66
67 performKeyedOperations(cacheContext);
68 cacheContext.forgetAll();
69 }
70
71 @Override
72 protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
73 final RequestContext requestContext = contextSupplier.get();
74
75 transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
76
77 return requestContext.computeIfAbsent(this, () -> {
78
79
80 log.trace("Cache {}: Setting up a new context", name);
81 return new UnversionedExternalCacheRequestContext<>(
82 keyGenerator, getName(), requestContext::partitionIdentifier, lockTimeout);
83 });
84 }
85
86 @Override
87 protected Logger getLogger() {
88 return log;
89 }
90
91 @Override
92 protected final ExternalCacheException mapException(Exception ex) {
93 return GuavaUtils.mapException(ex);
94 }
95
96 @Override
97 protected final Optional<V> directGet(String externalKey) {
98 return unmarshall(delegate.getIfPresent(externalKey), valueMarshalling);
99 }
100
101 @Override
102 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
103 return GuavaUtils.directGetBulk(externalKeys, delegate, valueMarshalling);
104 }
105
106 private void performKeyedOperations(AbstractExternalCacheRequestContext<V> cacheContext) {
107 try {
108 for (Map.Entry<String, AbstractExternalCacheRequestContext.DeferredOperation<V>> entry
109 : cacheContext.getKeyedOperations()) {
110 final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
111
112 if (entry.getValue().isRemove()) {
113 log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
114 delegate.asMap().remove(externalKey);
115 } else {
116 log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
117 final IdentifiedData identifiedData = marshall(entry.getValue().getValue(), valueMarshalling);
118
119 final boolean putOutcome =
120 GuavaUtils.directPut(
121 externalKey,
122 identifiedData,
123 entry.getValue().getPolicy(),
124 delegate);
125
126 if (!putOutcome) {
127 log.debug("Cache {}: Unable to perform put() operation {} on entry {}",
128 name, entry.getValue().getPolicy(), entry.getKey());
129
130 delegate.asMap().clear();
131 break;
132 }
133 }
134 }
135 } catch (ExternalCacheException bugger) {
136 log.error("Cache {}: an operation failed in transaction sync, so clearing the cache", name);
137 delegate.asMap().clear();
138 }
139 }
140
141 }