1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheException;
7 import com.atlassian.vcache.ExternalCacheSettings;
8 import com.atlassian.vcache.IdentifiedValue;
9 import com.atlassian.vcache.PutPolicy;
10 import com.atlassian.vcache.internal.RequestContext;
11 import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
12 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
13 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
14 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
15 import com.atlassian.vcache.internal.core.service.FactoryUtils;
16 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
17 import com.google.common.annotations.VisibleForTesting;
18 import net.spy.memcached.CASResponse;
19 import net.spy.memcached.CASValue;
20 import net.spy.memcached.MemcachedClientIF;
21 import net.spy.memcached.OperationTimeoutException;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Optional;
31 import java.util.Set;
32 import java.util.concurrent.CompletionStage;
33 import java.util.concurrent.Future;
34 import java.util.function.Function;
35 import java.util.function.Supplier;
36 import java.util.stream.Collectors;
37 import java.util.stream.StreamSupport;
38
39 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
40 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
41 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
42 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
43 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
44 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
45 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
46 import static java.util.Objects.requireNonNull;
47
48
49
50
51
52
53
54 class MemcachedDirectExternalCache<V>
55 extends AbstractExternalCache<V>
56 implements DirectExternalCache<V> {
57 private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
58
59 private final Supplier<MemcachedClientIF> clientSupplier;
60 private final Supplier<RequestContext> contextSupplier;
61 private final ExternalCacheKeyGenerator keyGenerator;
62 private final MarshallingPair<V> valueMarshalling;
63 private final int ttlSeconds;
64
65 MemcachedDirectExternalCache(
66 MemcachedVCacheServiceSettings serviceSettings,
67 Supplier<RequestContext> contextSupplier,
68 ExternalCacheKeyGenerator keyGenerator,
69 String name,
70 MarshallingPair<V> valueMarshalling,
71 ExternalCacheSettings settings) {
72 super(name, serviceSettings.getLockTimeout(), serviceSettings.getExternalCacheExceptionListener());
73 this.clientSupplier = requireNonNull(serviceSettings.getClientSupplier());
74 this.contextSupplier = requireNonNull(contextSupplier);
75 this.keyGenerator = requireNonNull(keyGenerator);
76 this.valueMarshalling = requireNonNull(valueMarshalling);
77 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
78 }
79
80 @Override
81 public CompletionStage<Optional<V>> get(String internalKey) {
82 return perform(() -> {
83 final String externalKey = buildExternalKey(internalKey);
84 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
85 });
86 }
87
88 @Override
89 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
90 return perform(() -> {
91 final String externalKey = buildExternalKey(internalKey);
92 final Optional<V> existingValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
93 if (existingValue.isPresent()) {
94 return existingValue.get();
95 }
96
97 log.trace("Cache {}, creating candidate for key {}", name, internalKey);
98 final V candidateValue = requireNonNull(supplier.get());
99 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
100
101
102 for (; ; ) {
103 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
104 if (addOp.get()) {
105
106
107 break;
108 }
109
110 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
111 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
112 if (otherAddedValue.isPresent()) {
113 return otherAddedValue.get();
114 }
115
116 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
117 }
118 return candidateValue;
119 });
120 }
121
122 @Override
123 public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
124 return perform(() -> {
125 final String externalKey = buildExternalKey(internalKey);
126 final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
127 if (casValue == null) {
128 return Optional.empty();
129 }
130
131 final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
132 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
133 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) casValue.getValue()));
134 return Optional.of(iv);
135 });
136 }
137
138 @Override
139 public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
140 return perform(() -> {
141 final String externalKey = buildExternalKey(internalKey);
142 final CASValue<Object> existingCasValue = clientSupplier.get().gets(externalKey);
143
144 if (existingCasValue != null) {
145 final CasIdentifier identifier = new MemcachedCasIdentifier(existingCasValue.getCas());
146 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
147 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) existingCasValue.getValue()));
148 return iv;
149 }
150
151 log.trace("Cache {}, creating candidate for key {}", name, internalKey);
152 final V candidateValue = requireNonNull(supplier.get());
153 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
154
155
156 for (; ; ) {
157 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
158 if (!addOp.get()) {
159 log.trace("Cache {}, unable to add candidate for key {}", name, internalKey);
160 }
161
162
163 log.trace("Cache {}, retrieving the candidate for key {}", name, internalKey);
164 final CASValue<Object> otherAddedCasValue = clientSupplier.get().gets(externalKey);
165 if (otherAddedCasValue != null) {
166 final CasIdentifier identifier = new MemcachedCasIdentifier(otherAddedCasValue.getCas());
167 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
168 identifier, valueMarshalling.getUnmarshaller().unmarshallFrom((byte[]) otherAddedCasValue.getValue()));
169 return iv;
170 }
171
172 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
173 }
174 });
175 }
176
177 @Override
178 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
179 return perform(() -> {
180 if (isEmpty(internalKeys)) {
181 return new HashMap<>();
182 }
183
184
185 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
186
187 final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
188 .map(cacheContext::externalEntryKeyFor)
189 .collect(Collectors.toSet());
190
191
192 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
193
194 return externalKeys.stream().collect(Collectors.toMap(
195 cacheContext::internalEntryKeyFor,
196 k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
197 });
198 }
199
200 @Override
201 public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
202 return perform(() -> {
203 if (isEmpty(internalKeys)) {
204 return new HashMap<>();
205 }
206
207
208 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
209
210 final Set<String> externalKeys = Collections.unmodifiableSet(
211 StreamSupport.stream(internalKeys.spliterator(), false)
212 .map(cacheContext::externalEntryKeyFor)
213 .collect(Collectors.toSet()));
214
215
216
217 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
218 log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
219 final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
220 missingExternalKeys.removeAll(haveValues.keySet());
221
222
223 final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
224 e -> cacheContext.internalEntryKeyFor(e.getKey()),
225 e -> unmarshall((byte[]) e.getValue(), valueMarshalling).get()
226 ));
227
228 if (!missingExternalKeys.isEmpty()) {
229
230 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
231 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
232 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
233 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
234
235
236 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
237 Map.Entry::getKey,
238 e -> clientSupplier.get().set(
239 cacheContext.externalEntryKeyFor(e.getKey()), expiryTime(ttlSeconds), marshall(e.getValue(), valueMarshalling))
240 ));
241
242
243 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet()) {
244 e.getValue().get();
245 }
246
247 grandResult.putAll(missingValues);
248 }
249
250 return grandResult;
251 });
252 }
253
254 @Override
255 public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
256 return perform(() -> {
257 if (isEmpty(internalKeys)) {
258 return new HashMap<>();
259 }
260
261
262 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
263
264
265 final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
266 StreamSupport.stream(internalKeys.spliterator(), false)
267 .distinct()
268 .collect(Collectors.toMap(
269 k -> k,
270 k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
271 ));
272
273 return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
274 Map.Entry::getKey,
275 e -> identifiedValueFrom(e.getValue(), valueMarshalling)
276 ));
277 });
278 }
279
280 @Override
281 public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
282 return perform(() -> {
283 final String externalKey = buildExternalKey(internalKey);
284 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(value));
285
286 final Future<Boolean> putOp =
287 putOperationForPolicy(policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
288
289 return putOp.get();
290 });
291 }
292
293 @Override
294 public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
295 return perform(() -> {
296 final String externalKey = buildExternalKey(internalKey);
297 final Future<Boolean> delOp = clientSupplier.get().delete(externalKey, safeExtractId(casId));
298 return delOp.get();
299 });
300 }
301
302 @Override
303 public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
304 return perform(() -> {
305 final String externalKey = buildExternalKey(internalKey);
306 final CASResponse casOp = clientSupplier.get().cas(
307 externalKey,
308 safeExtractId(casId),
309 expiryTime(ttlSeconds),
310 valueMarshalling.getMarshaller().marshallToBytes(requireNonNull(newValue)));
311 return casOp == CASResponse.OK;
312 });
313 }
314
315 @Override
316 public CompletionStage<Void> remove(Iterable<String> internalKeys) {
317
318 return perform(() -> {
319 if (isEmpty(internalKeys)) {
320 return null;
321 }
322
323
324 final List<Future<Boolean>> deleteOps =
325 StreamSupport.stream(internalKeys.spliterator(), false)
326 .map(this::buildExternalKey)
327 .map(k -> clientSupplier.get().delete(k))
328 .collect(Collectors.toList());
329
330
331 for (Future<Boolean> delOp : deleteOps) {
332 delOp.get();
333 }
334
335 return null;
336 });
337 }
338
339 @Override
340 public CompletionStage<Void> removeAll() {
341 return perform(() -> {
342 ensureCacheContext().updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
343 return null;
344 });
345 }
346
347 @VisibleForTesting
348 void refreshCacheVersion() {
349
350 ensureCacheContext().updateCacheVersion(MemcachedUtils.cacheVersionSupplier(clientSupplier));
351 }
352
353 private String buildExternalKey(String internalKey) throws OperationTimeoutException {
354 final VersionedExternalCacheRequestContext cacheContext = ensureCacheContext();
355 return cacheContext.externalEntryKeyFor(internalKey);
356 }
357
358 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
359 final RequestContext requestContext = contextSupplier.get();
360
361 return requestContext.computeIfAbsent(this, () -> {
362
363
364 log.trace("Cache {}: Setting up a new context", name);
365 return new VersionedExternalCacheRequestContext<>(
366 keyGenerator, name, requestContext::partitionIdentifier,
367 MemcachedUtils.cacheVersionSupplier(clientSupplier),
368 lockTimeout);
369 });
370 }
371
372 @Override
373 protected Logger getLogger() {
374 return log;
375 }
376
377 @Override
378 protected ExternalCacheException mapException(Exception ex) {
379 return MemcachedUtils.mapException(ex);
380 }
381 }