View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.internal.RequestContext;
7   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
8   import com.atlassian.vcache.internal.core.TransactionControlManager;
9   import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11  import com.atlassian.vcache.internal.core.service.AbstractTransactionalExternalCache;
12  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
13  import org.slf4j.Logger;
14  import org.slf4j.LoggerFactory;
15  import redis.clients.jedis.Jedis;
16  import redis.clients.jedis.Pipeline;
17  import redis.clients.jedis.Response;
18  
19  import java.time.Duration;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.function.Supplier;
25  import java.util.stream.Collectors;
26  
27  import static com.atlassian.vcache.ExternalCacheException.Reason.TRANSACTION_FAILURE;
28  import static com.atlassian.vcache.VCacheUtils.fold;
29  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.marshall;
30  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
31  import static java.util.Objects.requireNonNull;
32  
33  /**
34   * Redis implementation of {@link com.atlassian.vcache.TransactionalExternalCache}.
35   *
36   * @param <V> the value type
37   * @since 1.0.0
38   */
39  class RedisTransactionalExternalCache<V>
40          extends AbstractTransactionalExternalCache<V> {
41      private static final Logger log = LoggerFactory.getLogger(RedisTransactionalExternalCache.class);
42  
43      private final Supplier<Jedis> clientSupplier;
44      private final ExternalCacheKeyGenerator keyGenerator;
45      private final MarshallingPair<V> valueMarshalling;
46      private final int defaultTtl;
47      private final TransactionControlManager transactionControlManager;
48  
49      RedisTransactionalExternalCache(
50              Supplier<Jedis> clientSupplier,
51              Supplier<RequestContext> contextSupplier,
52              ExternalCacheKeyGenerator keyGenerator,
53              String name,
54              MarshallingPair<V> valueMarshalling,
55              ExternalCacheSettings settings,
56              TransactionControlManager transactionControlManager,
57              MetricsRecorder metricsRecorder,
58              Duration lockTimeout) {
59          super(name, contextSupplier, metricsRecorder, lockTimeout, (n, ex) -> {});
60          this.clientSupplier = requireNonNull(clientSupplier);
61          this.keyGenerator = requireNonNull(keyGenerator);
62          this.valueMarshalling = requireNonNull(valueMarshalling);
63          this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
64          this.transactionControlManager = requireNonNull(transactionControlManager);
65      }
66  
67      @Override
68      public void transactionSync() {
69          log.trace("Cache {}: synchronising operations", name);
70          final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
71  
72          fold(perform(() -> {
73                      if (cacheContext.hasRemoveAll()) {
74                          cacheContext.updateCacheVersion(RedisUtils.cacheVersionIncrementer(clientSupplier));
75                      }
76  
77                      performKeyedOperations(cacheContext);
78  
79                      return null;
80                  }),
81                  v -> null,
82                  err -> {
83                      log.warn("Cache {}: an operation failed during transaction sync ({}). Clearing cache to remove stale entries.", name, err.getMessage());
84                      try {
85                          cacheContext.updateCacheVersion(RedisUtils.cacheVersionIncrementer(clientSupplier));
86                      } catch (RuntimeException e) {
87                          log.error("Cache {}: failed to clear the cache: {}", name, e.getMessage());
88                      }
89  
90                      return null;
91                  });
92  
93          cacheContext.forgetAll();
94      }
95  
96      private void performKeyedOperations(VersionedExternalCacheRequestContext<V> cacheContext) {
97          // The approach used to perform the operations is:
98          // - schedule all the operations
99          // - check the result of all the operations and log a warning is necessary
100         // - if any operation failed, wipe the cache
101 
102         // Right then, lets submit all the operations.
103         final List<Response<Long>> deleteResponses;
104         final List<Response<String>> putResponses;
105         try (Jedis client = clientSupplier.get()) {
106             final Pipeline pipeline = client.pipelined();
107 
108             deleteResponses = cacheContext.getKeyedOperations().stream()
109                     .filter(e -> e.getValue().isRemove())
110                     .map(e -> pipeline.del(cacheContext.externalEntryKeyFor(e.getKey()).getBytes()))
111                     .collect(Collectors.toList());
112 
113             putResponses = cacheContext.getKeyedOperations().stream()
114                     .filter(e -> e.getValue().isPut())
115                     .map(e ->
116                             RedisUtils.pipelinePutOperationForPolicy(
117                                     pipeline,
118                                     cacheContext.externalEntryKeyFor(e.getKey()),
119                                     e.getValue().getPolicy(),
120                                     marshall(e.getValue().getValue(), valueMarshalling),
121                                     defaultTtl))
122                     .collect(Collectors.toList());
123 
124             pipeline.sync();
125         }
126 
127         // Check if unable to perform the removes (deletes)
128         final long numFailedRemoves = deleteResponses.stream()
129                 .filter(r -> r.get() == 0)
130                 .count();
131         if (numFailedRemoves > 0) {
132             log.warn("Cache {}: Failed to perform {} of {} transactional removes, clearing the cache.",
133                     name, numFailedRemoves, deleteResponses.size());
134             throw new ExternalCacheException(TRANSACTION_FAILURE);
135         } else {
136             // Check for failed puts, which is terrible
137             final long numFailedPuts = putResponses.stream()
138                     .filter(r -> !RedisUtils.OK.equals(r.get()))
139                     .count();
140             if (numFailedPuts > 0) {
141                 log.warn("Cache {}: Failed to perform {} of {} transactional puts, clearing the cache.",
142                         name, numFailedPuts, putResponses.size());
143                 throw new ExternalCacheException(TRANSACTION_FAILURE);
144             }
145         }
146     }
147 
148     @Override
149     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
150         final RequestContext requestContext = contextSupplier.get();
151 
152         transactionControlManager.registerTransactionalExternalCache(requestContext, name, this);
153 
154         return requestContext.computeIfAbsent(this, () -> {
155             // Need to build a new context, which involves getting the current cache version, or setting it if it does
156             // not exist.
157             log.trace("Cache {}: Setting up a new context", name);
158             return new VersionedExternalCacheRequestContext<>(
159                     keyGenerator, name, requestContext::partitionIdentifier,
160                     RedisUtils.cacheVersionSupplier(clientSupplier, defaultTtl + 1),
161                     lockTimeout);
162         });
163     }
164 
165     @Override
166     protected Logger getLogger() {
167         return log;
168     }
169 
170     @Override
171     protected final ExternalCacheException mapException(Exception ex) {
172         return RedisUtils.mapException(ex);
173     }
174 
175     @Override
176     protected final Optional<V> directGet(String externalKey) {
177         try (Jedis client = clientSupplier.get()) {
178             return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
179         }
180     }
181 
182     @Override
183     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
184         return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
185     }
186 }