1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.ExternalCacheException;
6 import com.atlassian.vcache.PutPolicy;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import redis.clients.jedis.Jedis;
10 import redis.clients.jedis.Pipeline;
11 import redis.clients.jedis.Response;
12 import redis.clients.jedis.exceptions.JedisException;
13
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.Set;
19 import java.util.function.Function;
20 import java.util.function.Supplier;
21 import java.util.stream.Collectors;
22
23 import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
24 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
25
26
27
28
29
30
31 class RedisUtils {
32 static final String OK = "OK";
33 static final byte[] NX = "NX".getBytes();
34 static final byte[] EX = "EX".getBytes();
35 static final byte[] XX = "EX".getBytes();
36
37 private static final Logger log = LoggerFactory.getLogger(RedisUtils.class);
38
39 private static final String LUA_OBTAIN_CACHE_VERSION =
40 "local cur = redis.call(\"get\",KEYS[1]) " +
41 "if cur then " +
42 " return cur " +
43 "else " +
44 " redis.call(\"setex\",KEYS[1],ARGV[2],\"666\") " +
45 " return \"666\" " +
46 "end";
47
48 static ExternalCacheException mapException(Exception ex) {
49 if (ex instanceof JedisException) {
50 return new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, ex);
51 }
52 return new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
53 }
54
55 static <V> Map<String, Optional<V>> directGetBulk(
56 Set<String> externalKeys, Supplier<Jedis> clientSupplier, MarshallingPair<V> valueMarshalling) {
57
58 try (Jedis client = clientSupplier.get()) {
59
60
61
62 final Pipeline pipeline = client.pipelined();
63 final Map<String, Response<byte[]>> externalKeyToResponseMap = externalKeys.stream()
64 .collect(Collectors.toMap(
65 k -> k,
66 k -> pipeline.get(k.getBytes())
67 ));
68 pipeline.sync();
69
70 return externalKeyToResponseMap.entrySet().stream().collect(Collectors.toMap(
71 Map.Entry::getKey,
72 e -> unmarshall(e.getValue().get(), valueMarshalling)
73 ));
74 }
75 }
76
77 static boolean putOperationForPolicy(
78 PutPolicy policy, Supplier<Jedis> clientSupplier, String externalKey, int defaultTtl, byte[] valueBytes) {
79 try (Jedis client = clientSupplier.get()) {
80 final Pipeline pipeline = client.pipelined();
81 final Response<String> result =
82 pipelinePutOperationForPolicy(pipeline, externalKey, policy, valueBytes, defaultTtl);
83 pipeline.sync();
84 return OK.equals(result.get());
85 }
86 }
87
88 static byte[] safeExtractValue(CasIdentifier casId) {
89 if (casId instanceof RedisCasIdentifier) {
90 return ((RedisCasIdentifier) casId).getValue();
91 }
92
93 log.warn("Passed an unknown CasIdentifier instance of class {}.", casId.getClass().getName());
94 throw new ExternalCacheException(UNCLASSIFIED_FAILURE);
95 }
96
97 private static long obtainCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey, long ttlSecs) {
98 try (Jedis client = clientSupplier.get()) {
99 final List<String> keys = Collections.singletonList(externalCacheVersionKey);
100 final List<String> args = Collections.singletonList(Long.toString(ttlSecs));
101 final String resultString = (String) client.eval(LUA_OBTAIN_CACHE_VERSION, keys, args);
102 return Long.valueOf(resultString);
103 }
104 }
105
106 private static long incrementCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey) {
107 try (Jedis client = clientSupplier.get()) {
108 return client.incr(externalCacheVersionKey);
109 }
110 }
111
112 static Function<String, Long> cacheVersionIncrementer(Supplier<Jedis> clientSupplier) {
113 return externalCacheVersionKey -> incrementCacheVersion(clientSupplier, externalCacheVersionKey);
114 }
115
116 static Function<String, Long> cacheVersionSupplier(Supplier<Jedis> clientSupplier, long ttlSecs) {
117 return externalCacheVersionKey -> obtainCacheVersion(clientSupplier, externalCacheVersionKey, ttlSecs);
118 }
119
120 public static Response<String> pipelinePutOperationForPolicy(Pipeline pipeline,
121 String externalKey,
122 PutPolicy policy,
123 byte[] valueBytes,
124 int defaultTtl) {
125 if (policy == PutPolicy.ADD_ONLY) {
126 return pipeline.set(externalKey.getBytes(), valueBytes, NX, EX, defaultTtl);
127 } else if (policy == PutPolicy.PUT_ALWAYS) {
128 return pipeline.setex(externalKey.getBytes(), defaultTtl, valueBytes);
129 } else if (policy == PutPolicy.REPLACE_ONLY) {
130 return pipeline.set(externalKey.getBytes(), valueBytes, XX, EX, defaultTtl);
131 } else {
132 throw new IllegalArgumentException("Unknown put policy: " + policy);
133 }
134 }
135 }