View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.PutPolicy;
7   import com.atlassian.vcache.internal.MetricLabel;
8   import com.atlassian.vcache.internal.RequestContext;
9   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.CacheType;
12  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13  import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  import redis.clients.jedis.Jedis;
18  import redis.clients.jedis.Pipeline;
19  import redis.clients.jedis.Response;
20  
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Optional;
25  import java.util.Set;
26  import java.util.concurrent.CompletableFuture;
27  import java.util.concurrent.CompletionStage;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.function.Function;
31  import java.util.function.Supplier;
32  import java.util.stream.Collectors;
33  import java.util.stream.StreamSupport;
34  
35  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
36  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
37  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
38  import static java.util.Objects.requireNonNull;
39  
40  /**
41   * Redis implementation of {@link com.atlassian.vcache.StableReadExternalCache}.
42   *
43   * @param <V> the value type
44   * @since 1.0.0
45   */
46  class RedisStableReadExternalCache<V>
47          extends AbstractStableReadExternalCache<V> {
48      private static final Logger log = LoggerFactory.getLogger(RedisStableReadExternalCache.class);
49  
50      private final Supplier<Jedis> clientSupplier;
51      private final Supplier<RequestContext> contextSupplier;
52      private final ExternalCacheKeyGenerator keyGenerator;
53      private final MarshallingPair<V> valueMarshalling;
54      private final int defaultTtl;
55  
56      RedisStableReadExternalCache(
57              Supplier<Jedis> clientSupplier,
58              Supplier<RequestContext> contextSupplier,
59              ExternalCacheKeyGenerator keyGenerator,
60              String name,
61              MarshallingPair<V> valueMarshalling,
62              ExternalCacheSettings settings,
63              MetricsRecorder metricsRecorder) {
64          super(name, metricsRecorder);
65          this.clientSupplier = requireNonNull(clientSupplier);
66          this.contextSupplier = requireNonNull(contextSupplier);
67          this.keyGenerator = requireNonNull(keyGenerator);
68          this.valueMarshalling = requireNonNull(valueMarshalling);
69          this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
70      }
71  
72      @Override
73      public CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
74          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
75          return perform(
76                  () -> {
77                      // This stores the result of the remote operation. If it fails then we can roll back. We use Atomic because in the
78                      // future we may be async.
79                      final AtomicBoolean remotePutResult = new AtomicBoolean(true);
80                      cacheContext.computeValue(internalKey, (key, oldValue) -> {
81  
82                          // computeValue allows us to get the current CompletableFuture (if there is one) and chain the next
83                          // put() onto the old future. If the old future is still running then we preserve
84                          // ordering (and so consistency). Also any blocked gets will get the new value when the chain
85                          // completes. The map locks the update to the future so updates are nicely serialized.
86                          if (oldValue != null) {
87                              return oldValue.thenApply((oldV) -> {
88                                  remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(key), value, policy));
89                                  return Optional.of(value);
90                              });
91                          } else {
92                              remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(key), value, policy));
93                              return CompletableFuture.completedFuture(Optional.of(value));
94                          }
95                      });
96                      return remotePutResult.get();
97                  },
98                  (result) -> {
99                      if (!result) {
100                         cacheContext.forgetValue(internalKey);
101                     }
102                 });
103     }
104 
105     private boolean remotePut(String externalKey, V value, PutPolicy policy) {
106         final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
107         return RedisUtils.putOperationForPolicy(
108                 policy, clientSupplier, externalKey, defaultTtl, valueBytes);
109     }
110 
111     @Override
112     protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
113         // There is no bulk delete in the api, so need to remove each one async
114         return perform(() -> {
115             if (isEmpty(internalKeys)) {
116                 return null;
117             }
118 
119             // Lodge all the requests for delete
120             try (Jedis client = clientSupplier.get()) {
121                 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
122                 final List<byte[]> externalKeysList = StreamSupport.stream(internalKeys.spliterator(), false)
123                         .map(cacheContext::externalEntryKeyFor)
124                         .map(String::getBytes)
125                         .collect(Collectors.toList());
126                 final byte[][] externalKeysAsBytes = externalKeysList.toArray(new byte[externalKeysList.size()][]);
127 
128                 final long numDeleted = client.del(externalKeysAsBytes);
129                 if (numDeleted != externalKeysAsBytes.length) {
130                     log.info("Cache {}: only able to delete {} of {} keys", name, numDeleted, externalKeysAsBytes.length);
131                 }
132 
133                 // Forget the values removed
134                 StreamSupport.stream(internalKeys.spliterator(), false)
135                         .forEach(k -> cacheContext.recordValue(k, Optional.empty()));
136             }
137 
138             return null;
139         });
140     }
141 
142     @Override
143     protected CompletionStage<Void> internalRemoveAll() {
144         return perform(() -> {
145             final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
146             cacheContext.updateCacheVersion(
147                     RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
148             cacheContext.forgetAllValues();
149             return null;
150         });
151     }
152 
153     @Override
154     protected Logger getLogger() {
155         return log;
156     }
157 
158     @Override
159     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
160         final RequestContext requestContext = contextSupplier.get();
161 
162         return requestContext.computeIfAbsent(this, () -> {
163             // Need to build a new context, which involves getting the current cache version, or setting it if it does
164             // not exist.
165             log.trace("Cache {}: Setting up a new context", name);
166             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
167                     keyGenerator, name, requestContext::partitionIdentifier);
168             newCacheContext.updateCacheVersion(
169                     RedisUtils.obtainCacheVersion(
170                             clientSupplier,
171                             newCacheContext.externalCacheVersionKey(),
172                             defaultTtl + 1));
173             return newCacheContext;
174         });
175     }
176 
177     @Override
178     protected V handleCreation(String internalKey, Supplier<V> supplier)
179             throws ExecutionException, InterruptedException {
180         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
181         final V candidateValue = requireNonNull(supplier.get());
182         final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
183         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
184 
185         // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
186         try (Jedis client = clientSupplier.get()) {
187             for (; ; ) {
188                 final long addOp = client.setnx(externalKey.getBytes(), candidateValueBytes);
189                 if (addOp == 1) {
190                     // I break here, rather than just return, due to battling with the compiler. Unless written
191                     // this way, the compiler will not allow the lambda structure.
192                     break;
193                 }
194 
195                 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
196                 final Optional<V> otherAddedValue = unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
197                 if (otherAddedValue.isPresent()) {
198                     return otherAddedValue.get();
199                 }
200 
201                 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
202             }
203         }
204         return candidateValue;
205     }
206 
207     @Override
208     protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
209             throws ExecutionException, InterruptedException {
210         // Get the known values from the external cache.
211         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
212         metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
213         final Map<String, Optional<V>> candidateValues =
214                 RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
215 
216         // Add the known values to the grand result
217         final Map<String, V> grandResult = candidateValues.entrySet().stream()
218                 .filter(e -> e.getValue().isPresent())
219                 .collect(Collectors.toMap(
220                         e -> cacheContext.internalEntryKeyFor(e.getKey()),
221                         e -> e.getValue().get()));
222         getLogger().trace("Cache {}: getBulk(Function): {} of {} entries have values",
223                 name, grandResult.size(), externalKeys.size());
224 
225         // Calculate the missing keys
226         final List<String> missingExternalKeys = candidateValues.entrySet().stream()
227                 .filter(e -> !e.getValue().isPresent())
228                 .map(Map.Entry::getKey)
229                 .collect(Collectors.toList());
230 
231         if (!missingExternalKeys.isEmpty()) {
232             getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
233                     name, missingExternalKeys.size());
234 
235             // Okay, need to get the missing values and mapping from externalKeys to internalKeys
236             final Set<String> missingInternalKeys = Collections.unmodifiableSet(
237                     missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
238             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
239             if (missingInternalKeys.size() != missingValues.size()) {
240                 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
241                         name, missingInternalKeys.size() + " but got " + missingValues.size());
242                 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
243             }
244 
245             // Okay, got the missing values, now need to add them to Redis
246             try (Jedis client = clientSupplier.get()) {
247                 final Pipeline pipeline = client.pipelined();
248                 final Map<String, Response<String>> internalKeyToResponseMap =
249                         missingValues.entrySet().stream().collect(Collectors.toMap(
250                                 Map.Entry::getKey,
251                                 e -> pipeline.setex(
252                                         cacheContext.externalEntryKeyFor(e.getKey()).getBytes(),
253                                         defaultTtl,
254                                         marshall(e.getValue(), valueMarshalling))
255                         ));
256                 pipeline.sync();
257 
258                 internalKeyToResponseMap.entrySet().stream()
259                         .filter(e -> !RedisUtils.OK.equals(e.getValue().get()))
260                         .forEach(e -> log.warn("Cache {}: Unable to set key {}", name, e.getKey()));
261             }
262 
263             grandResult.putAll(missingValues);
264         }
265 
266         return grandResult;
267     }
268 
269     @Override
270     public CompletionStage<Map<String, Optional<V>>> getBulk(String... keys) {
271         return super.getBulk(keys);
272     }
273 
274     @Override
275     public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, String... keys) {
276         return super.getBulk(factory, keys);
277     }
278 
279     @Override
280     protected final ExternalCacheException mapException(Exception ex) {
281         return RedisUtils.mapException(ex);
282     }
283 
284     @Override
285     protected final Optional<V> directGet(String externalKey) {
286         try (Jedis client = clientSupplier.get()) {
287             return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
288         }
289     }
290 
291     @Override
292     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
293         return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
294     }
295 }