1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.internal.RequestContext;
7 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
8 import com.atlassian.vcache.internal.core.TransactionControlManager;
9 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11 import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import redis.clients.jedis.Jedis;
16 import redis.clients.jedis.Pipeline;
17 import redis.clients.jedis.Response;
18
19 import java.time.Duration;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.function.Supplier;
25 import java.util.stream.Collectors;
26
27 import static com.atlassian.vcache.ExternalCacheException.Reason.TRANSACTION_FAILURE;
28 import static com.atlassian.vcache.VCacheUtils.fold;
29 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
30 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
31 import static java.util.Objects.requireNonNull;
32
33
34
35
36
37
38
39 class RedisTransactionalExternalCache<V>
40 extends AbstractTransactionalExternalCache<V> {
41 private static final Logger log = LoggerFactory.getLogger(RedisTransactionalExternalCache.class);
42
43 private final Supplier<Jedis> clientSupplier;
44 private final ExternalCacheKeyGenerator keyGenerator;
45 private final MarshallingPair<V> valueMarshalling;
46 private final int defaultTtl;
47 private final TransactionControlManager transactionControlManager;
48
49 RedisTransactionalExternalCache(
50 Supplier<Jedis> clientSupplier,
51 Supplier<RequestContext> contextSupplier,
52 ExternalCacheKeyGenerator keyGenerator,
53 String name,
54 MarshallingPair<V> valueMarshalling,
55 ExternalCacheSettings settings,
56 TransactionControlManager transactionControlManager,
57 MetricsRecorder metricsRecorder,
58 Duration lockTimeout) {
59 super(name, contextSupplier, metricsRecorder, lockTimeout, (n, ex) -> {});
60 this.clientSupplier = requireNonNull(clientSupplier);
61 this.keyGenerator = requireNonNull(keyGenerator);
62 this.valueMarshalling = requireNonNull(valueMarshalling);
63 this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
64 this.transactionControlManager = requireNonNull(transactionControlManager);
65 }
66
67 @Override
68 public void transactionSync() {
69 log.trace("Cache {}: synchronising operations", name);
70 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
71
72 fold(perform(() -> {
73 if (cacheContext.hasRemoveAll()) {
74 cacheContext.updateCacheVersion(RedisUtils.cacheVersionIncrementer(clientSupplier));
75 }
76
77 performKeyedOperations(cacheContext);
78
79 return null;
80 }),
81 v -> null,
82 err -> {
83 log.warn("Cache {}: an operation failed during transaction sync ({}). Clearing cache to remove stale entries.", name, err.getMessage());
84 try {
85 cacheContext.updateCacheVersion(RedisUtils.cacheVersionIncrementer(clientSupplier));
86 } catch (RuntimeException e) {
87 log.error("Cache {}: failed to clear the cache: {}", name, e.getMessage());
88 }
89
90 return null;
91 });
92
93 cacheContext.forgetAll();
94 }
95
96 private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
97
98
99
100
101
102
103 final List<Response<Long>> deleteResponses;
104 final List<Response<String>> putResponses;
105 try (Jedis client = clientSupplier.get()) {
106 final Pipeline pipeline = client.pipelined();
107
108 deleteResponses = cacheContext.getKeyedOperations().stream()
109 .filter(e -> e.getValue().isRemove())
110 .map(e -> pipeline.del(cacheContext.externalEntryKeyFor(e.getKey()).getBytes()))
111 .collect(Collectors.toList());
112
113 putResponses = cacheContext.getKeyedOperations().stream()
114 .filter(e -> e.getValue().isPut())
115 .map(e ->
116 RedisUtils.pipelinePutOperationForPolicy(
117 pipeline,
118 cacheContext.externalEntryKeyFor(e.getKey()),
119 e.getValue().getPolicy(),
120 marshall(e.getValue().getValue(), valueMarshalling),
121 defaultTtl))
122 .collect(Collectors.toList());
123
124 pipeline.sync();
125 }
126
127
128 final long numFailedRemoves = deleteResponses.stream()
129 .filter(r -> r.get() == 0)
130 .count();
131 if (numFailedRemoves > 0) {
132 log.warn("Cache {}: Failed to perform {} of {} transactional removes, clearing the cache.",
133 name, numFailedRemoves, deleteResponses.size());
134 throw new ExternalCacheException(TRANSACTION_FAILURE);
135 } else {
136
137 final long numFailedPuts = putResponses.stream()
138 .filter(r -> !RedisUtils.OK.equals(r.get()))
139 .count();
140 if (numFailedPuts > 0) {
141 log.warn("Cache {}: Failed to perform {} of {} transactional puts, clearing the cache.",
142 name, numFailedPuts, putResponses.size());
143 throw new ExternalCacheException(TRANSACTION_FAILURE);
144 }
145 }
146 }
147
148 @Override
149 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
150 final RequestContext requestContext = contextSupplier.get();
151
152 transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
153
154 return requestContext.computeIfAbsent(this, () -> {
155
156
157 log.trace("Cache {}: Setting up a new context", name);
158 return new VersionedExternalCacheRequestContext<>(
159 keyGenerator, name, requestContext::partitionIdentifier,
160 RedisUtils.cacheVersionSupplier(clientSupplier, defaultTtl + 1),
161 lockTimeout);
162 });
163 }
164
165 @Override
166 protected Logger getLogger() {
167 return log;
168 }
169
170 @Override
171 protected final ExternalCacheException mapException(Exception ex) {
172 return RedisUtils.mapException(ex);
173 }
174
175 @Override
176 protected final Optional<V> directGet(String externalKey) {
177 try (Jedis client = clientSupplier.get()) {
178 return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
179 }
180 }
181
182 @Override
183 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
184 return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
185 }
186 }