1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingException;
4 import com.atlassian.marshalling.api.MarshallingPair;
5 import com.atlassian.vcache.ExternalCacheException;
6 import com.atlassian.vcache.ExternalCacheSettings;
7 import com.atlassian.vcache.internal.RequestContext;
8 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9 import com.atlassian.vcache.internal.core.TransactionControlManager;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12 import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
13 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
14 import net.spy.memcached.MemcachedClientIF;
15 import net.spy.memcached.OperationTimeoutException;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.time.Duration;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.Future;
26 import java.util.function.Supplier;
27
28 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
29 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
30 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
31 import static java.util.Objects.requireNonNull;
32
33
34
35
36
37
38
39 class MemcachedTransactionalExternalCache<V>
40 extends AbstractTransactionalExternalCache<V> {
41 private static final Logger log = LoggerFactory.getLogger(MemcachedTransactionalExternalCache.class);
42
43 private final Supplier<MemcachedClientIF> clientSupplier;
44 private final ExternalCacheKeyGenerator keyGenerator;
45 private final MarshallingPair<V> valueMarshalling;
46 private final int ttlSeconds;
47 private final TransactionControlManager transactionControlManager;
48
49 MemcachedTransactionalExternalCache(
50 Supplier<MemcachedClientIF> clientSupplier,
51 Supplier<RequestContext> contextSupplier,
52 ExternalCacheKeyGenerator keyGenerator,
53 String name,
54 MarshallingPair<V> valueMarshalling,
55 ExternalCacheSettings settings,
56 TransactionControlManager transactionControlManager,
57 MetricsRecorder metricsRecorder,
58 Duration lockTimeout) {
59 super(name, contextSupplier, metricsRecorder, lockTimeout);
60 this.clientSupplier = requireNonNull(clientSupplier);
61 this.keyGenerator = requireNonNull(keyGenerator);
62 this.valueMarshalling = requireNonNull(valueMarshalling);
63 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
64 this.transactionControlManager = requireNonNull(transactionControlManager);
65 }
66
67 @Override
68 public void transactionSync() {
69 log.trace("Cache {}: synchronising operations", name);
70 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
71
72 if (cacheContext.hasRemoveAll()) {
73 cacheContext.updateCacheVersion(
74 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
75 }
76
77 performKeyedOperations(cacheContext);
78 cacheContext.forgetAll();
79 }
80
81 private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
82
83
84
85
86
87
88 final Map<Future<Boolean>, String> futureToFailureMessageMap = new HashMap<>();
89 cacheContext.getKeyedOperations().forEach(entry -> {
90 final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
91
92 if (entry.getValue().isRemove()) {
93 log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
94 futureToFailureMessageMap.put(
95 clientSupplier.get().delete(externalKey),
96 "remove entry " + entry.getKey());
97 } else {
98 log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
99 try {
100 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(
101 requireNonNull(entry.getValue().getValue()));
102 final Future<Boolean> putOp = putOperationForPolicy(
103 entry.getValue().getPolicy(),
104 clientSupplier.get(),
105 externalKey,
106 expiryTime(ttlSeconds),
107 valueBytes);
108 futureToFailureMessageMap.put(
109 putOp,
110 "put using policy " + entry.getValue().getPolicy() + " on entry "
111 + entry.getKey());
112 } catch (MarshallingException mex) {
113 log.warn("Cache {}: Unable to marshall value to perform put operation on entry {}",
114 name, entry.getKey(), mex);
115 }
116 }
117 });
118
119
120 final boolean[] anOperationFailed = new boolean[1];
121 futureToFailureMessageMap.entrySet().forEach(entry -> {
122 try {
123 final Boolean outcome = entry.getKey().get();
124 if (outcome) {
125 log.trace("Cache {}: successful deferred operation for {}", name, entry.getValue());
126 } else {
127 anOperationFailed[0] = true;
128 log.trace("Cache {}: failed deferred operation for {}", name, entry.getValue());
129 }
130 } catch (InterruptedException | ExecutionException ex) {
131 anOperationFailed[0] = true;
132 log.error("Cache {}: had failure getting result for deferred operation {}",
133 name, entry.getValue(), ex);
134 }
135 });
136
137
138 if (anOperationFailed[0]) {
139 log.warn("Cache {}: an operation failed in transaction sync, so clearing the cache", name);
140 cacheContext.updateCacheVersion(
141 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
142 }
143 }
144
145 @Override
146 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
147 final RequestContext requestContext = contextSupplier.get();
148
149 transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
150
151 return requestContext.computeIfAbsent(this, () -> {
152
153
154 log.trace("Cache {}: Setting up a new context", name);
155 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
156 keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
157 newCacheContext.updateCacheVersion(
158 MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
159 return newCacheContext;
160 });
161 }
162
163 @Override
164 protected Logger getLogger() {
165 return log;
166 }
167
168 @Override
169 protected final ExternalCacheException mapException(Exception ex) {
170 return MemcachedUtils.mapException(ex);
171 }
172
173 @Override
174 protected final Optional<V> directGet(String externalKey) {
175 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
176 }
177
178 @Override
179 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
180 return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
181 }
182 }