1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.ExternalCacheException;
6 import com.atlassian.vcache.PutPolicy;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import redis.clients.jedis.Jedis;
10 import redis.clients.jedis.Pipeline;
11 import redis.clients.jedis.Response;
12 import redis.clients.jedis.exceptions.JedisException;
13
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
21
22 import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
23 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
24
25
26
27
28
29
30 class RedisUtils {
31 static final String OK = "OK";
32 static final byte[] NX = "NX".getBytes();
33 static final byte[] EX = "EX".getBytes();
34 static final byte[] XX = "EX".getBytes();
35
36 private static final Logger log = LoggerFactory.getLogger(RedisUtils.class);
37
38 private static final String LUA_OBTAIN_CACHE_VERSION =
39 "local cur = redis.call(\"get\",KEYS[1]) " +
40 "if cur then " +
41 " return cur " +
42 "else " +
43 " redis.call(\"setex\",KEYS[1],ARGV[2],\"666\") " +
44 " return \"666\" " +
45 "end";
46
47 static ExternalCacheException mapException(Exception ex) {
48 if (ex instanceof JedisException) {
49 return new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, ex);
50 }
51 return new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
52 }
53
54 static <V> Map<String, Optional<V>> directGetBulk(
55 Set<String> externalKeys, Supplier<Jedis> clientSupplier, MarshallingPair<V> valueMarshalling) {
56
57 try (Jedis client = clientSupplier.get()) {
58
59
60
61 final Pipeline pipeline = client.pipelined();
62 final Map<String, Response<byte[]>> externalKeyToResponseMap = externalKeys.stream()
63 .collect(Collectors.toMap(
64 k -> k,
65 k -> pipeline.get(k.getBytes())
66 ));
67 pipeline.sync();
68
69 return externalKeyToResponseMap.entrySet().stream().collect(Collectors.toMap(
70 Map.Entry::getKey,
71 e -> unmarshall(e.getValue().get(), valueMarshalling)
72 ));
73 }
74 }
75
76 static boolean putOperationForPolicy(
77 PutPolicy policy, Supplier<Jedis> clientSupplier, String externalKey, int defaultTtl, byte[] valueBytes) {
78 try (Jedis client = clientSupplier.get()) {
79 final Pipeline pipeline = client.pipelined();
80 final Response<String> result =
81 pipelinePutOperationForPolicy(pipeline, externalKey, policy, valueBytes, defaultTtl);
82 pipeline.sync();
83 return OK.equals(result.get());
84 }
85 }
86
87 static byte[] safeExtractValue(CasIdentifier casId) {
88 if (casId instanceof RedisCasIdentifier) {
89 return ((RedisCasIdentifier) casId).getValue();
90 }
91
92 log.warn("Passed an unknown CasIdentifier instance of class {}.", casId.getClass().getName());
93 throw new ExternalCacheException(UNCLASSIFIED_FAILURE);
94 }
95
96 public static long obtainCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey, long ttlSecs) {
97 try (Jedis client = clientSupplier.get()) {
98 final List<String> keys = Collections.singletonList(externalCacheVersionKey);
99 final List<String> args = Collections.singletonList(Long.toString(ttlSecs));
100 final String resultString = (String) client.eval(LUA_OBTAIN_CACHE_VERSION, keys, args);
101 return Long.valueOf(resultString);
102 }
103 }
104
105 public static long incrementCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey) {
106 try (Jedis client = clientSupplier.get()) {
107 return client.incr(externalCacheVersionKey);
108 }
109 }
110
111 public static Response<String> pipelinePutOperationForPolicy(Pipeline pipeline,
112 String externalKey,
113 PutPolicy policy,
114 byte[] valueBytes,
115 int defaultTtl) {
116 if (policy == PutPolicy.ADD_ONLY) {
117 return pipeline.set(externalKey.getBytes(), valueBytes, NX, EX, defaultTtl);
118 } else if (policy == PutPolicy.PUT_ALWAYS) {
119 return pipeline.setex(externalKey.getBytes(), defaultTtl, valueBytes);
120 } else if (policy == PutPolicy.REPLACE_ONLY) {
121 return pipeline.set(externalKey.getBytes(), valueBytes, XX, EX, defaultTtl);
122 } else {
123 throw new IllegalArgumentException("Unknown put policy: " + policy);
124 }
125 }
126 }