1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.internal.RequestContext;
7 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
8 import com.atlassian.vcache.internal.core.TransactionControlManager;
9 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11 import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import redis.clients.jedis.Jedis;
16 import redis.clients.jedis.Pipeline;
17 import redis.clients.jedis.Response;
18
19 import java.time.Duration;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.function.Supplier;
25 import java.util.stream.Collectors;
26
27 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
28 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
29 import static java.util.Objects.requireNonNull;
30
31
32
33
34
35
36
37 class RedisTransactionalExternalCache<V>
38 extends AbstractTransactionalExternalCache<V> {
39 private static final Logger log = LoggerFactory.getLogger(RedisTransactionalExternalCache.class);
40
41 private final Supplier<Jedis> clientSupplier;
42 private final ExternalCacheKeyGenerator keyGenerator;
43 private final MarshallingPair<V> valueMarshalling;
44 private final int defaultTtl;
45 private final TransactionControlManager transactionControlManager;
46
47 RedisTransactionalExternalCache(
48 Supplier<Jedis> clientSupplier,
49 Supplier<RequestContext> contextSupplier,
50 ExternalCacheKeyGenerator keyGenerator,
51 String name,
52 MarshallingPair<V> valueMarshalling,
53 ExternalCacheSettings settings,
54 TransactionControlManager transactionControlManager,
55 MetricsRecorder metricsRecorder,
56 Duration lockTimeout) {
57 super(name, contextSupplier, metricsRecorder, lockTimeout);
58 this.clientSupplier = requireNonNull(clientSupplier);
59 this.keyGenerator = requireNonNull(keyGenerator);
60 this.valueMarshalling = requireNonNull(valueMarshalling);
61 this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
62 this.transactionControlManager = requireNonNull(transactionControlManager);
63 }
64
65 @Override
66 public void transactionSync() {
67 log.trace("Cache {}: synchronising operations", name);
68 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
69
70 if (cacheContext.hasRemoveAll()) {
71 cacheContext.updateCacheVersion(
72 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
73 }
74
75 performKeyedOperations(cacheContext);
76 cacheContext.forgetAll();
77 }
78
79 private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
80
81
82
83
84
85
86 final List<Response<Long>> deleteResponses;
87 final List<Response<String>> putResponses;
88 try (Jedis client = clientSupplier.get()) {
89 final Pipeline pipeline = client.pipelined();
90
91 deleteResponses = cacheContext.getKeyedOperations().stream()
92 .filter(e -> e.getValue().isRemove())
93 .map(e -> pipeline.del(cacheContext.externalEntryKeyFor(e.getKey()).getBytes()))
94 .collect(Collectors.toList());
95
96 putResponses = cacheContext.getKeyedOperations().stream()
97 .filter(e -> e.getValue().isPut())
98 .map(e ->
99 RedisUtils.pipelinePutOperationForPolicy(
100 pipeline,
101 cacheContext.externalEntryKeyFor(e.getKey()),
102 e.getValue().getPolicy(),
103 marshall(e.getValue().getValue(), valueMarshalling),
104 defaultTtl))
105 .collect(Collectors.toList());
106
107 pipeline.sync();
108 }
109
110
111 final long numFailedRemoves = deleteResponses.stream()
112 .filter(r -> r.get() == 0)
113 .count();
114 if (numFailedRemoves > 0) {
115 log.debug("Cache {}: Failed to perform {} of {} transactional removes, clearing the cache.",
116 name, numFailedRemoves, deleteResponses.size());
117 cacheContext.updateCacheVersion(
118 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
119 } else {
120
121 final long numFailedPuts = putResponses.stream()
122 .filter(r -> !RedisUtils.OK.equals(r.get()))
123 .count();
124 if (numFailedPuts > 0) {
125 log.debug("Cache {}: Failed to perform {} of {} transactional puts, clearing the cache.",
126 name, numFailedPuts, putResponses.size());
127 cacheContext.updateCacheVersion(
128 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
129 }
130 }
131 }
132
133 @Override
134 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
135 final RequestContext requestContext = contextSupplier.get();
136
137 transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
138
139 return requestContext.computeIfAbsent(this, () -> {
140
141
142 log.trace("Cache {}: Setting up a new context", name);
143 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
144 keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
145 newCacheContext.updateCacheVersion(
146 RedisUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey(),
147 defaultTtl + 1));
148 return newCacheContext;
149 });
150 }
151
152 @Override
153 protected Logger getLogger() {
154 return log;
155 }
156
157 @Override
158 protected final ExternalCacheException mapException(Exception ex) {
159 return RedisUtils.mapException(ex);
160 }
161
162 @Override
163 protected final Optional<V> directGet(String externalKey) {
164 try (Jedis client = clientSupplier.get()) {
165 return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
166 }
167 }
168
169 @Override
170 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
171 return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
172 }
173 }