1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.PutPolicy;
7 import com.atlassian.vcache.internal.MetricLabel;
8 import com.atlassian.vcache.internal.RequestContext;
9 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.CacheType;
12 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13 import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17 import redis.clients.jedis.Jedis;
18 import redis.clients.jedis.Pipeline;
19 import redis.clients.jedis.Response;
20
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.Set;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.CompletionStage;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.function.Function;
31 import java.util.function.Supplier;
32 import java.util.stream.Collectors;
33 import java.util.stream.StreamSupport;
34
35 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
36 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
37 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
38 import static java.util.Objects.requireNonNull;
39
40
41
42
43
44
45
46 class RedisStableReadExternalCache<V>
47 extends AbstractStableReadExternalCache<V> {
48 private static final Logger log = LoggerFactory.getLogger(RedisStableReadExternalCache.class);
49
50 private final Supplier<Jedis> clientSupplier;
51 private final Supplier<RequestContext> contextSupplier;
52 private final ExternalCacheKeyGenerator keyGenerator;
53 private final MarshallingPair<V> valueMarshalling;
54 private final int defaultTtl;
55
56 RedisStableReadExternalCache(
57 Supplier<Jedis> clientSupplier,
58 Supplier<RequestContext> contextSupplier,
59 ExternalCacheKeyGenerator keyGenerator,
60 String name,
61 MarshallingPair<V> valueMarshalling,
62 ExternalCacheSettings settings,
63 MetricsRecorder metricsRecorder) {
64 super(name, metricsRecorder);
65 this.clientSupplier = requireNonNull(clientSupplier);
66 this.contextSupplier = requireNonNull(contextSupplier);
67 this.keyGenerator = requireNonNull(keyGenerator);
68 this.valueMarshalling = requireNonNull(valueMarshalling);
69 this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
70 }
71
72 @Override
73 public CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
74 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
75 return perform(
76 () -> {
77
78
79 final AtomicBoolean remotePutResult = new AtomicBoolean(true);
80 cacheContext.computeValue(internalKey, (key, oldValue) -> {
81
82
83
84
85
86 if (oldValue != null) {
87 return oldValue.thenApply((oldV) -> {
88 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(key), value, policy));
89 return Optional.of(value);
90 });
91 } else {
92 remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(key), value, policy));
93 return CompletableFuture.completedFuture(Optional.of(value));
94 }
95 });
96 return remotePutResult.get();
97 },
98 (result) -> {
99 if (!result) {
100 cacheContext.forgetValue(internalKey);
101 }
102 });
103 }
104
105 private boolean remotePut(String externalKey, V value, PutPolicy policy) {
106 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
107 return RedisUtils.putOperationForPolicy(
108 policy, clientSupplier, externalKey, defaultTtl, valueBytes);
109 }
110
111 @Override
112 protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
113
114 return perform(() -> {
115 if (isEmpty(internalKeys)) {
116 return null;
117 }
118
119
120 try (Jedis client = clientSupplier.get()) {
121 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
122 final List<byte[]> externalKeysList = StreamSupport.stream(internalKeys.spliterator(), false)
123 .map(cacheContext::externalEntryKeyFor)
124 .map(String::getBytes)
125 .collect(Collectors.toList());
126 final byte[][] externalKeysAsBytes = externalKeysList.toArray(new byte[externalKeysList.size()][]);
127
128 final long numDeleted = client.del(externalKeysAsBytes);
129 if (numDeleted != externalKeysAsBytes.length) {
130 log.info("Cache {}: only able to delete {} of {} keys", name, numDeleted, externalKeysAsBytes.length);
131 }
132
133
134 StreamSupport.stream(internalKeys.spliterator(), false)
135 .forEach(k -> cacheContext.recordValue(k, Optional.empty()));
136 }
137
138 return null;
139 });
140 }
141
142 @Override
143 protected CompletionStage<Void> internalRemoveAll() {
144 return perform(() -> {
145 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
146 cacheContext.updateCacheVersion(
147 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
148 cacheContext.forgetAllValues();
149 return null;
150 });
151 }
152
153 @Override
154 protected Logger getLogger() {
155 return log;
156 }
157
158 @Override
159 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
160 final RequestContext requestContext = contextSupplier.get();
161
162 return requestContext.computeIfAbsent(this, () -> {
163
164
165 log.trace("Cache {}: Setting up a new context", name);
166 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
167 keyGenerator, name, requestContext::partitionIdentifier);
168 newCacheContext.updateCacheVersion(
169 RedisUtils.obtainCacheVersion(
170 clientSupplier,
171 newCacheContext.externalCacheVersionKey(),
172 defaultTtl + 1));
173 return newCacheContext;
174 });
175 }
176
177 @Override
178 protected V handleCreation(String internalKey, Supplier<V> supplier)
179 throws ExecutionException, InterruptedException {
180 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
181 final V candidateValue = requireNonNull(supplier.get());
182 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
183 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
184
185
186 try (Jedis client = clientSupplier.get()) {
187 for (; ; ) {
188 final long addOp = client.setnx(externalKey.getBytes(), candidateValueBytes);
189 if (addOp == 1) {
190
191
192 break;
193 }
194
195 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
196 final Optional<V> otherAddedValue = unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
197 if (otherAddedValue.isPresent()) {
198 return otherAddedValue.get();
199 }
200
201 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
202 }
203 }
204 return candidateValue;
205 }
206
207 @Override
208 protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
209 throws ExecutionException, InterruptedException {
210
211 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
212 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
213 final Map<String, Optional<V>> candidateValues =
214 RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
215
216
217 final Map<String, V> grandResult = candidateValues.entrySet().stream()
218 .filter(e -> e.getValue().isPresent())
219 .collect(Collectors.toMap(
220 e -> cacheContext.internalEntryKeyFor(e.getKey()),
221 e -> e.getValue().get()));
222 getLogger().trace("Cache {}: getBulk(Function): {} of {} entries have values",
223 name, grandResult.size(), externalKeys.size());
224
225
226 final List<String> missingExternalKeys = candidateValues.entrySet().stream()
227 .filter(e -> !e.getValue().isPresent())
228 .map(Map.Entry::getKey)
229 .collect(Collectors.toList());
230
231 if (!missingExternalKeys.isEmpty()) {
232 getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
233 name, missingExternalKeys.size());
234
235
236 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
237 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
238 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
239 if (missingInternalKeys.size() != missingValues.size()) {
240 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
241 name, missingInternalKeys.size() + " but got " + missingValues.size());
242 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
243 }
244
245
246 try (Jedis client = clientSupplier.get()) {
247 final Pipeline pipeline = client.pipelined();
248 final Map<String, Response<String>> internalKeyToResponseMap =
249 missingValues.entrySet().stream().collect(Collectors.toMap(
250 Map.Entry::getKey,
251 e -> pipeline.setex(
252 cacheContext.externalEntryKeyFor(e.getKey()).getBytes(),
253 defaultTtl,
254 marshall(e.getValue(), valueMarshalling))
255 ));
256 pipeline.sync();
257
258 internalKeyToResponseMap.entrySet().stream()
259 .filter(e -> !RedisUtils.OK.equals(e.getValue().get()))
260 .forEach(e -> log.warn("Cache {}: Unable to set key {}", name, e.getKey()));
261 }
262
263 grandResult.putAll(missingValues);
264 }
265
266 return grandResult;
267 }
268
269 @Override
270 public CompletionStage<Map<String, Optional<V>>> getBulk(String... keys) {
271 return super.getBulk(keys);
272 }
273
274 @Override
275 public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, String... keys) {
276 return super.getBulk(factory, keys);
277 }
278
279 @Override
280 protected final ExternalCacheException mapException(Exception ex) {
281 return RedisUtils.mapException(ex);
282 }
283
284 @Override
285 protected final Optional<V> directGet(String externalKey) {
286 try (Jedis client = clientSupplier.get()) {
287 return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
288 }
289 }
290
291 @Override
292 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
293 return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
294 }
295 }