1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingException;
4 import com.atlassian.marshalling.api.MarshallingPair;
5 import com.atlassian.vcache.ExternalCacheException;
6 import com.atlassian.vcache.ExternalCacheSettings;
7 import com.atlassian.vcache.internal.RequestContext;
8 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9 import com.atlassian.vcache.internal.core.TransactionControlManager;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12 import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext.DeferredOperation;
13 import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
14 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15 import net.spy.memcached.MemcachedClientIF;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.Future;
26 import java.util.function.Supplier;
27
28 import static com.atlassian.vcache.ExternalCacheException.Reason.TRANSACTION_FAILURE;
29 import static com.atlassian.vcache.VCacheUtils.fold;
30 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
31 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
32 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
33 import static java.util.Objects.requireNonNull;
34
35
36
37
38
39
40
41 class MemcachedTransactionalExternalCache<V>
42 extends AbstractTransactionalExternalCache<V> {
43 private static final Logger log = LoggerFactory.getLogger(MemcachedTransactionalExternalCache.class);
44
45 private final Supplier<MemcachedClientIF> clientSupplier;
46 private final ExternalCacheKeyGenerator keyGenerator;
47 private final MarshallingPair<V> valueMarshalling;
48 private final int ttlSeconds;
49 private final TransactionControlManager transactionControlManager;
50
51 MemcachedTransactionalExternalCache(
52 MemcachedVCacheServiceSettings serviceSettings,
53 Supplier<RequestContext> contextSupplier,
54 ExternalCacheKeyGenerator keyGenerator,
55 String name,
56 MarshallingPair<V> valueMarshalling,
57 ExternalCacheSettings settings,
58 TransactionControlManager transactionControlManager,
59 MetricsRecorder metricsRecorder) {
60 super(name, contextSupplier, metricsRecorder, serviceSettings.getLockTimeout(), serviceSettings.getExternalCacheExceptionListener());
61 this.clientSupplier = requireNonNull(serviceSettings.getClientSupplier());
62 this.keyGenerator = requireNonNull(keyGenerator);
63 this.valueMarshalling = requireNonNull(valueMarshalling);
64 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
65 this.transactionControlManager = requireNonNull(transactionControlManager);
66 }
67
68 @Override
69 public void transactionSync() {
70 log.trace("Cache {}: synchronising operations", name);
71 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
72
73 fold(perform(() -> {
74 if (cacheContext.hasRemoveAll()) {
75 cacheContext.updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
76 }
77
78 performKeyedOperations(cacheContext);
79
80 return null;
81 }),
82 v -> null,
83 err -> {
84 log.warn("Cache {}: an operation failed during transaction sync ({}). Clearing cache to remove stale entries.", name, err.getMessage());
85 try {
86 cacheContext.updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
87 } catch (RuntimeException e) {
88 log.error("Cache {}: failed to clear the cache: {}", name, e.getMessage());
89 }
90
91 return null;
92 });
93
94 cacheContext.forgetAll();
95 }
96
97 private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
98
99
100
101
102
103
104 final Map<Future<Boolean>, String> futureToFailureMessageMap = new HashMap<>();
105 final boolean[] anOperationFailed = new boolean[1];
106
107 for (Entry<String, DeferredOperation<V>> entry : cacheContext.getKeyedOperations()) {
108 final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
109
110 if (entry.getValue().isRemove()) {
111 log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
112 futureToFailureMessageMap.put(
113 clientSupplier.get().delete(externalKey),
114 "remove entry " + entry.getKey());
115 } else {
116 log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
117 try {
118 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(
119 requireNonNull(entry.getValue().getValue()));
120 final Future<Boolean> putOp = putOperationForPolicy(
121 entry.getValue().getPolicy(),
122 clientSupplier.get(),
123 externalKey,
124 expiryTime(ttlSeconds),
125 valueBytes);
126 futureToFailureMessageMap.put(
127 putOp,
128 "put using policy " + entry.getValue().getPolicy() + " on entry "
129 + entry.getKey());
130 } catch (MarshallingException mex) {
131 log.warn("Cache {}: Unable to marshall value to perform put operation on entry {}",
132 name, entry.getKey(), mex);
133 anOperationFailed[0] = true;
134 break;
135 }
136 }
137 }
138
139
140 futureToFailureMessageMap.entrySet().forEach(entry -> {
141 try {
142 if (anOperationFailed[0]) {
143
144 entry.getKey().cancel(true);
145 } else {
146 final Boolean outcome = entry.getKey().get();
147 if (outcome) {
148 log.trace("Cache {}: successful deferred operation for {}", name, entry.getValue());
149 } else {
150 anOperationFailed[0] = true;
151 log.warn("Cache {}: failed deferred operation for {}", name, entry.getValue());
152 }
153 }
154 } catch (InterruptedException | ExecutionException ex) {
155 if (ex instanceof InterruptedException) {
156
157 Thread.currentThread().interrupt();
158 }
159 anOperationFailed[0] = true;
160 log.error("Cache {}: had failure getting result for deferred operation {}",
161 name, entry.getValue(), ex);
162 }
163 });
164
165
166 if (anOperationFailed[0]) {
167 throw new ExternalCacheException(TRANSACTION_FAILURE);
168 }
169 }
170
171 @Override
172 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
173 final RequestContext requestContext = contextSupplier.get();
174
175 transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
176
177 return requestContext.computeIfAbsent(this, () -> {
178
179
180 log.trace("Cache {}: Setting up a new context", name);
181 return new VersionedExternalCacheRequestContext<>(
182 keyGenerator, name, requestContext::partitionIdentifier,
183 MemcachedUtils.cacheVersionSupplier(clientSupplier),
184 lockTimeout);
185 });
186 }
187
188 @Override
189 protected Logger getLogger() {
190 return log;
191 }
192
193 @Override
194 protected final ExternalCacheException mapException(Exception ex) {
195 return MemcachedUtils.mapException(ex);
196 }
197
198 @Override
199 protected final Optional<V> directGet(String externalKey) {
200 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
201 }
202
203 @Override
204 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
205 return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
206 }
207 }