1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.PutPolicy;
7 import com.atlassian.vcache.internal.MetricLabel;
8 import com.atlassian.vcache.internal.RequestContext;
9 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.CacheType;
12 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13 import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17 import redis.clients.jedis.Jedis;
18
19 import java.time.Duration;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.ExecutionException;
26 import java.util.function.Function;
27 import java.util.function.Supplier;
28 import java.util.stream.Collectors;
29 import java.util.stream.StreamSupport;
30
31 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
32 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
33 import static java.util.Objects.requireNonNull;
34
35
36
37
38
39
40
41 class RedisStableReadExternalCache<V>
42 extends AbstractStableReadExternalCache<V> {
43 private static final Logger log = LoggerFactory.getLogger(RedisStableReadExternalCache.class);
44
45 private final Supplier<Jedis> clientSupplier;
46 private final Supplier<RequestContext> contextSupplier;
47 private final ExternalCacheKeyGenerator keyGenerator;
48 private final MarshallingPair<V> valueMarshalling;
49 private final int defaultTtl;
50
51 RedisStableReadExternalCache(
52 Supplier<Jedis> clientSupplier,
53 Supplier<RequestContext> contextSupplier,
54 ExternalCacheKeyGenerator keyGenerator,
55 String name,
56 MarshallingPair<V> valueMarshalling,
57 ExternalCacheSettings settings,
58 MetricsRecorder metricsRecorder,
59 Duration lockTimeout) {
60 super(name, metricsRecorder, lockTimeout, (n, ex) -> {});
61 this.clientSupplier = requireNonNull(clientSupplier);
62 this.contextSupplier = requireNonNull(contextSupplier);
63 this.keyGenerator = requireNonNull(keyGenerator);
64 this.valueMarshalling = requireNonNull(valueMarshalling);
65
66 this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
67 }
68
69 @Override
70 public boolean internalPut(String internalKey, V value, PutPolicy policy) {
71 final String externalKey = ensureCacheContext().externalEntryKeyFor(internalKey);
72 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
73 return RedisUtils.putOperationForPolicy(
74 policy, clientSupplier, externalKey, defaultTtl, valueBytes);
75 }
76
77 @Override
78 protected void internalRemove(Iterable<String> internalKeys) {
79 if (isEmpty(internalKeys)) {
80 return;
81 }
82
83
84 try (Jedis client = clientSupplier.get()) {
85 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
86 final List<byte[]> externalKeysList = StreamSupport.stream(internalKeys.spliterator(), false)
87 .map(cacheContext::externalEntryKeyFor)
88 .map(String::getBytes)
89 .collect(Collectors.toList());
90 final byte[][] externalKeysAsBytes = externalKeysList.toArray(new byte[externalKeysList.size()][]);
91
92 final long numDeleted = client.del(externalKeysAsBytes);
93 if (numDeleted != externalKeysAsBytes.length) {
94 log.info("Cache {}: only able to delete {} of {} keys", name, numDeleted, externalKeysAsBytes.length);
95 }
96
97
98 StreamSupport.stream(internalKeys.spliterator(), false)
99 .forEach(k -> cacheContext.recordValue(k, Optional.empty()));
100 }
101 }
102
103 @Override
104 protected void internalRemoveAll() {
105 ensureCacheContext().updateCacheVersion(RedisUtils.cacheVersionIncrementer(clientSupplier));
106 }
107
108 @Override
109 protected Logger getLogger() {
110 return log;
111 }
112
113 @Override
114 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
115 final RequestContext requestContext = contextSupplier.get();
116
117 return requestContext.computeIfAbsent(this, () -> {
118
119
120 log.trace("Cache {}: Setting up a new context", name);
121 return new VersionedExternalCacheRequestContext<>(
122 keyGenerator, name, requestContext::partitionIdentifier,
123 RedisUtils.cacheVersionSupplier(clientSupplier, defaultTtl + 1),
124 lockTimeout);
125 });
126 }
127
128 @Override
129 protected V handleCreation(String internalKey, V candidateValue)
130 throws ExecutionException, InterruptedException {
131 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
132 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
133 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
134
135
136 try (Jedis client = clientSupplier.get()) {
137 for (; ; ) {
138 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
139 final long addOp = client.setnx(externalKey.getBytes(), candidateValueBytes);
140 if (addOp == 1) {
141
142
143 break;
144 }
145
146 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
147 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
148 final Optional<V> otherAddedValue = unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
149 if (otherAddedValue.isPresent()) {
150 return otherAddedValue.get();
151 }
152
153 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
154 }
155 }
156 return candidateValue;
157 }
158
159 @Override
160 public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, String... keys) {
161 return super.getBulk(factory, keys);
162 }
163
164 @Override
165 protected final ExternalCacheException mapException(Exception ex) {
166 return RedisUtils.mapException(ex);
167 }
168
169 @Override
170 protected final Optional<V> directGet(String externalKey) {
171 try (Jedis client = clientSupplier.get()) {
172 return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
173 }
174 }
175
176 @Override
177 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
178 return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
179 }
180 }