View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.internal.RequestContext;
7   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
8   import com.atlassian.vcache.internal.core.TransactionControlManager;
9   import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11  import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
13  import org.slf4j.Logger;
14  import org.slf4j.LoggerFactory;
15  import redis.clients.jedis.Jedis;
16  import redis.clients.jedis.Pipeline;
17  import redis.clients.jedis.Response;
18  
19  import java.time.Duration;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.function.Supplier;
25  import java.util.stream.Collectors;
26  
27  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
28  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
29  import static java.util.Objects.requireNonNull;
30  
31  /**
32   * Redis implementation of {@link com.atlassian.vcache.TransactionalExternalCache}.
33   *
34   * @param <V> the value type
35   * @since 1.0.0
36   */
37  class RedisTransactionalExternalCache<V>
38          extends AbstractTransactionalExternalCache<V> {
39      private static final Logger log = LoggerFactory.getLogger(RedisTransactionalExternalCache.class);
40  
41      private final Supplier<Jedis> clientSupplier;
42      private final ExternalCacheKeyGenerator keyGenerator;
43      private final MarshallingPair<V> valueMarshalling;
44      private final int defaultTtl;
45      private final TransactionControlManager transactionControlManager;
46  
47      RedisTransactionalExternalCache(
48              Supplier<Jedis> clientSupplier,
49              Supplier<RequestContext> contextSupplier,
50              ExternalCacheKeyGenerator keyGenerator,
51              String name,
52              MarshallingPair<V> valueMarshalling,
53              ExternalCacheSettings settings,
54              TransactionControlManager transactionControlManager,
55              MetricsRecorder metricsRecorder,
56              Duration lockTimeout) {
57          super(name, contextSupplier, metricsRecorder, lockTimeout);
58          this.clientSupplier = requireNonNull(clientSupplier);
59          this.keyGenerator = requireNonNull(keyGenerator);
60          this.valueMarshalling = requireNonNull(valueMarshalling);
61          this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
62          this.transactionControlManager = requireNonNull(transactionControlManager);
63      }
64  
65      @Override
66      public void transactionSync() {
67          log.trace("Cache {}: synchronising operations", name);
68          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
69  
70          if (cacheContext.hasRemoveAll()) {
71              cacheContext.updateCacheVersion(
72                      RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
73          }
74  
75          performKeyedOperations(cacheContext);
76          cacheContext.forgetAll();
77      }
78  
79      private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
80          // The approach used to perform the operations is:
81          // - schedule all the operations
82          // - check the result of all the operations and log a warning is necessary
83          // - if any operation failed, wipe the cache
84  
85          // Right then, lets submit all the operations.
86          final List<Response<Long>> deleteResponses;
87          final List<Response<String>> putResponses;
88          try (Jedis client = clientSupplier.get()) {
89              final Pipeline pipeline = client.pipelined();
90  
91              deleteResponses = cacheContext.getKeyedOperations().stream()
92                      .filter(e -> e.getValue().isRemove())
93                      .map(e -> pipeline.del(cacheContext.externalEntryKeyFor(e.getKey()).getBytes()))
94                      .collect(Collectors.toList());
95  
96              putResponses = cacheContext.getKeyedOperations().stream()
97                      .filter(e -> e.getValue().isPut())
98                      .map(e ->
99                              RedisUtils.pipelinePutOperationForPolicy(
100                                     pipeline,
101                                     cacheContext.externalEntryKeyFor(e.getKey()),
102                                     e.getValue().getPolicy(),
103                                     marshall(e.getValue().getValue(), valueMarshalling),
104                                     defaultTtl))
105                     .collect(Collectors.toList());
106 
107             pipeline.sync();
108         }
109 
110         // Check if unable to perform the removes (deletes)
111         final long numFailedRemoves = deleteResponses.stream()
112                 .filter(r -> r.get() == 0)
113                 .count();
114         if (numFailedRemoves > 0) {
115             log.debug("Cache {}: Failed to perform {} of {} transactional removes, clearing the cache.",
116                     name, numFailedRemoves, deleteResponses.size());
117             cacheContext.updateCacheVersion(
118                     RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
119         } else {
120             // Check for failed puts, which is terrible
121             final long numFailedPuts = putResponses.stream()
122                     .filter(r -> !RedisUtils.OK.equals(r.get()))
123                     .count();
124             if (numFailedPuts > 0) {
125                 log.debug("Cache {}: Failed to perform {} of {} transactional puts, clearing the cache.",
126                         name, numFailedPuts, putResponses.size());
127                 cacheContext.updateCacheVersion(
128                         RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
129             }
130         }
131     }
132 
133     @Override
134     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
135         final RequestContext requestContext = contextSupplier.get();
136 
137         transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
138 
139         return requestContext.computeIfAbsent(this, () -> {
140             // Need to build a new context, which involves getting the current cache version, or setting it if it does
141             // not exist.
142             log.trace("Cache {}: Setting up a new context", name);
143             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
144                     keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
145             newCacheContext.updateCacheVersion(
146                     RedisUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey(),
147                             defaultTtl + 1));
148             return newCacheContext;
149         });
150     }
151 
152     @Override
153     protected Logger getLogger() {
154         return log;
155     }
156 
157     @Override
158     protected final ExternalCacheException mapException(Exception ex) {
159         return RedisUtils.mapException(ex);
160     }
161 
162     @Override
163     protected final Optional<V> directGet(String externalKey) {
164         try (Jedis client = clientSupplier.get()) {
165             return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
166         }
167     }
168 
169     @Override
170     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
171         return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
172     }
173 }