1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.internal.RequestContext;
7 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
8 import com.atlassian.vcache.internal.core.TransactionControlManager;
9 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11 import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import redis.clients.jedis.Jedis;
16 import redis.clients.jedis.Pipeline;
17 import redis.clients.jedis.Response;
18
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Optional;
22 import java.util.Set;
23 import java.util.function.Supplier;
24 import java.util.stream.Collectors;
25
26 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
27 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
28 import static java.util.Objects.requireNonNull;
29
30
31
32
33
34
35
36 class RedisTransactionalExternalCache<V>
37 extends AbstractTransactionalExternalCache<V> {
38 private static final Logger log = LoggerFactory.getLogger(RedisTransactionalExternalCache.class);
39
40 private final Supplier<Jedis> clientSupplier;
41 private final ExternalCacheKeyGenerator keyGenerator;
42 private final MarshallingPair<V> valueMarshalling;
43 private final int defaultTtl;
44 private final TransactionControlManager transactionControlManager;
45
46 RedisTransactionalExternalCache(
47 Supplier<Jedis> clientSupplier,
48 Supplier<RequestContext> contextSupplier,
49 ExternalCacheKeyGenerator keyGenerator,
50 String name,
51 MarshallingPair<V> valueMarshalling,
52 ExternalCacheSettings settings,
53 TransactionControlManager transactionControlManager,
54 MetricsRecorder metricsRecorder) {
55 super(name, contextSupplier, metricsRecorder);
56 this.clientSupplier = requireNonNull(clientSupplier);
57 this.keyGenerator = requireNonNull(keyGenerator);
58 this.valueMarshalling = requireNonNull(valueMarshalling);
59 this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
60 this.transactionControlManager = requireNonNull(transactionControlManager);
61 }
62
63 @Override
64 public void transactionSync() {
65 log.trace("Cache {}: synchronising operations", name);
66 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
67
68 if (cacheContext.hasRemoveAll()) {
69 cacheContext.updateCacheVersion(
70 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
71 }
72
73 performKeyedOperations(cacheContext);
74 cacheContext.forgetAll();
75 }
76
77 private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
78
79
80
81
82
83
84 final List<Response<Long>> deleteResponses;
85 final List<Response<String>> putResponses;
86 try (Jedis client = clientSupplier.get()) {
87 final Pipeline pipeline = client.pipelined();
88
89 deleteResponses = cacheContext.getKeyedOperations().stream()
90 .filter(e -> e.getValue().isRemove())
91 .map(e -> pipeline.del(cacheContext.externalEntryKeyFor(e.getKey()).getBytes()))
92 .collect(Collectors.toList());
93
94 putResponses = cacheContext.getKeyedOperations().stream()
95 .filter(e -> e.getValue().isPut())
96 .map(e ->
97 RedisUtils.pipelinePutOperationForPolicy(
98 pipeline,
99 cacheContext.externalEntryKeyFor(e.getKey()),
100 e.getValue().getPolicy(),
101 marshall(e.getValue().getValue(), valueMarshalling),
102 defaultTtl))
103 .collect(Collectors.toList());
104
105 pipeline.sync();
106 }
107
108
109 final long numFailedRemoves = deleteResponses.stream()
110 .filter(r -> r.get() == 0)
111 .count();
112 if (numFailedRemoves > 0) {
113 log.debug("Cache {}: Failed to perform {} of {} transactional removes, clearing the cache.",
114 name, numFailedRemoves, deleteResponses.size());
115 cacheContext.updateCacheVersion(
116 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
117 } else {
118
119 final long numFailedPuts = putResponses.stream()
120 .filter(r -> !RedisUtils.OK.equals(r.get()))
121 .count();
122 if (numFailedPuts > 0) {
123 log.debug("Cache {}: Failed to perform {} of {} transactional puts, clearing the cache.",
124 name, numFailedPuts, putResponses.size());
125 cacheContext.updateCacheVersion(
126 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
127 }
128 }
129 }
130
131 @Override
132 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
133 final RequestContext requestContext = contextSupplier.get();
134
135 transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
136
137 return requestContext.computeIfAbsent(this, () -> {
138
139
140 log.trace("Cache {}: Setting up a new context", name);
141 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
142 keyGenerator, name, requestContext::partitionIdentifier);
143 newCacheContext.updateCacheVersion(
144 RedisUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey(),
145 defaultTtl + 1));
146 return newCacheContext;
147 });
148 }
149
150 @Override
151 protected Logger getLogger() {
152 return log;
153 }
154
155 @Override
156 protected final ExternalCacheException mapException(Exception ex) {
157 return RedisUtils.mapException(ex);
158 }
159
160 @Override
161 protected final Optional<V> directGet(String externalKey) {
162 try (Jedis client = clientSupplier.get()) {
163 return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
164 }
165 }
166
167 @Override
168 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
169 return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
170 }
171 }