1 package com.atlassian.vcache.internal.legacy;
2
3 import com.atlassian.cache.Cache;
4 import com.atlassian.marshalling.api.MarshallingPair;
5 import com.atlassian.vcache.CasIdentifier;
6 import com.atlassian.vcache.DirectExternalCache;
7 import com.atlassian.vcache.ExternalCacheException;
8 import com.atlassian.vcache.IdentifiedValue;
9 import com.atlassian.vcache.PutPolicy;
10 import com.atlassian.vcache.internal.RequestContext;
11 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
12 import com.atlassian.vcache.internal.core.cas.IdentifiedData;
13 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
14 import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
15 import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.CompletionStage;
25 import java.util.function.Function;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import java.util.stream.StreamSupport;
29
30 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
31 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
32 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
33 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
34 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
35 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
36 import static java.util.Objects.requireNonNull;
37
38
39
40
41
42
43
44 class LegacyDirectExternalCache<V>
45 extends AbstractExternalCache<V>
46 implements DirectExternalCache<V> {
47 private static final Logger log = LoggerFactory.getLogger(LegacyDirectExternalCache.class);
48
49 private final Cache<String, IdentifiedData> delegate;
50 private final Supplier<RequestContext> contextSupplier;
51 private final ExternalCacheKeyGenerator keyGenerator;
52 private final Optional<MarshallingPair<V>> valueMarshalling;
53 private final LegacyServiceSettings serviceSettings;
54
55 LegacyDirectExternalCache(
56 Cache<String, IdentifiedData> delegate,
57 Supplier<RequestContext> contextSupplier,
58 ExternalCacheKeyGenerator keyGenerator,
59 Optional<MarshallingPair<V>> valueMarshalling,
60 LegacyServiceSettings serviceSettings) {
61 super(delegate.getName());
62 this.delegate = requireNonNull(delegate);
63 this.contextSupplier = requireNonNull(contextSupplier);
64 this.keyGenerator = requireNonNull(keyGenerator);
65 this.valueMarshalling = requireNonNull(valueMarshalling);
66 this.serviceSettings = requireNonNull(serviceSettings);
67 }
68
69 @Override
70 public CompletionStage<Optional<V>> get(String internalKey) {
71 return perform(() -> {
72 final String externalKey = buildExternalKey(internalKey);
73 final IdentifiedData identifiedData = delegate.get(externalKey);
74 return unmarshall(identifiedData, valueMarshalling);
75 });
76 }
77
78 @Override
79 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
80 return perform(() -> {
81 final String externalKey = buildExternalKey(internalKey);
82 final IdentifiedData identifiedData =
83 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
84 return unmarshall(identifiedData, valueMarshalling).get();
85 });
86 }
87
88 @Override
89 public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
90 return perform(() -> {
91 verifyCasOpsSupported();
92 final String externalKey = buildExternalKey(internalKey);
93 return unmarshallIdentified(delegate.get(externalKey), valueMarshalling);
94 });
95 }
96
97 @Override
98 public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
99 return perform(() -> {
100 verifyCasOpsSupported();
101 final String externalKey = buildExternalKey(internalKey);
102 return unmarshallIdentified(
103 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
104 valueMarshalling).get();
105 });
106 }
107
108 @Override
109 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
110 return perform(() -> {
111 if (isEmpty(internalKeys)) {
112 return new HashMap<>();
113 }
114
115
116 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
117 return StreamSupport.stream(internalKeys.spliterator(), false)
118 .distinct()
119 .collect(Collectors.toMap(
120 Objects::requireNonNull,
121 k -> unmarshall(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
122 });
123 }
124
125 @Override
126 public CompletionStage<Map<String, V>> getBulk(
127 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
128 return perform(() -> {
129 if (isEmpty(internalKeys)) {
130 return new HashMap<>();
131 }
132
133
134 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
135 final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
136
137
138 final Map<String, V> grandResult = existingValues.entrySet().stream()
139 .filter(e -> e.getValue().isPresent())
140 .collect(Collectors.toMap(
141 Map.Entry::getKey,
142 e -> e.getValue().get()));
143
144
145 if (grandResult.size() == existingValues.size()) {
146 return grandResult;
147 }
148
149
150 final Set<String> missingInternalKeys = existingValues.entrySet().stream()
151 .filter(e -> !e.getValue().isPresent())
152 .map(Map.Entry::getKey)
153 .collect(Collectors.toSet());
154
155 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
156
157 missingValues.entrySet().forEach(e -> {
158 if (serviceSettings.isAvoidCasOps()) {
159 delegate.put(cacheContext.externalEntryKeyFor(e.getKey()), marshall(e.getValue(), valueMarshalling));
160 grandResult.put(e.getKey(), e.getValue());
161 } else {
162 final Optional<V> existing = unmarshall(
163 delegate.putIfAbsent(
164 cacheContext.externalEntryKeyFor(e.getKey()),
165 marshall(e.getValue(), valueMarshalling)),
166 valueMarshalling);
167 grandResult.put(e.getKey(), existing.orElse(e.getValue()));
168 }
169 });
170
171 return grandResult;
172 });
173 }
174
175 @Override
176 public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
177 return perform(() -> {
178 verifyCasOpsSupported();
179
180 if (isEmpty(internalKeys)) {
181 return new HashMap<>();
182 }
183
184
185 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
186 return StreamSupport.stream(internalKeys.spliterator(), false)
187 .distinct()
188 .collect(Collectors.toMap(
189 Objects::requireNonNull,
190 k -> unmarshallIdentified(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
191 });
192 }
193
194
195 @Override
196 public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
197 return perform(() -> {
198 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
199 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
200 final IdentifiedData identifiedData = marshall(value, valueMarshalling);
201
202 return LegacyUtils.directPut(externalKey, identifiedData, policy, delegate, serviceSettings.isAvoidCasOps());
203 });
204 }
205
206 @Override
207 public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
208 return perform(() -> {
209 verifyCasOpsSupported();
210 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
211 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
212 final IdentifiedData existingData = safeCast(casId);
213 return delegate.remove(externalKey, existingData);
214 });
215 }
216
217 @Override
218 public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
219 return perform(() -> {
220 verifyCasOpsSupported();
221 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
222 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
223 final IdentifiedData existingData = safeCast(casId);
224 final IdentifiedData newData = marshall(newValue, valueMarshalling);
225 return delegate.replace(externalKey, existingData, newData);
226 });
227 }
228
229 @Override
230 public CompletionStage<Void> remove(Iterable<String> internalKeys) {
231 return perform(() -> {
232 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
233
234 StreamSupport.stream(internalKeys.spliterator(), false)
235 .distinct()
236 .map(cacheContext::externalEntryKeyFor)
237 .forEach(delegate::remove);
238 return null;
239 });
240 }
241
242 @Override
243 public CompletionStage<Void> removeAll() {
244 return perform(() -> {
245 delegate.removeAll();
246 return null;
247 });
248 }
249
250 protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
251 final RequestContext requestContext = contextSupplier.get();
252
253 return requestContext.computeIfAbsent(this, () -> {
254 log.trace("Cache {}: Setting up a new context", delegate.getName());
255 return new UnversionedExternalCacheRequestContext<>(
256 keyGenerator, delegate.getName(), requestContext::partitionIdentifier);
257 });
258 }
259
260 @Override
261 protected ExternalCacheException mapException(Exception ex) {
262 return LegacyUtils.mapException(ex);
263 }
264
265 @Override
266 protected Logger getLogger() {
267 return log;
268 }
269
270 private String buildExternalKey(String internalKey) {
271 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
272 return cacheContext.externalEntryKeyFor(internalKey);
273 }
274
275 private void verifyCasOpsSupported() {
276 if (serviceSettings.isAvoidCasOps()) {
277 throw new UnsupportedOperationException("CAS operations not supported in this configuration");
278 }
279 }
280 }