1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.PutPolicy;
7 import com.atlassian.vcache.internal.MetricLabel;
8 import com.atlassian.vcache.internal.RequestContext;
9 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.CacheType;
12 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13 import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15 import net.spy.memcached.MemcachedClientIF;
16 import net.spy.memcached.OperationTimeoutException;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Map;
23 import java.util.Optional;
24 import java.util.Set;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.CompletionStage;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.function.Function;
31 import java.util.function.Supplier;
32 import java.util.stream.Collectors;
33 import java.util.stream.StreamSupport;
34
35 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
36 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
37 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
38 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
39 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
40 import static java.util.Objects.requireNonNull;
41
42
43
44
45
46
47
48 class MemcachedStableReadExternalCache<V>
49 extends AbstractStableReadExternalCache<V> {
50 private static final Logger log = LoggerFactory.getLogger(MemcachedStableReadExternalCache.class);
51
52 private final Supplier<MemcachedClientIF> clientSupplier;
53 private final Supplier<RequestContext> contextSupplier;
54 private final ExternalCacheKeyGenerator keyGenerator;
55 private final MarshallingPair<V> valueMarshalling;
56 private final int ttlSeconds;
57
58 MemcachedStableReadExternalCache(
59 Supplier<MemcachedClientIF> clientSupplier,
60 Supplier<RequestContext> contextSupplier,
61 ExternalCacheKeyGenerator keyGenerator,
62 String name,
63 MarshallingPair<V> valueMarshalling,
64 ExternalCacheSettings settings,
65 MetricsRecorder metricsRecorder) {
66 super(name, metricsRecorder);
67 this.clientSupplier = requireNonNull(clientSupplier);
68 this.contextSupplier = requireNonNull(contextSupplier);
69 this.keyGenerator = requireNonNull(keyGenerator);
70 this.valueMarshalling = requireNonNull(valueMarshalling);
71 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
72 }
73
74 @Override
75 public CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
76 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
77 return perform(() -> {
78
79
80 final AtomicBoolean remotePutResult = new AtomicBoolean(true);
81 cacheContext.computeValue(internalKey, (key, oldValue) -> {
82
83
84
85
86
87 if (oldValue != null) {
88 return oldValue.thenApply((oldV) -> {
89 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
90 return Optional.of(value);
91 });
92 } else {
93 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
94 return CompletableFuture.completedFuture(Optional.of(value));
95 }
96 });
97 return remotePutResult.get();
98 },
99 (result) -> {
100 if (!result) {
101 cacheContext.forgetValue(internalKey);
102 }
103 });
104 }
105
106 private boolean remotePut(String externalKey, V value, PutPolicy policy) {
107
108 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
109
110 try {
111 final Future<Boolean> putOp = putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
112 return putOp.get();
113 } catch (InterruptedException | ExecutionException e) {
114 throw new ExternalCacheException(ExternalCacheException.Reason.UNCLASSIFIED_FAILURE, e);
115 }
116 }
117
118 @Override
119 protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
120
121 return perform(() -> {
122 if (isEmpty(internalKeys)) {
123 return null;
124 }
125
126
127 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
128 final Map<String, Future<Boolean>> deleteOps =
129 StreamSupport.stream(internalKeys.spliterator(), false)
130 .distinct()
131 .collect(Collectors.toMap(
132 k -> k,
133 k -> clientSupplier.get().delete(cacheContext.externalEntryKeyFor(k))
134 ));
135
136
137
138 Exception failureException = null;
139 for (Map.Entry<String, Future<Boolean>> delOp : deleteOps.entrySet()) {
140 try {
141 delOp.getValue().get();
142 cacheContext.recordValue(delOp.getKey(), Optional.empty());
143 } catch (ExecutionException | InterruptedException ex) {
144 log.info("Cache {}: unable to remove key {}", name, delOp.getKey(), ex);
145 failureException = ex;
146 }
147 }
148
149
150 if (failureException != null) {
151 if (failureException instanceof ExecutionException) {
152 throw (ExecutionException) failureException;
153 }
154 throw (InterruptedException) failureException;
155 }
156
157 return null;
158 });
159 }
160
161 @Override
162 protected CompletionStage<Void> internalRemoveAll() {
163 return perform(() -> {
164 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
165 cacheContext.updateCacheVersion(
166 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
167 cacheContext.forgetAllValues();
168 return null;
169 });
170 }
171
172 @Override
173 protected Logger getLogger() {
174 return log;
175 }
176
177 @Override
178 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
179 final RequestContext requestContext = contextSupplier.get();
180
181 return requestContext.computeIfAbsent(this, () -> {
182
183
184 log.trace("Cache {}: Setting up a new context", name);
185 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
186 keyGenerator, name, requestContext::partitionIdentifier);
187 newCacheContext.updateCacheVersion(
188 MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
189 return newCacheContext;
190 });
191 }
192
193 @Override
194 protected V handleCreation(String internalKey, Supplier<V> supplier)
195 throws ExecutionException, InterruptedException {
196 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
197 final V candidateValue = requireNonNull(supplier.get());
198 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
199 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
200
201
202 for (; ; ) {
203 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
204 if (addOp.get()) {
205
206
207 break;
208 }
209
210 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
211 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
212 if (otherAddedValue.isPresent()) {
213 return otherAddedValue.get();
214 }
215
216 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
217 }
218
219 return candidateValue;
220 }
221
222 @Override
223 protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
224 throws ExecutionException, InterruptedException {
225
226
227 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
228 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
229 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
230 getLogger().trace("Cache {}: getBulk(Function): {} of {} entries have values",
231 name, haveValues.size(), externalKeys.size());
232 final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
233 missingExternalKeys.removeAll(haveValues.keySet());
234
235
236 final Map<String, V> grandResult = haveValues.entrySet().stream()
237 .collect(Collectors.toMap(
238 e -> cacheContext.internalEntryKeyFor(e.getKey()),
239 e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()));
240
241 if (!missingExternalKeys.isEmpty()) {
242 getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
243 name, missingExternalKeys.size());
244
245 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
246 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
247 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
248 if (missingInternalKeys.size() != missingValues.size()) {
249 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
250 name, missingInternalKeys.size() + " but got " + missingValues.size());
251 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
252 }
253
254
255 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
256 Map.Entry::getKey,
257 e -> clientSupplier.get().set(
258 cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
259 ));
260
261
262 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
263 e.getValue().get();
264 }
265
266 grandResult.putAll(missingValues);
267 }
268
269 return grandResult;
270 }
271
272 @Override
273 protected final ExternalCacheException mapException(Exception ex) {
274 return MemcachedUtils.mapException(ex);
275 }
276
277 @Override
278 protected final Optional<V> directGet(String externalKey) {
279 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
280 }
281
282 @Override
283 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
284 return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
285 }
286 }