1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.vcache.PutPolicy;
4 import com.atlassian.vcache.StableReadExternalCache;
5 import com.atlassian.vcache.VCacheException;
6 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import java.util.Map;
11 import java.util.Optional;
12 import java.util.Set;
13 import java.util.concurrent.CompletionStage;
14 import java.util.concurrent.locks.ReentrantLock;
15 import java.util.function.Function;
16 import java.util.function.Supplier;
17 import java.util.stream.Collectors;
18 import java.util.stream.StreamSupport;
19
20
21
22
23
24
25
26
27
28
29
30 public abstract class AbstractStableReadExternalCache<V>
31 extends AbstractNonDirectExternalCache<V>
32 implements StableReadExternalCache<V> {
33
34 private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCache.class);
35
36 private final ReentrantLock cacheLock = new ReentrantLock();
37
38 protected AbstractStableReadExternalCache(String name, MetricsRecorder metricsRecorder) {
39 super(name, metricsRecorder);
40 }
41
42 protected final Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
43 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
44
45 return StreamSupport.stream(internalKeys.spliterator(), false)
46 .filter(k -> cacheContext.getValueRecorded(k).isPresent())
47 .distinct()
48 .collect(Collectors.toMap(
49 k -> k,
50 k -> cacheContext.getValueRecorded(k).get())
51 );
52 }
53
54 @Override
55 public final CompletionStage<Optional<V>> get(String internalKey) {
56 return withCacheLock(() -> super.get(internalKey));
57 }
58
59 @Override
60 public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
61 return withCacheLock(() -> super.get(internalKey, supplier));
62 }
63
64 @Override
65 public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
66 return withCacheLock(() -> super.getBulk(internalKeys));
67 }
68
69 @Override
70 public final CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
71 return withCacheLock(() -> super.getBulk(factory, internalKeys));
72 }
73
74 @Override
75 public final CompletionStage<Boolean> put(final String internalKey, final V value, final PutPolicy policy) {
76 return withCacheLock(() -> internalPut(internalKey, value, policy));
77 }
78
79 protected abstract CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy);
80
81 @Override
82 public final CompletionStage<Void> remove(final Iterable<String> keys) {
83 return withCacheLock(() -> internalRemove(keys));
84 }
85
86 protected abstract CompletionStage<Void> internalRemove(Iterable<String> keys);
87
88
89 @Override
90 public CompletionStage<Void> removeAll() {
91 return withCacheLock(this::internalRemoveAll);
92 }
93
94 protected abstract CompletionStage<Void> internalRemoveAll();
95
96 private <R> R withCacheLock(final Supplier<R> result) {
97 if (cacheLock.isHeldByCurrentThread()) {
98 log.error("Detected a recursive call to cache: {}", getName());
99 throw new VCacheException("Detected a recursive call to cache: " + getName());
100 }
101 cacheLock.lock();
102 try {
103 return result.get();
104 } finally {
105 cacheLock.unlock();
106 }
107 }
108 }