1 package com.atlassian.vcache.internal.memcached;
2
3 import java.util.Collections;
4 import java.util.HashMap;
5 import java.util.HashSet;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Optional;
9 import java.util.Set;
10 import java.util.concurrent.CompletableFuture;
11 import java.util.concurrent.Future;
12 import java.util.function.Function;
13 import java.util.function.Supplier;
14 import java.util.stream.Collectors;
15 import java.util.stream.StreamSupport;
16 import javax.annotation.Nonnull;
17
18 import com.atlassian.vcache.CasIdentifier;
19 import com.atlassian.vcache.DirectExternalCache;
20 import com.atlassian.vcache.ExternalCacheException;
21 import com.atlassian.vcache.ExternalCacheSettings;
22 import com.atlassian.vcache.IdentifiedValue;
23 import com.atlassian.vcache.Marshaller;
24 import com.atlassian.vcache.PutPolicy;
25 import com.atlassian.vcache.internal.RequestContext;
26 import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
27 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
28 import com.atlassian.vcache.internal.core.VCacheUtils;
29 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
30
31 import com.google.common.annotations.VisibleForTesting;
32 import net.spy.memcached.CASResponse;
33 import net.spy.memcached.CASValue;
34 import net.spy.memcached.MemcachedClientIF;
35 import net.spy.memcached.OperationTimeoutException;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import static com.atlassian.vcache.internal.core.VCacheUtils.isEmpty;
40 import static com.atlassian.vcache.internal.core.VCacheUtils.marshall;
41 import static com.atlassian.vcache.internal.core.VCacheUtils.unmarshall;
42 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.identifiedValueFrom;
43 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
44 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.safeExtractId;
45 import static java.util.Objects.requireNonNull;
46
47
48
49
50
51
52
53
54 class MemcachedDirectExternalCache<V>
55 extends AbstractExternalCache<V>
56 implements DirectExternalCache<V>
57 {
58 private static final Logger log = LoggerFactory.getLogger(MemcachedDirectExternalCache.class);
59
60 private final Supplier<MemcachedClientIF> clientSupplier;
61 private final Supplier<RequestContext> contextSupplier;
62 private final ExternalCacheKeyGenerator keyGenerator;
63 private final Marshaller<V> valueMarshaller;
64 private final ExternalCacheSettings settings;
65 private final int defaultTtl;
66
67 MemcachedDirectExternalCache(
68 Supplier<MemcachedClientIF> clientSupplier,
69 Supplier<RequestContext> contextSupplier,
70 ExternalCacheKeyGenerator keyGenerator,
71 String name,
72 Marshaller<V> valueMarshaller,
73 ExternalCacheSettings settings)
74 {
75 super(name);
76 this.clientSupplier = requireNonNull(clientSupplier);
77 this.contextSupplier = requireNonNull(contextSupplier);
78 this.keyGenerator = requireNonNull(keyGenerator);
79 this.valueMarshaller = requireNonNull(valueMarshaller);
80 this.settings = requireNonNull(settings);
81 this.defaultTtl = VCacheUtils.roundUpToSeconds(settings.getDefaultTtl().get());
82 }
83
84 @Nonnull
85 @Override
86 public CompletableFuture<Optional<V>> get(String internalKey)
87 {
88 return perform(() -> {
89 final String externalKey = buildExternalKey(internalKey);
90 return unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
91 });
92 }
93
94 @Nonnull
95 @Override
96 public CompletableFuture<V> get(String internalKey, Supplier<V> supplier)
97 {
98 return perform(() -> {
99 final String externalKey = buildExternalKey(internalKey);
100 final Optional<V> existingValue = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
101 if (existingValue.isPresent())
102 {
103 return existingValue.get();
104 }
105
106 log.trace("Cache {}, creating candidate for key {}", name, internalKey);
107 final V candidateValue = requireNonNull(supplier.get());
108 final byte[] candidateValueBytes = valueMarshaller.marshall(candidateValue);
109
110
111 for (; ; )
112 {
113 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, defaultTtl, candidateValueBytes);
114 if (addOp.get())
115 {
116
117
118 break;
119 }
120
121 log.info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
122 final Optional<V> otherAddedValue = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
123 if (otherAddedValue.isPresent())
124 {
125 return otherAddedValue.get();
126 }
127
128 log.info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
129 }
130 return candidateValue;
131 });
132 }
133
134 @Nonnull
135 @Override
136 public CompletableFuture<Optional<IdentifiedValue<V>>> getIdentified(String internalKey)
137 {
138 return perform(() -> {
139 final String externalKey = buildExternalKey(internalKey);
140 final CASValue<Object> casValue = clientSupplier.get().gets(externalKey);
141 if (casValue == null)
142 {
143 return Optional.empty();
144 }
145
146 final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
147 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
148 identifier, valueMarshaller.unmarshall((byte[]) casValue.getValue()));
149 return Optional.of(iv);
150 });
151 }
152
153 @Nonnull
154 @Override
155 public CompletableFuture<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys)
156 {
157 return perform(() -> {
158 if (isEmpty(internalKeys))
159 {
160 return new HashMap<>();
161 }
162
163
164 final ExternalCacheRequestContext cacheContext = ensureCacheContext();
165
166 final Set<String> externalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
167 .map(cacheContext::externalEntryKeyFor)
168 .collect(Collectors.toSet());
169
170
171 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
172
173 return externalKeys.stream().collect(Collectors.toMap(
174 cacheContext::internalEntryKeyFor,
175 k -> unmarshall(haveValues.get(k), valueMarshaller)));
176 });
177 }
178
179 @Nonnull
180 @Override
181 public CompletableFuture<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys)
182 {
183 return perform(() -> {
184 if (isEmpty(internalKeys))
185 {
186 return new HashMap<>();
187 }
188
189
190 final ExternalCacheRequestContext cacheContext = ensureCacheContext();
191
192 final Set<String> externalKeys = Collections.unmodifiableSet(
193 StreamSupport.stream(internalKeys.spliterator(), false)
194 .map(cacheContext::externalEntryKeyFor)
195 .collect(Collectors.toSet()));
196
197
198
199 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
200 log.trace("{} of {} entries have values", haveValues.size(), externalKeys.size());
201 final Set<String> missingExternalKeys = new HashSet<>(externalKeys);
202 missingExternalKeys.removeAll(haveValues.keySet());
203
204
205 final Map<String, V> grandResult = haveValues.entrySet().stream().collect(Collectors.toMap(
206 e -> cacheContext.internalEntryKeyFor(e.getKey()),
207 e -> unmarshall(e.getValue(), valueMarshaller).get()
208 ));
209
210 if (!missingExternalKeys.isEmpty())
211 {
212
213 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
214 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
215 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
216
217
218 final Map<String, Future<Boolean>> internalKeyToFutureMap = missingValues.entrySet().stream().collect(Collectors.toMap(
219 Map.Entry::getKey,
220 e -> clientSupplier.get().set(
221 cacheContext.externalEntryKeyFor(e.getKey()), defaultTtl, marshall(e.getValue(), valueMarshaller))
222 ));
223
224
225 for (Map.Entry<String, Future<Boolean>> e : internalKeyToFutureMap.entrySet())
226 {
227 e.getValue().get();
228 }
229
230 grandResult.putAll(missingValues);
231 }
232
233 return grandResult;
234 });
235 }
236
237 @Nonnull
238 @Override
239 public CompletableFuture<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys)
240 {
241 return perform(() -> {
242 if (isEmpty(internalKeys))
243 {
244 return new HashMap<>();
245 }
246
247
248 final ExternalCacheRequestContext cacheContext = ensureCacheContext();
249
250
251 final Map<String, Future<CASValue<Object>>> internalKeyToFuture =
252 StreamSupport.stream(internalKeys.spliterator(), false)
253 .distinct()
254 .collect(Collectors.toMap(
255 k -> k,
256 k -> clientSupplier.get().asyncGets(cacheContext.externalEntryKeyFor(k))
257 ));
258
259 return internalKeyToFuture.entrySet().stream().collect(Collectors.toMap(
260 Map.Entry::getKey,
261 e -> identifiedValueFrom(e.getValue(), valueMarshaller)
262 ));
263 });
264 }
265
266 @Nonnull
267 @Override
268 public CompletableFuture<Boolean> put(String internalKey, V value, PutPolicy policy)
269 {
270 return perform(() -> {
271 final String externalKey = buildExternalKey(internalKey);
272 final byte[] valueBytes = valueMarshaller.marshall(value);
273
274 final Future<Boolean> putOp =
275 putOperationForPolicy(policy, clientSupplier.get(), externalKey, defaultTtl, valueBytes);
276
277 return putOp.get();
278 });
279 }
280
281 @Nonnull
282 @Override
283 public CompletableFuture<Boolean> removeIf(String internalKey, CasIdentifier casId)
284 {
285 return perform(() -> {
286 final String partitionedKey = buildExternalKey(internalKey);
287 final Future<Boolean> delOp = clientSupplier.get().delete(partitionedKey, safeExtractId(casId));
288 return delOp.get();
289 });
290 }
291
292 @Nonnull
293 @Override
294 public CompletableFuture<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue)
295 {
296 return perform(() -> {
297 final String externalKey = buildExternalKey(internalKey);
298 final CASResponse casOp = clientSupplier.get().cas(
299 externalKey, safeExtractId(casId), defaultTtl, valueMarshaller.marshall(newValue));
300 return casOp == CASResponse.OK;
301 });
302 }
303
304 @Nonnull
305 @Override
306 public CompletableFuture<Void> remove(Iterable<String> internalKeys)
307 {
308
309 return perform(() -> {
310 if (isEmpty(internalKeys))
311 {
312 return null;
313 }
314
315
316 final List<Future<Boolean>> deleteOps =
317 StreamSupport.stream(internalKeys.spliterator(), false)
318 .map(this::buildExternalKey)
319 .map(k -> clientSupplier.get().delete(k))
320 .collect(Collectors.toList());
321
322
323 for (Future<Boolean> delOp : deleteOps)
324 {
325 delOp.get();
326 }
327
328 return null;
329 });
330 }
331
332 @Nonnull
333 @Override
334 public CompletableFuture<Void> removeAll()
335 {
336 return perform(() -> {
337 final ExternalCacheRequestContext cacheContext = ensureCacheContext();
338 cacheContext.updateCacheVersion(
339 clientSupplier.get().incr(cacheContext.externalCacheVersionKey(), 1, 1));
340 return null;
341 });
342 }
343
344 @VisibleForTesting
345 void refreshCacheVersion()
346 {
347
348 final ExternalCacheRequestContext cacheContext = ensureCacheContext();
349 cacheContext.updateCacheVersion(
350 clientSupplier.get().incr(cacheContext.externalCacheVersionKey(), 0, 1));
351 }
352
353 private String buildExternalKey(String internalKey) throws OperationTimeoutException
354 {
355 final ExternalCacheRequestContext cacheContext = ensureCacheContext();
356 return cacheContext.externalEntryKeyFor(internalKey);
357 }
358
359 @Nonnull
360 protected ExternalCacheRequestContext ensureCacheContext() throws OperationTimeoutException
361 {
362 final RequestContext requestContext = contextSupplier.get();
363
364 return requestContext.computeIfAbsent(this, () -> {
365
366
367 log.trace("Cache {}: Setting up a new context", name);
368 final ExternalCacheRequestContext newCacheContext =
369 new ExternalCacheRequestContext(keyGenerator, name, requestContext::partitionIdentifier);
370 newCacheContext.updateCacheVersion(
371 clientSupplier.get().incr(newCacheContext.externalCacheVersionKey(), 0, 1));
372 return newCacheContext;
373 });
374 }
375
376 @Nonnull
377 @Override
378 protected Logger getLogger()
379 {
380 return log;
381 }
382
383 @Nonnull
384 @Override
385 protected ExternalCacheException mapException(Exception ex)
386 {
387 return MemcachedUtils.mapException(ex);
388 }
389 }