1 package com.atlassian.vcache.internal.legacy;
2
3 import com.atlassian.cache.Cache;
4 import com.atlassian.marshalling.api.MarshallingPair;
5 import com.atlassian.vcache.CasIdentifier;
6 import com.atlassian.vcache.DirectExternalCache;
7 import com.atlassian.vcache.ExternalCacheException;
8 import com.atlassian.vcache.IdentifiedValue;
9 import com.atlassian.vcache.PutPolicy;
10 import com.atlassian.vcache.internal.RequestContext;
11 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
12 import com.atlassian.vcache.internal.core.cas.IdentifiedData;
13 import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
14 import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
15 import com.atlassian.vcache.internal.core.service.FactoryUtils;
16 import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.Set;
25 import java.util.concurrent.CompletionStage;
26 import java.util.function.Function;
27 import java.util.function.Supplier;
28 import java.util.stream.Collectors;
29 import java.util.stream.StreamSupport;
30
31 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
32 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
33 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
34 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
35 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
36 import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
37 import static java.util.Objects.requireNonNull;
38
39
40
41
42
43
44
45 class LegacyDirectExternalCache<V>
46 extends AbstractExternalCache<V>
47 implements DirectExternalCache<V> {
48 private static final Logger log = LoggerFactory.getLogger(LegacyDirectExternalCache.class);
49
50 private final Cache<String, IdentifiedData> delegate;
51 private final Supplier<RequestContext> contextSupplier;
52 private final ExternalCacheKeyGenerator keyGenerator;
53 private final Optional<MarshallingPair<V>> valueMarshalling;
54 private final LegacyServiceSettings serviceSettings;
55
56 LegacyDirectExternalCache(
57 Cache<String, IdentifiedData> delegate,
58 Supplier<RequestContext> contextSupplier,
59 ExternalCacheKeyGenerator keyGenerator,
60 Optional<MarshallingPair<V>> valueMarshalling,
61 LegacyServiceSettings serviceSettings) {
62 super(delegate.getName(), serviceSettings.getLockTimeout());
63 this.delegate = requireNonNull(delegate);
64 this.contextSupplier = requireNonNull(contextSupplier);
65 this.keyGenerator = requireNonNull(keyGenerator);
66 this.valueMarshalling = requireNonNull(valueMarshalling);
67 this.serviceSettings = requireNonNull(serviceSettings);
68 }
69
70 @Override
71 public CompletionStage<Optional<V>> get(String internalKey) {
72 return perform(() -> {
73 final String externalKey = buildExternalKey(internalKey);
74 final IdentifiedData identifiedData = delegate.get(externalKey);
75 return unmarshall(identifiedData, valueMarshalling);
76 });
77 }
78
79 @Override
80 public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
81 return perform(() -> {
82 final String externalKey = buildExternalKey(internalKey);
83 final IdentifiedData identifiedData =
84 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
85 return unmarshall(identifiedData, valueMarshalling).get();
86 });
87 }
88
89 @Override
90 public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
91 return perform(() -> {
92 verifyCasOpsSupported();
93 final String externalKey = buildExternalKey(internalKey);
94 return unmarshallIdentified(delegate.get(externalKey), valueMarshalling);
95 });
96 }
97
98 @Override
99 public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
100 return perform(() -> {
101 verifyCasOpsSupported();
102 final String externalKey = buildExternalKey(internalKey);
103 return unmarshallIdentified(
104 delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
105 valueMarshalling).get();
106 });
107 }
108
109 @Override
110 public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
111 return perform(() -> {
112 if (isEmpty(internalKeys)) {
113 return new HashMap<>();
114 }
115
116
117 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
118 return StreamSupport.stream(internalKeys.spliterator(), false)
119 .distinct()
120 .collect(Collectors.toMap(
121 Objects::requireNonNull,
122 k -> unmarshall(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
123 });
124 }
125
126 @Override
127 public CompletionStage<Map<String, V>> getBulk(
128 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
129 return perform(() -> {
130 if (isEmpty(internalKeys)) {
131 return new HashMap<>();
132 }
133
134
135 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
136 final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
137
138
139 final Map<String, V> grandResult = existingValues.entrySet().stream()
140 .filter(e -> e.getValue().isPresent())
141 .collect(Collectors.toMap(
142 Map.Entry::getKey,
143 e -> e.getValue().get()));
144
145
146 if (grandResult.size() == existingValues.size()) {
147 return grandResult;
148 }
149
150
151 final Set<String> missingInternalKeys = existingValues.entrySet().stream()
152 .filter(e -> !e.getValue().isPresent())
153 .map(Map.Entry::getKey)
154 .collect(Collectors.toSet());
155
156 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
157 FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
158
159 missingValues.entrySet().forEach(e -> {
160 if (serviceSettings.isAvoidCasOps()) {
161 delegate.put(cacheContext.externalEntryKeyFor(e.getKey()), marshall(e.getValue(), valueMarshalling));
162 grandResult.put(e.getKey(), e.getValue());
163 } else {
164 final Optional<V> existing = unmarshall(
165 delegate.putIfAbsent(
166 cacheContext.externalEntryKeyFor(e.getKey()),
167 marshall(e.getValue(), valueMarshalling)),
168 valueMarshalling);
169 grandResult.put(e.getKey(), existing.orElse(e.getValue()));
170 }
171 });
172
173 return grandResult;
174 });
175 }
176
177 @Override
178 public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
179 return perform(() -> {
180 verifyCasOpsSupported();
181
182 if (isEmpty(internalKeys)) {
183 return new HashMap<>();
184 }
185
186
187 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
188 return StreamSupport.stream(internalKeys.spliterator(), false)
189 .distinct()
190 .collect(Collectors.toMap(
191 Objects::requireNonNull,
192 k -> unmarshallIdentified(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
193 });
194 }
195
196
197 @Override
198 public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
199 return perform(() -> {
200 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
201 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
202 final IdentifiedData identifiedData = marshall(value, valueMarshalling);
203
204 return LegacyUtils.directPut(externalKey, identifiedData, policy, delegate, serviceSettings.isAvoidCasOps());
205 });
206 }
207
208 @Override
209 public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
210 return perform(() -> {
211 verifyCasOpsSupported();
212 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
213 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
214 final IdentifiedData existingData = safeCast(casId);
215 return delegate.remove(externalKey, existingData);
216 });
217 }
218
219 @Override
220 public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
221 return perform(() -> {
222 verifyCasOpsSupported();
223 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
224 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
225 final IdentifiedData existingData = safeCast(casId);
226 final IdentifiedData newData = marshall(newValue, valueMarshalling);
227 return delegate.replace(externalKey, existingData, newData);
228 });
229 }
230
231 @Override
232 public CompletionStage<Void> remove(Iterable<String> internalKeys) {
233 return perform(() -> {
234 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
235
236 StreamSupport.stream(internalKeys.spliterator(), false)
237 .distinct()
238 .map(cacheContext::externalEntryKeyFor)
239 .forEach(delegate::remove);
240 return null;
241 });
242 }
243
244 @Override
245 public CompletionStage<Void> removeAll() {
246 return perform(() -> {
247 delegate.removeAll();
248 return null;
249 });
250 }
251
252 protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
253 final RequestContext requestContext = contextSupplier.get();
254
255 return requestContext.computeIfAbsent(this, () -> {
256 log.trace("Cache {}: Setting up a new context", delegate.getName());
257 return new UnversionedExternalCacheRequestContext<>(
258 keyGenerator,
259 delegate.getName(),
260 requestContext::partitionIdentifier,
261 serviceSettings.getLockTimeout());
262 });
263 }
264
265 @Override
266 protected ExternalCacheException mapException(Exception ex) {
267 return LegacyUtils.mapException(ex);
268 }
269
270 @Override
271 protected Logger getLogger() {
272 return log;
273 }
274
275 private String buildExternalKey(String internalKey) {
276 final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
277 return cacheContext.externalEntryKeyFor(internalKey);
278 }
279
280 private void verifyCasOpsSupported() {
281 if (serviceSettings.isAvoidCasOps()) {
282 throw new UnsupportedOperationException("CAS operations not supported in this configuration");
283 }
284 }
285 }