View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.ExternalCacheException;
6   import com.atlassian.vcache.PutPolicy;
7   import org.slf4j.Logger;
8   import org.slf4j.LoggerFactory;
9   import redis.clients.jedis.Jedis;
10  import redis.clients.jedis.Pipeline;
11  import redis.clients.jedis.Response;
12  import redis.clients.jedis.exceptions.JedisException;
13  
14  import java.util.Collections;
15  import java.util.List;
16  import java.util.Map;
17  import java.util.Optional;
18  import java.util.Set;
19  import java.util.function.Function;
20  import java.util.function.Supplier;
21  import java.util.stream.Collectors;
22  
23  import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
24  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
25  
26  /**
27   * Common utility methods that are specific for the Redis implementation.
28   *
29   * @since 1.0.0
30   */
31  class RedisUtils {
32      static final String OK = "OK";
33      static final byte[] NX = "NX".getBytes();
34      static final byte[] EX = "EX".getBytes();
35      static final byte[] XX = "EX".getBytes();
36  
37      private static final Logger log = LoggerFactory.getLogger(RedisUtils.class);
38  
39      private static final String LUA_OBTAIN_CACHE_VERSION =
40              "local cur = redis.call(\"get\",KEYS[1]) " +
41                      "if cur then " +
42                      "    return cur " +
43                      "else " +
44                      "    redis.call(\"setex\",KEYS[1],ARGV[2],\"666\") " +
45                      "    return \"666\" " +
46                      "end";
47  
48      static ExternalCacheException mapException(Exception ex) {
49          if (ex instanceof JedisException) {
50              return new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, ex);
51          }
52          return new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
53      }
54  
55      static <V> Map<String, Optional<V>> directGetBulk(
56              Set<String> externalKeys, Supplier<Jedis> clientSupplier, MarshallingPair<V> valueMarshalling) {
57          // Returns map of keys that contain values, so need to handle the missing ones
58          try (Jedis client = clientSupplier.get()) {
59              // Implementation note, I have not used the mput() method here as the implementation is
60              // non-trivial, and it's not clear that there is any performance benefit to be had.
61              // Using Pipeline ensures a single request/response to Redis.
62              final Pipeline pipeline = client.pipelined();
63              final Map<String, Response<byte[]>> externalKeyToResponseMap = externalKeys.stream()
64                      .collect(Collectors.toMap(
65                              k -> k,
66                              k -> pipeline.get(k.getBytes())
67                      ));
68              pipeline.sync();
69  
70              return externalKeyToResponseMap.entrySet().stream().collect(Collectors.toMap(
71                      Map.Entry::getKey,
72                      e -> unmarshall(e.getValue().get(), valueMarshalling)
73              ));
74          }
75      }
76  
77      static boolean putOperationForPolicy(
78              PutPolicy policy, Supplier<Jedis> clientSupplier, String externalKey, int defaultTtl, byte[] valueBytes) {
79          try (Jedis client = clientSupplier.get()) {
80              final Pipeline pipeline = client.pipelined();
81              final Response<String> result =
82                      pipelinePutOperationForPolicy(pipeline, externalKey, policy, valueBytes, defaultTtl);
83              pipeline.sync();
84              return OK.equals(result.get());
85          }
86      }
87  
88      static byte[] safeExtractValue(CasIdentifier casId) {
89          if (casId instanceof RedisCasIdentifier) {
90              return ((RedisCasIdentifier) casId).getValue();
91          }
92  
93          log.warn("Passed an unknown CasIdentifier instance of class {}.", casId.getClass().getName());
94          throw new ExternalCacheException(UNCLASSIFIED_FAILURE);
95      }
96  
97      private static long obtainCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey, long ttlSecs) {
98          try (Jedis client = clientSupplier.get()) {
99              final List<String> keys = Collections.singletonList(externalCacheVersionKey);
100             final List<String> args = Collections.singletonList(Long.toString(ttlSecs));
101             final String resultString = (String) client.eval(LUA_OBTAIN_CACHE_VERSION, keys, args);
102             return Long.valueOf(resultString);
103         }
104     }
105 
106     private static long incrementCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey) {
107         try (Jedis client = clientSupplier.get()) {
108             return client.incr(externalCacheVersionKey);
109         }
110     }
111 
112     static Function<String, Long> cacheVersionIncrementer(Supplier<Jedis> clientSupplier) {
113         return externalCacheVersionKey -> incrementCacheVersion(clientSupplier, externalCacheVersionKey);
114     }
115 
116     static Function<String, Long> cacheVersionSupplier(Supplier<Jedis> clientSupplier, long ttlSecs) {
117         return externalCacheVersionKey -> obtainCacheVersion(clientSupplier, externalCacheVersionKey, ttlSecs);
118     }
119 
120     public static Response<String> pipelinePutOperationForPolicy(Pipeline pipeline,
121                                                                  String externalKey,
122                                                                  PutPolicy policy,
123                                                                  byte[] valueBytes,
124                                                                  int defaultTtl) {
125         if (policy == PutPolicy.ADD_ONLY) {
126             return pipeline.set(externalKey.getBytes(), valueBytes, NX, EX, defaultTtl);
127         } else if (policy == PutPolicy.PUT_ALWAYS) {
128             return pipeline.setex(externalKey.getBytes(), defaultTtl, valueBytes);
129         } else if (policy == PutPolicy.REPLACE_ONLY) {
130             return pipeline.set(externalKey.getBytes(), valueBytes, XX, EX, defaultTtl);
131         } else {
132             throw new IllegalArgumentException("Unknown put policy: " + policy);
133         }
134     }
135 }