1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.PutPolicy;
7 import com.atlassian.vcache.internal.MetricLabel;
8 import com.atlassian.vcache.internal.RequestContext;
9 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.CacheType;
12 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13 import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17 import redis.clients.jedis.Jedis;
18
19 import java.time.Duration;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.ExecutionException;
26 import java.util.function.Function;
27 import java.util.function.Supplier;
28 import java.util.stream.Collectors;
29 import java.util.stream.StreamSupport;
30
31 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
32 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
33 import static java.util.Objects.requireNonNull;
34
35
36
37
38
39
40
41 class RedisStableReadExternalCache<V>
42 extends AbstractStableReadExternalCache<V> {
43 private static final Logger log = LoggerFactory.getLogger(RedisStableReadExternalCache.class);
44
45 private final Supplier<Jedis> clientSupplier;
46 private final Supplier<RequestContext> contextSupplier;
47 private final ExternalCacheKeyGenerator keyGenerator;
48 private final MarshallingPair<V> valueMarshalling;
49 private final int defaultTtl;
50
51 RedisStableReadExternalCache(
52 Supplier<Jedis> clientSupplier,
53 Supplier<RequestContext> contextSupplier,
54 ExternalCacheKeyGenerator keyGenerator,
55 String name,
56 MarshallingPair<V> valueMarshalling,
57 ExternalCacheSettings settings,
58 MetricsRecorder metricsRecorder,
59 Duration lockTimeout) {
60 super(name, metricsRecorder, lockTimeout);
61 this.clientSupplier = requireNonNull(clientSupplier);
62 this.contextSupplier = requireNonNull(contextSupplier);
63 this.keyGenerator = requireNonNull(keyGenerator);
64 this.valueMarshalling = requireNonNull(valueMarshalling);
65
66 this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
67 }
68
69 @Override
70 public boolean internalPut(String internalKey, V value, PutPolicy policy) {
71 final String externalKey = ensureCacheContext().externalEntryKeyFor(internalKey);
72 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
73 return RedisUtils.putOperationForPolicy(
74 policy, clientSupplier, externalKey, defaultTtl, valueBytes);
75 }
76
77 @Override
78 protected void internalRemove(Iterable<String> internalKeys) {
79 if (isEmpty(internalKeys)) {
80 return;
81 }
82
83
84 try (Jedis client = clientSupplier.get()) {
85 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
86 final List<byte[]> externalKeysList = StreamSupport.stream(internalKeys.spliterator(), false)
87 .map(cacheContext::externalEntryKeyFor)
88 .map(String::getBytes)
89 .collect(Collectors.toList());
90 final byte[][] externalKeysAsBytes = externalKeysList.toArray(new byte[externalKeysList.size()][]);
91
92 final long numDeleted = client.del(externalKeysAsBytes);
93 if (numDeleted != externalKeysAsBytes.length) {
94 log.info("Cache {}: only able to delete {} of {} keys", name, numDeleted, externalKeysAsBytes.length);
95 }
96
97
98 StreamSupport.stream(internalKeys.spliterator(), false)
99 .forEach(k -> cacheContext.recordValue(k, Optional.empty()));
100 }
101 }
102
103 @Override
104 protected void internalRemoveAll() {
105 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
106 cacheContext.updateCacheVersion(
107 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
108 }
109
110 @Override
111 protected Logger getLogger() {
112 return log;
113 }
114
115 @Override
116 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
117 final RequestContext requestContext = contextSupplier.get();
118
119 return requestContext.computeIfAbsent(this, () -> {
120
121
122 log.trace("Cache {}: Setting up a new context", name);
123 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
124 keyGenerator,
125 name,
126 requestContext::partitionIdentifier,
127 lockTimeout);
128 newCacheContext.updateCacheVersion(
129 RedisUtils.obtainCacheVersion(
130 clientSupplier,
131 newCacheContext.externalCacheVersionKey(),
132 defaultTtl + 1));
133 return newCacheContext;
134 });
135 }
136
137 @Override
138 protected V handleCreation(String internalKey, V candidateValue)
139 throws ExecutionException, InterruptedException {
140 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
141 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
142 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
143
144
145 try (Jedis client = clientSupplier.get()) {
146 for (; ; ) {
147 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
148 final long addOp = client.setnx(externalKey.getBytes(), candidateValueBytes);
149 if (addOp == 1) {
150
151
152 break;
153 }
154
155 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
156 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
157 final Optional<V> otherAddedValue = unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
158 if (otherAddedValue.isPresent()) {
159 return otherAddedValue.get();
160 }
161
162 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
163 }
164 }
165 return candidateValue;
166 }
167
168 @Override
169 public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, String... keys) {
170 return super.getBulk(factory, keys);
171 }
172
173 @Override
174 protected final ExternalCacheException mapException(Exception ex) {
175 return RedisUtils.mapException(ex);
176 }
177
178 @Override
179 protected final Optional<V> directGet(String externalKey) {
180 try (Jedis client = clientSupplier.get()) {
181 return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
182 }
183 }
184
185 @Override
186 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
187 return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
188 }
189 }