1 package com.atlassian.vcache.internal.guava;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheException;
7 import com.atlassian.vcache.IdentifiedValue;
8 import com.atlassian.vcache.PutPolicy;
9 import com.atlassian.vcache.internal.RequestContext;
10 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
11 import com.atlassian.vcache.internal.core.cas.IdentifiedData;
12 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
13 import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
14 import com.atlassian.vcache.internal.core.service.FactoryUtils;
15 import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
16 import com.google.common.cache.Cache;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import java.time.Duration;
21 import java.util.HashMap;
22 import java.util.Map;
23 import java.util.Objects;
24 import java.util.Optional;
25 import java.util.Set;
26 import java.util.concurrent.CompletionStage;
27 import java.util.function.Function;
28 import java.util.function.Supplier;
29 import java.util.stream.Collectors;
30 import java.util.stream.StreamSupport;
31
32 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
33 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
34 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
35 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
36 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
37 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
38 import static java.util.Objects.requireNonNull;
39
40
41
42
43
44
45
46 public class GuavaDirectExternalCache<V>
47 extends AbstractExternalCache<V>
48 implements DirectExternalCache<V> {
49 private static final Logger log = LoggerFactory.getLogger(GuavaDirectExternalCache.class);
50
51 private final Cache<String, IdentifiedData> delegate;
52 private final Supplier<RequestContext> contextSupplier;
53 private final ExternalCacheKeyGenerator keyGenerator;
54 private final Optional<MarshallingPair<V>> valueMarshalling;
55
56 public GuavaDirectExternalCache(
57 String name,
58 Cache<String, IdentifiedData> delegate,
59 Supplier<RequestContext> contextSupplier,
60 ExternalCacheKeyGenerator keyGenerator,
61 Optional<MarshallingPair<V>> valueMarshalling,
62 Duration lockTimeout) {
63 super(name, lockTimeout);
64 this.delegate = requireNonNull(delegate);
65 this.contextSupplier = requireNonNull(contextSupplier);
66 this.keyGenerator = requireNonNull(keyGenerator);
67 this.valueMarshalling = requireNonNull(valueMarshalling);
68 }
69
70 @Override
71 public CompletionStage<Optional<V>> get(String internalKey) {
72 return perform(() -> {
73 final String externalKey = buildExternalKey(internalKey);
74 final IdentifiedData identifiedData = delegate.getIfPresent(externalKey);
75 return unmarshall(identifiedData, valueMarshalling);
76 });
77 }
78
79 @Override
80 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
81 return perform(() -> {
82 final String externalKey = buildExternalKey(internalKey);
83 final IdentifiedData identifiedData =
84 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
85 return unmarshall(identifiedData, valueMarshalling).get();
86 });
87 }
88
89 @Override
90 public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
91 return perform(() -> {
92 final String externalKey = buildExternalKey(internalKey);
93 return unmarshallIdentified(delegate.getIfPresent(externalKey), valueMarshalling);
94 });
95 }
96
97 @Override
98 public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
99 return perform(() -> {
100 final String externalKey = buildExternalKey(internalKey);
101 return unmarshallIdentified(
102 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
103 valueMarshalling).get();
104 });
105 }
106
107 @Override
108 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
109 return perform(() -> {
110 if (isEmpty(internalKeys)) {
111 return new HashMap<>();
112 }
113
114
115 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
116 return StreamSupport.stream(internalKeys.spliterator(), false)
117 .distinct()
118 .collect(Collectors.toMap(
119 Objects::requireNonNull,
120 k -> unmarshall(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
121 });
122 }
123
124 @Override
125 public CompletionStage<Map<String, V>> getBulk(
126 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
127 return perform(() -> {
128 if (isEmpty(internalKeys)) {
129 return new HashMap<>();
130 }
131
132
133 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
134 final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
135
136
137 final Map<String, V> grandResult = existingValues.entrySet().stream()
138 .filter(e -> e.getValue().isPresent())
139 .collect(Collectors.toMap(
140 Map.Entry::getKey,
141 e -> e.getValue().get()));
142
143
144 if (grandResult.size() == existingValues.size()) {
145 return grandResult;
146 }
147
148
149 final Set<String> missingInternalKeys = existingValues.entrySet().stream()
150 .filter(e -> !e.getValue().isPresent())
151 .map(Map.Entry::getKey)
152 .collect(Collectors.toSet());
153
154 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
155 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
156
157
158 missingValues.entrySet().forEach(e -> {
159 final Optional<V> existing = unmarshall(
160 delegate.asMap().putIfAbsent(
161 cacheContext.externalEntryKeyFor(e.getKey()),
162 marshall(e.getValue(), valueMarshalling)),
163 valueMarshalling);
164 grandResult.put(e.getKey(), existing.orElse(e.getValue()));
165 });
166
167 return grandResult;
168 });
169 }
170
171 @Override
172 public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
173 return perform(() -> {
174 if (isEmpty(internalKeys)) {
175 return new HashMap<>();
176 }
177
178
179 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
180 return StreamSupport.stream(internalKeys.spliterator(), false)
181 .distinct()
182 .collect(Collectors.toMap(
183 Objects::requireNonNull,
184 k -> unmarshallIdentified(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
185 });
186 }
187
188
189 @Override
190 public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
191 return perform(() -> {
192 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
193 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
194 final IdentifiedData identifiedData = marshall(value, valueMarshalling);
195
196 return GuavaUtils.directPut(externalKey, identifiedData, policy, delegate);
197 });
198 }
199
200 @Override
201 public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
202 return perform(() -> {
203 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
204 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
205 final IdentifiedData existingData = safeCast(casId);
206 return delegate.asMap().remove(externalKey, existingData);
207 });
208 }
209
210 @Override
211 public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
212 return perform(() -> {
213 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
214 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
215 final IdentifiedData existingData = safeCast(casId);
216 final IdentifiedData newData = marshall(newValue, valueMarshalling);
217 return delegate.asMap().replace(externalKey, existingData, newData);
218 });
219 }
220
221 @Override
222 public CompletionStage<Void> remove(Iterable<String> internalKeys) {
223 return perform(() -> {
224 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
225
226 StreamSupport.stream(internalKeys.spliterator(), false)
227 .distinct()
228 .map(cacheContext::externalEntryKeyFor)
229 .forEach(k -> delegate.asMap().remove(k));
230 return null;
231 });
232 }
233
234 @Override
235 public CompletionStage<Void> removeAll() {
236 return perform(() -> {
237 delegate.asMap().clear();
238 return null;
239 });
240 }
241
242 protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
243 final RequestContext requestContext = contextSupplier.get();
244
245 return requestContext.computeIfAbsent(this, () -> {
246 log.trace("Cache {}: Setting up a new context", getName());
247 return new UnversionedExternalCacheRequestContext<>(
248 keyGenerator, getName(), requestContext::partitionIdentifier, lockTimeout);
249 });
250 }
251
252 @Override
253 protected ExternalCacheException mapException(Exception ex) {
254 return GuavaUtils.mapException(ex);
255 }
256
257 @Override
258 protected Logger getLogger() {
259 return log;
260 }
261
262 private String buildExternalKey(String internalKey) {
263 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
264 return cacheContext.externalEntryKeyFor(internalKey);
265 }
266 }