1 package com.atlassian.vcache.internal.guava;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.PutPolicy;
6 import com.atlassian.vcache.internal.MetricLabel;
7 import com.atlassian.vcache.internal.RequestContext;
8 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9 import com.atlassian.vcache.internal.core.cas.IdentifiedData;
10 import com.atlassian.vcache.internal.core.metrics.CacheType;
11 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12 import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
13 import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14 import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
15 import com.google.common.cache.Cache;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.Collections;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.function.Function;
28 import java.util.function.Supplier;
29 import java.util.stream.Collectors;
30
31 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
32 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
33 import static java.util.Objects.requireNonNull;
34
35
36
37
38
39
40
41 public class GuavaStableReadExternalCache<V>
42 extends AbstractStableReadExternalCache<V> {
43 private static final Logger log = LoggerFactory.getLogger(GuavaStableReadExternalCache.class);
44
45 private final Cache<String, IdentifiedData> delegate;
46 private final Supplier<RequestContext> contextSupplier;
47 private final ExternalCacheKeyGenerator keyGenerator;
48 private final Optional<MarshallingPair<V>> valueMarshalling;
49
50 public GuavaStableReadExternalCache(
51 String name,
52 Cache<String, IdentifiedData> delegate,
53 Supplier<RequestContext> contextSupplier,
54 ExternalCacheKeyGenerator keyGenerator,
55 Optional<MarshallingPair<V>> valueMarshalling,
56 MetricsRecorder metricsRecorder) {
57 super(name, metricsRecorder);
58 this.contextSupplier = requireNonNull(contextSupplier);
59 this.keyGenerator = requireNonNull(keyGenerator);
60 this.valueMarshalling = requireNonNull(valueMarshalling);
61 this.delegate = requireNonNull(delegate);
62 }
63
64 @Override
65 protected CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
66 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
67 return perform(
68 () -> {
69
70
71 final AtomicBoolean remotePutResult = new AtomicBoolean(true);
72
73
74
75
76
77 cacheContext.computeValue(internalKey, (k, oldValue) -> {
78 if (oldValue != null) {
79 return oldValue.thenApply(oldV -> {
80 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
81 return Optional.of(value);
82 });
83 } else {
84 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
85 return CompletableFuture.completedFuture(Optional.of(value));
86 }
87 });
88 return remotePutResult.get();
89 },
90 (result) -> {
91 if (!result) {
92 cacheContext.forgetValue(internalKey);
93 }
94 });
95 }
96
97 @Override
98 protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
99 return perform(() -> {
100
101 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
102 for (String key : internalKeys) {
103 delegate.asMap().remove(cacheContext.externalEntryKeyFor(key));
104 cacheContext.forgetValue(key);
105 }
106 return null;
107 });
108 }
109
110 @Override
111 protected CompletionStage<Void> internalRemoveAll() {
112 return perform(() -> {
113 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
114 delegate.asMap().clear();
115 cacheContext.forgetAllValues();
116 return null;
117 });
118 }
119
120 @Override
121 protected Logger getLogger() {
122 return log;
123 }
124
125 @Override
126 protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
127 final RequestContext requestContext = contextSupplier.get();
128
129 return requestContext.computeIfAbsent(this, () -> {
130 log.trace("Cache {}: Setting up a new context", getName());
131 return new UnversionedExternalCacheRequestContext<>(
132 keyGenerator, getName(), requestContext::partitionIdentifier);
133 });
134 }
135
136 @Override
137 protected V handleCreation(String internalKey, Supplier<V> supplier)
138 throws ExecutionException, InterruptedException {
139 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
140 final V candidateValue = requireNonNull(supplier.get());
141 final IdentifiedData candidateIdentifiedData = marshall(candidateValue, valueMarshalling);
142
143 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
144 final Optional<V> otherAddedValue =
145 unmarshall(delegate.asMap().putIfAbsent(externalKey, candidateIdentifiedData), valueMarshalling);
146
147 if (otherAddedValue.isPresent()) {
148 getLogger().info("Cache {}, unable to add candidate for key {}, use what was added", name, internalKey);
149 return otherAddedValue.get();
150 }
151
152 return candidateValue;
153 }
154
155 @Override
156 protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
157 throws ExecutionException, InterruptedException {
158
159
160 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
161 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
162
163 final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeys);
164
165 final Set<String> missingExternalKeys = candidateValues.entrySet().stream()
166 .filter(e -> !e.getValue().isPresent())
167 .map(Map.Entry::getKey)
168 .collect(Collectors.toSet());
169
170
171 final Map<String, V> grandResult = candidateValues.entrySet().stream()
172 .filter(e -> e.getValue().isPresent())
173 .collect(Collectors.toMap(
174 e -> cacheContext.internalEntryKeyFor(e.getKey()),
175 e -> e.getValue().get()));
176
177 if (!missingExternalKeys.isEmpty()) {
178 getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
179 name, missingExternalKeys.size());
180
181 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
182 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
183 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
184 if (missingInternalKeys.size() != missingValues.size()) {
185 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
186 name, missingInternalKeys.size() + " but got " + missingValues.size());
187 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
188 }
189
190
191 missingValues.entrySet().stream().forEach(e ->
192 delegate.put(
193 cacheContext.externalEntryKeyFor(e.getKey()),
194 marshall(e.getValue(), valueMarshalling)));
195
196 grandResult.putAll(missingValues);
197 }
198 return grandResult;
199 }
200
201 @Override
202 protected final ExternalCacheException mapException(Exception ex) {
203 return GuavaUtils.mapException(ex);
204 }
205
206 @Override
207 protected final Optional<V> directGet(String externalKey) {
208 return unmarshall(delegate.getIfPresent(externalKey), valueMarshalling);
209 }
210
211 @Override
212 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
213 return GuavaUtils.directGetBulk(externalKeys, delegate, valueMarshalling);
214 }
215
216 private boolean remotePut(String externalKey, V value, PutPolicy policy) {
217 final IdentifiedData identifiedData = marshall(value, valueMarshalling);
218 return GuavaUtils.directPut(externalKey, identifiedData, policy, delegate);
219 }
220 }