View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.ExternalCacheException;
6   import com.atlassian.vcache.PutPolicy;
7   import org.slf4j.Logger;
8   import org.slf4j.LoggerFactory;
9   import redis.clients.jedis.Jedis;
10  import redis.clients.jedis.Pipeline;
11  import redis.clients.jedis.Response;
12  import redis.clients.jedis.exceptions.JedisException;
13  
14  import java.util.Collections;
15  import java.util.List;
16  import java.util.Map;
17  import java.util.Optional;
18  import java.util.Set;
19  import java.util.function.Supplier;
20  import java.util.stream.Collectors;
21  
22  import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
23  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
24  
25  /**
26   * Common utility methods that are specific for the Redis implementation.
27   *
28   * @since 1.0.0
29   */
30  class RedisUtils {
31      static final String OK = "OK";
32      static final byte[] NX = "NX".getBytes();
33      static final byte[] EX = "EX".getBytes();
34      static final byte[] XX = "EX".getBytes();
35  
36      private static final Logger log = LoggerFactory.getLogger(RedisUtils.class);
37  
38      private static final String LUA_OBTAIN_CACHE_VERSION =
39              "local cur = redis.call(\"get\",KEYS[1]) " +
40                      "if cur then " +
41                      "    return cur " +
42                      "else " +
43                      "    redis.call(\"setex\",KEYS[1],ARGV[2],\"666\") " +
44                      "    return \"666\" " +
45                      "end";
46  
47      static ExternalCacheException mapException(Exception ex) {
48          if (ex instanceof JedisException) {
49              return new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, ex);
50          }
51          return new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
52      }
53  
54      static <V> Map<String, Optional<V>> directGetBulk(
55              Set<String> externalKeys, Supplier<Jedis> clientSupplier, MarshallingPair<V> valueMarshalling) {
56          // Returns map of keys that contain values, so need to handle the missing ones
57          try (Jedis client = clientSupplier.get()) {
58              // Implementation note, I have not used the mput() method here as the implementation is
59              // non-trivial, and it's not clear that there is any performance benefit to be had.
60              // Using Pipeline ensures a single request/response to Redis.
61              final Pipeline pipeline = client.pipelined();
62              final Map<String, Response<byte[]>> externalKeyToResponseMap = externalKeys.stream()
63                      .collect(Collectors.toMap(
64                              k -> k,
65                              k -> pipeline.get(k.getBytes())
66                      ));
67              pipeline.sync();
68  
69              return externalKeyToResponseMap.entrySet().stream().collect(Collectors.toMap(
70                      Map.Entry::getKey,
71                      e -> unmarshall(e.getValue().get(), valueMarshalling)
72              ));
73          }
74      }
75  
76      static boolean putOperationForPolicy(
77              PutPolicy policy, Supplier<Jedis> clientSupplier, String externalKey, int defaultTtl, byte[] valueBytes) {
78          try (Jedis client = clientSupplier.get()) {
79              final Pipeline pipeline = client.pipelined();
80              final Response<String> result =
81                      pipelinePutOperationForPolicy(pipeline, externalKey, policy, valueBytes, defaultTtl);
82              pipeline.sync();
83              return OK.equals(result.get());
84          }
85      }
86  
87      static byte[] safeExtractValue(CasIdentifier casId) {
88          if (casId instanceof RedisCasIdentifier) {
89              return ((RedisCasIdentifier) casId).getValue();
90          }
91  
92          log.warn("Passed an unknown CasIdentifier instance of class {}.", casId.getClass().getName());
93          throw new ExternalCacheException(UNCLASSIFIED_FAILURE);
94      }
95  
96      public static long obtainCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey, long ttlSecs) {
97          try (Jedis client = clientSupplier.get()) {
98              final List<String> keys = Collections.singletonList(externalCacheVersionKey);
99              final List<String> args = Collections.singletonList(Long.toString(ttlSecs));
100             final String resultString = (String) client.eval(LUA_OBTAIN_CACHE_VERSION, keys, args);
101             return Long.valueOf(resultString);
102         }
103     }
104 
105     public static long incrementCacheVersion(Supplier<Jedis> clientSupplier, String externalCacheVersionKey) {
106         try (Jedis client = clientSupplier.get()) {
107             return client.incr(externalCacheVersionKey);
108         }
109     }
110 
111     public static Response<String> pipelinePutOperationForPolicy(Pipeline pipeline,
112                                                                  String externalKey,
113                                                                  PutPolicy policy,
114                                                                  byte[] valueBytes,
115                                                                  int defaultTtl) {
116         if (policy == PutPolicy.ADD_ONLY) {
117             return pipeline.set(externalKey.getBytes(), valueBytes, NX, EX, defaultTtl);
118         } else if (policy == PutPolicy.PUT_ALWAYS) {
119             return pipeline.setex(externalKey.getBytes(), defaultTtl, valueBytes);
120         } else if (policy == PutPolicy.REPLACE_ONLY) {
121             return pipeline.set(externalKey.getBytes(), valueBytes, XX, EX, defaultTtl);
122         } else {
123             throw new IllegalArgumentException("Unknown put policy: " + policy);
124         }
125     }
126 }