1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingException;
4 import com.atlassian.marshalling.api.MarshallingPair;
5 import com.atlassian.vcache.ExternalCacheException;
6 import com.atlassian.vcache.ExternalCacheSettings;
7 import com.atlassian.vcache.internal.RequestContext;
8 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9 import com.atlassian.vcache.internal.core.TransactionControlManager;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12 import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
13 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
14 import net.spy.memcached.MemcachedClientIF;
15 import net.spy.memcached.OperationTimeoutException;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.function.Supplier;
26
27 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
28 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
29 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
30 import static java.util.Objects.requireNonNull;
31
32
33
34
35
36
37
38 class MemcachedTransactionalExternalCache<V>
39 extends AbstractTransactionalExternalCache<V> {
40 private static final Logger log = LoggerFactory.getLogger(MemcachedTransactionalExternalCache.class);
41
42 private final Supplier<MemcachedClientIF> clientSupplier;
43 private final ExternalCacheKeyGenerator keyGenerator;
44 private final MarshallingPair<V> valueMarshalling;
45 private final int ttlSeconds;
46 private final TransactionControlManager transactionControlManager;
47
48 MemcachedTransactionalExternalCache(
49 Supplier<MemcachedClientIF> clientSupplier,
50 Supplier<RequestContext> contextSupplier,
51 ExternalCacheKeyGenerator keyGenerator,
52 String name,
53 MarshallingPair<V> valueMarshalling,
54 ExternalCacheSettings settings,
55 TransactionControlManager transactionControlManager,
56 MetricsRecorder metricsRecorder) {
57 super(name, contextSupplier, metricsRecorder);
58 this.clientSupplier = requireNonNull(clientSupplier);
59 this.keyGenerator = requireNonNull(keyGenerator);
60 this.valueMarshalling = requireNonNull(valueMarshalling);
61 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
62 this.transactionControlManager = requireNonNull(transactionControlManager);
63 }
64
65 @Override
66 public void transactionSync() {
67 log.trace("Cache {}: synchronising operations", name);
68 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
69
70 if (cacheContext.hasRemoveAll()) {
71 cacheContext.updateCacheVersion(
72 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
73 }
74
75 performKeyedOperations(cacheContext);
76 cacheContext.forgetAll();
77 }
78
79 private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
80
81
82
83
84
85
86 final Map<Future<Boolean>, String> futureToFailureMessageMap = new HashMap<>();
87 cacheContext.getKeyedOperations().forEach(entry -> {
88 final String externalKey = cacheContext.externalEntryKeyFor(entry.getKey());
89
90 if (entry.getValue().isRemove()) {
91 log.trace("Cache {}: performing remove on entry {}", name, entry.getKey());
92 futureToFailureMessageMap.put(
93 clientSupplier.get().delete(externalKey),
94 "remove entry " + entry.getKey());
95 } else {
96 log.trace("Cache {}: performing {} on entry {}", name, entry.getValue().getPolicy(), entry.getKey());
97 try {
98 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(
99 requireNonNull(entry.getValue().getValue()));
100 final Future<Boolean> putOp = putOperationForPolicy(
101 entry.getValue().getPolicy(),
102 clientSupplier.get(),
103 externalKey,
104 expiryTime(ttlSeconds),
105 valueBytes);
106 futureToFailureMessageMap.put(
107 putOp,
108 "put using policy " + entry.getValue().getPolicy() + " on entry "
109 + entry.getKey());
110 } catch (MarshallingException mex) {
111 log.warn("Cache {}: Unable to marshall value to perform put operation on entry {}",
112 name, entry.getKey(), mex);
113 }
114 }
115 });
116
117
118 final boolean[] anOperationFailed = new boolean[1];
119 futureToFailureMessageMap.entrySet().forEach(entry -> {
120 try {
121 final Boolean outcome = entry.getKey().get();
122 if (outcome) {
123 log.trace("Cache {}: successful deferred operation for {}", name, entry.getValue());
124 } else {
125 anOperationFailed[0] = true;
126 log.trace("Cache {}: failed deferred operation for {}", name, entry.getValue());
127 }
128 } catch (InterruptedException | ExecutionException ex) {
129 anOperationFailed[0] = true;
130 log.error("Cache {}: had failure getting result for deferred operation {}",
131 name, entry.getValue(), ex);
132 }
133 });
134
135
136 if (anOperationFailed[0]) {
137 log.warn("Cache {}: an operation failed in transaction sync, so clearing the cache", name);
138 cacheContext.updateCacheVersion(
139 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
140 }
141 }
142
143 @Override
144 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
145 final RequestContext requestContext = contextSupplier.get();
146
147 transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
148
149 return requestContext.computeIfAbsent(this, () -> {
150
151
152 log.trace("Cache {}: Setting up a new context", name);
153 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
154 keyGenerator, name, requestContext::partitionIdentifier);
155 newCacheContext.updateCacheVersion(
156 MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
157 return newCacheContext;
158 });
159 }
160
161 @Override
162 protected Logger getLogger() {
163 return log;
164 }
165
166 @Override
167 protected final ExternalCacheException mapException(Exception ex) {
168 return MemcachedUtils.mapException(ex);
169 }
170
171 @Override
172 protected final Optional<V> directGet(String externalKey) {
173 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
174 }
175
176 @Override
177 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
178 return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
179 }
180 }