1 package com.atlassian.vcache.internal.legacy;
2
import com.atlassian.cache.Cache;
import com.atlassian.marshalling.api.MarshallingPair;
import com.atlassian.vcache.ExternalCacheException;
import com.atlassian.vcache.PutPolicy;
import com.atlassian.vcache.internal.MetricLabel;
import com.atlassian.vcache.internal.RequestContext;
import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
import com.atlassian.vcache.internal.core.cas.IdentifiedData;
import com.atlassian.vcache.internal.core.metrics.CacheType;
import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
import static java.util.Objects.requireNonNull;
34
35
36
37
38
39
40
41 class LegacyStableReadExternalCache<V>
42 extends AbstractStableReadExternalCache<V> {
43 private static final Logger log = LoggerFactory.getLogger(LegacyStableReadExternalCache.class);
44
45 private final Cache<String, IdentifiedData> delegate;
46 private final Supplier<RequestContext> contextSupplier;
47 private final ExternalCacheKeyGenerator keyGenerator;
48 private final Optional<MarshallingPair<V>> valueMarshalling;
49 private final LegacyServiceSettings serviceSettings;
50
51 LegacyStableReadExternalCache(
52 Cache<String, IdentifiedData> delegate,
53 Supplier<RequestContext> contextSupplier,
54 ExternalCacheKeyGenerator keyGenerator,
55 Optional<MarshallingPair<V>> valueMarshalling,
56 LegacyServiceSettings serviceSettings,
57 MetricsRecorder metricsRecorder) {
58 super(delegate.getName(), metricsRecorder);
59 this.delegate = requireNonNull(delegate);
60 this.contextSupplier = requireNonNull(contextSupplier);
61 this.keyGenerator = requireNonNull(keyGenerator);
62 this.valueMarshalling = requireNonNull(valueMarshalling);
63 this.serviceSettings = requireNonNull(serviceSettings);
64 }
65
66 @Override
67 public CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
68 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
69 return perform(
70 () -> {
71
72
73 final AtomicBoolean remotePutResult = new AtomicBoolean(true);
74
75
76
77
78
79 cacheContext.computeValue(internalKey, (key, oldValue) -> {
80 if (oldValue != null) {
81 return oldValue.thenApply(oldV -> {
82 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
83 return Optional.of(value);
84 });
85 } else {
86 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
87 return CompletableFuture.completedFuture(Optional.of(value));
88 }
89 });
90 return remotePutResult.get();
91 },
92 (result) -> {
93 if (!result) {
94 cacheContext.forgetValue(internalKey);
95 }
96 });
97 }
98
99 private Boolean remotePut(String externalKey, V value, PutPolicy policy) {
100 final IdentifiedData identifiedData = marshall(value, valueMarshalling);
101 return LegacyUtils.directPut(externalKey, identifiedData, policy, delegate, serviceSettings.isAvoidCasOps());
102 }
103
104 @Override
105 protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
106 return perform(() -> {
107
108 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
109 for (String key : internalKeys) {
110 delegate.remove(cacheContext.externalEntryKeyFor(key));
111 cacheContext.forgetValue(key);
112 }
113
114 return null;
115 });
116 }
117
118 @Override
119 protected CompletionStage<Void> internalRemoveAll() {
120 return perform(() -> {
121 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
122 delegate.removeAll();
123 cacheContext.forgetAllValues();
124 return null;
125 });
126 }
127
128 @Override
129 protected Logger getLogger() {
130 return log;
131 }
132
133 @Override
134 protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
135 final RequestContext requestContext = contextSupplier.get();
136
137 return requestContext.computeIfAbsent(this, () -> {
138 log.trace("Cache {}: Setting up a new context", delegate.getName());
139 return new UnversionedExternalCacheRequestContext<>(
140 keyGenerator, delegate.getName(), requestContext::partitionIdentifier);
141 });
142 }
143
144 @Override
145 protected V handleCreation(String internalKey, Supplier<V> supplier)
146 throws ExecutionException, InterruptedException {
147 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
148 final V candidateValue = requireNonNull(supplier.get());
149 final IdentifiedData candidateIdentifiedData = marshall(candidateValue, valueMarshalling);
150 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
151
152 if (serviceSettings.isAvoidCasOps()) {
153 delegate.put(externalKey, candidateIdentifiedData);
154 } else {
155 final Optional<V> otherAddedValue =
156 unmarshall(delegate.putIfAbsent(externalKey, candidateIdentifiedData), valueMarshalling);
157
158 if (otherAddedValue.isPresent()) {
159 getLogger().info("Cache {}, unable to add candidate for key {}, use what was added", name, internalKey);
160 return otherAddedValue.get();
161 }
162 }
163 return candidateValue;
164 }
165
166 @Override
167 protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
168 throws ExecutionException, InterruptedException {
169
170
171 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
172 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
173 final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeys);
174
175 final Set<String> missingExternalKeys = candidateValues.entrySet().stream()
176 .filter(e -> !e.getValue().isPresent())
177 .map(Map.Entry::getKey)
178 .collect(Collectors.toSet());
179
180
181 final Map<String, V> grandResult = candidateValues.entrySet().stream()
182 .filter(e -> e.getValue().isPresent())
183 .collect(Collectors.toMap(
184 e -> cacheContext.internalEntryKeyFor(e.getKey()),
185 e -> e.getValue().get()));
186
187 if (!missingExternalKeys.isEmpty()) {
188 getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
189 name, missingExternalKeys.size());
190
191 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
192 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
193 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
194 if (missingInternalKeys.size() != missingValues.size()) {
195 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
196 name, missingInternalKeys.size() + " but got " + missingValues.size());
197 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
198 }
199
200
201 missingValues.entrySet().stream().forEach(e ->
202 delegate.put(
203 cacheContext.externalEntryKeyFor(e.getKey()),
204 marshall(e.getValue(), valueMarshalling)));
205
206 grandResult.putAll(missingValues);
207 }
208
209 return grandResult;
210 }
211
212 @Override
213 protected final ExternalCacheException mapException(Exception ex) {
214 return LegacyUtils.mapException(ex);
215 }
216
217 @Override
218 protected final Optional<V> directGet(String externalKey) {
219 return unmarshall(delegate.get(externalKey), valueMarshalling);
220 }
221
222 @Override
223 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
224 return LegacyUtils.directGetBulk(externalKeys, delegate, valueMarshalling);
225 }
226 }