View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.internal.RequestContext;
7   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
8   import com.atlassian.vcache.internal.core.TransactionControlManager;
9   import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11  import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
13  import org.slf4j.Logger;
14  import org.slf4j.LoggerFactory;
15  import redis.clients.jedis.Jedis;
16  import redis.clients.jedis.Pipeline;
17  import redis.clients.jedis.Response;
18  
19  import java.util.List;
20  import java.util.Map;
21  import java.util.Optional;
22  import java.util.Set;
23  import java.util.function.Supplier;
24  import java.util.stream.Collectors;
25  
26  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
27  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
28  import static java.util.Objects.requireNonNull;
29  
30  /**
31   * Redis implementation of {@link com.atlassian.vcache.TransactionalExternalCache}.
32   *
33   * @param <V> the value type
34   * @since 1.0.0
35   */
36  class RedisTransactionalExternalCache<V>
37          extends AbstractTransactionalExternalCache<V> {
38      private static final Logger log = LoggerFactory.getLogger(RedisTransactionalExternalCache.class);
39  
40      private final Supplier<Jedis> clientSupplier;
41      private final ExternalCacheKeyGenerator keyGenerator;
42      private final MarshallingPair<V> valueMarshalling;
43      private final int defaultTtl;
44      private final TransactionControlManager transactionControlManager;
45  
46      RedisTransactionalExternalCache(
47              Supplier<Jedis> clientSupplier,
48              Supplier<RequestContext> contextSupplier,
49              ExternalCacheKeyGenerator keyGenerator,
50              String name,
51              MarshallingPair<V> valueMarshalling,
52              ExternalCacheSettings settings,
53              TransactionControlManager transactionControlManager,
54              MetricsRecorder metricsRecorder) {
55          super(name, contextSupplier, metricsRecorder);
56          this.clientSupplier = requireNonNull(clientSupplier);
57          this.keyGenerator = requireNonNull(keyGenerator);
58          this.valueMarshalling = requireNonNull(valueMarshalling);
59          this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
60          this.transactionControlManager = requireNonNull(transactionControlManager);
61      }
62  
63      @Override
64      public void transactionSync() {
65          log.trace("Cache {}: synchronising operations", name);
66          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
67  
68          if (cacheContext.hasRemoveAll()) {
69              cacheContext.updateCacheVersion(
70                      RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
71          }
72  
73          performKeyedOperations(cacheContext);
74          cacheContext.forgetAll();
75      }
76  
77      private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
78          // The approach used to perform the operations is:
79          // - schedule all the operations
80          // - check the result of all the operations and log a warning is necessary
81          // - if any operation failed, wipe the cache
82  
83          // Right then, lets submit all the operations.
84          final List<Response<Long>> deleteResponses;
85          final List<Response<String>> putResponses;
86          try (Jedis client = clientSupplier.get()) {
87              final Pipeline pipeline = client.pipelined();
88  
89              deleteResponses = cacheContext.getKeyedOperations().stream()
90                      .filter(e -> e.getValue().isRemove())
91                      .map(e -> pipeline.del(cacheContext.externalEntryKeyFor(e.getKey()).getBytes()))
92                      .collect(Collectors.toList());
93  
94              putResponses = cacheContext.getKeyedOperations().stream()
95                      .filter(e -> e.getValue().isPut())
96                      .map(e ->
97                              RedisUtils.pipelinePutOperationForPolicy(
98                                      pipeline,
99                                      cacheContext.externalEntryKeyFor(e.getKey()),
100                                     e.getValue().getPolicy(),
101                                     marshall(e.getValue().getValue(), valueMarshalling),
102                                     defaultTtl))
103                     .collect(Collectors.toList());
104 
105             pipeline.sync();
106         }
107 
108         // Check if unable to perform the removes (deletes)
109         final long numFailedRemoves = deleteResponses.stream()
110                 .filter(r -> r.get() == 0)
111                 .count();
112         if (numFailedRemoves > 0) {
113             log.debug("Cache {}: Failed to perform {} of {} transactional removes, clearing the cache.",
114                     name, numFailedRemoves, deleteResponses.size());
115             cacheContext.updateCacheVersion(
116                     RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
117         } else {
118             // Check for failed puts, which is terrible
119             final long numFailedPuts = putResponses.stream()
120                     .filter(r -> !RedisUtils.OK.equals(r.get()))
121                     .count();
122             if (numFailedPuts > 0) {
123                 log.debug("Cache {}: Failed to perform {} of {} transactional puts, clearing the cache.",
124                         name, numFailedPuts, putResponses.size());
125                 cacheContext.updateCacheVersion(
126                         RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
127             }
128         }
129     }
130 
131     @Override
132     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
133         final RequestContext requestContext = contextSupplier.get();
134 
135         transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
136 
137         return requestContext.computeIfAbsent(this, () -> {
138             // Need to build a new context, which involves getting the current cache version, or setting it if it does
139             // not exist.
140             log.trace("Cache {}: Setting up a new context", name);
141             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
142                     keyGenerator, name, requestContext::partitionIdentifier);
143             newCacheContext.updateCacheVersion(
144                     RedisUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey(),
145                             defaultTtl + 1));
146             return newCacheContext;
147         });
148     }
149 
150     @Override
151     protected Logger getLogger() {
152         return log;
153     }
154 
155     @Override
156     protected final ExternalCacheException mapException(Exception ex) {
157         return RedisUtils.mapException(ex);
158     }
159 
160     @Override
161     protected final Optional<V> directGet(String externalKey) {
162         try (Jedis client = clientSupplier.get()) {
163             return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
164         }
165     }
166 
167     @Override
168     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
169         return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
170     }
171 }