1 package com.atlassian.vcache.internal.guava;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheException;
7 import com.atlassian.vcache.IdentifiedValue;
8 import com.atlassian.vcache.PutPolicy;
9 import com.atlassian.vcache.internal.RequestContext;
10 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
11 import com.atlassian.vcache.internal.core.cas.IdentifiedData;
12 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
13 import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
14 import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
15 import com.google.common.cache.Cache;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.CompletionStage;
25 import java.util.function.Function;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import java.util.stream.StreamSupport;
29
30 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
31 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
32 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
33 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
34 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
35 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
36 import static java.util.Objects.requireNonNull;
37
38
39
40
41
42
43
44 public class GuavaDirectExternalCache<V>
45 extends AbstractExternalCache<V>
46 implements DirectExternalCache<V> {
47 private static final Logger log = LoggerFactory.getLogger(GuavaDirectExternalCache.class);
48
49 private final Cache<String, IdentifiedData> delegate;
50 private final Supplier<RequestContext> contextSupplier;
51 private final ExternalCacheKeyGenerator keyGenerator;
52 private final Optional<MarshallingPair<V>> valueMarshalling;
53
54 public GuavaDirectExternalCache(
55 String name,
56 Cache<String, IdentifiedData> delegate,
57 Supplier<RequestContext> contextSupplier,
58 ExternalCacheKeyGenerator keyGenerator,
59 Optional<MarshallingPair<V>> valueMarshalling) {
60 super(name);
61 this.delegate = requireNonNull(delegate);
62 this.contextSupplier = requireNonNull(contextSupplier);
63 this.keyGenerator = requireNonNull(keyGenerator);
64 this.valueMarshalling = requireNonNull(valueMarshalling);
65 }
66
67 @Override
68 public CompletionStage<Optional<V>> get(String internalKey) {
69 return perform(() -> {
70 final String externalKey = buildExternalKey(internalKey);
71 final IdentifiedData identifiedData = delegate.getIfPresent(externalKey);
72 return unmarshall(identifiedData, valueMarshalling);
73 });
74 }
75
76 @Override
77 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
78 return perform(() -> {
79 final String externalKey = buildExternalKey(internalKey);
80 final IdentifiedData identifiedData =
81 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
82 return unmarshall(identifiedData, valueMarshalling).get();
83 });
84 }
85
86 @Override
87 public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
88 return perform(() -> {
89 final String externalKey = buildExternalKey(internalKey);
90 return unmarshallIdentified(delegate.getIfPresent(externalKey), valueMarshalling);
91 });
92 }
93
94 @Override
95 public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
96 return perform(() -> {
97 final String externalKey = buildExternalKey(internalKey);
98 return unmarshallIdentified(
99 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
100 valueMarshalling).get();
101 });
102 }
103
104 @Override
105 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
106 return perform(() -> {
107 if (isEmpty(internalKeys)) {
108 return new HashMap<>();
109 }
110
111
112 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
113 return StreamSupport.stream(internalKeys.spliterator(), false)
114 .distinct()
115 .collect(Collectors.toMap(
116 Objects::requireNonNull,
117 k -> unmarshall(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
118 });
119 }
120
121 @Override
122 public CompletionStage<Map<String, V>> getBulk(
123 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
124 return perform(() -> {
125 if (isEmpty(internalKeys)) {
126 return new HashMap<>();
127 }
128
129
130 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
131 final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
132
133
134 final Map<String, V> grandResult = existingValues.entrySet().stream()
135 .filter(e -> e.getValue().isPresent())
136 .collect(Collectors.toMap(
137 Map.Entry::getKey,
138 e -> e.getValue().get()));
139
140
141 if (grandResult.size() == existingValues.size()) {
142 return grandResult;
143 }
144
145
146 final Set<String> missingInternalKeys = existingValues.entrySet().stream()
147 .filter(e -> !e.getValue().isPresent())
148 .map(Map.Entry::getKey)
149 .collect(Collectors.toSet());
150
151 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
152
153 missingValues.entrySet().forEach(e -> {
154 final Optional<V> existing = unmarshall(
155 delegate.asMap().putIfAbsent(
156 cacheContext.externalEntryKeyFor(e.getKey()),
157 marshall(e.getValue(), valueMarshalling)),
158 valueMarshalling);
159 grandResult.put(e.getKey(), existing.orElse(e.getValue()));
160 });
161
162 return grandResult;
163 });
164 }
165
166 @Override
167 public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
168 return perform(() -> {
169 if (isEmpty(internalKeys)) {
170 return new HashMap<>();
171 }
172
173
174 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
175 return StreamSupport.stream(internalKeys.spliterator(), false)
176 .distinct()
177 .collect(Collectors.toMap(
178 Objects::requireNonNull,
179 k -> unmarshallIdentified(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
180 });
181 }
182
183
184 @Override
185 public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
186 return perform(() -> {
187 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
188 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
189 final IdentifiedData identifiedData = marshall(value, valueMarshalling);
190
191 return GuavaUtils.directPut(externalKey, identifiedData, policy, delegate);
192 });
193 }
194
195 @Override
196 public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
197 return perform(() -> {
198 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
199 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
200 final IdentifiedData existingData = safeCast(casId);
201 return delegate.asMap().remove(externalKey, existingData);
202 });
203 }
204
205 @Override
206 public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
207 return perform(() -> {
208 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
209 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
210 final IdentifiedData existingData = safeCast(casId);
211 final IdentifiedData newData = marshall(newValue, valueMarshalling);
212 return delegate.asMap().replace(externalKey, existingData, newData);
213 });
214 }
215
216 @Override
217 public CompletionStage<Void> remove(Iterable<String> internalKeys) {
218 return perform(() -> {
219 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
220
221 StreamSupport.stream(internalKeys.spliterator(), false)
222 .distinct()
223 .map(cacheContext::externalEntryKeyFor)
224 .forEach(k -> delegate.asMap().remove(k));
225 return null;
226 });
227 }
228
229 @Override
230 public CompletionStage<Void> removeAll() {
231 return perform(() -> {
232 delegate.asMap().clear();
233 return null;
234 });
235 }
236
237 protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
238 final RequestContext requestContext = contextSupplier.get();
239
240 return requestContext.computeIfAbsent(this, () -> {
241 log.trace("Cache {}: Setting up a new context", getName());
242 return new UnversionedExternalCacheRequestContext<>(
243 keyGenerator, getName(), requestContext::partitionIdentifier);
244 });
245 }
246
247 @Override
248 protected ExternalCacheException mapException(Exception ex) {
249 return GuavaUtils.mapException(ex);
250 }
251
252 @Override
253 protected Logger getLogger() {
254 return log;
255 }
256
257 private String buildExternalKey(String internalKey) {
258 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
259 return cacheContext.externalEntryKeyFor(internalKey);
260 }
261 }