View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.ExternalCacheSettings;
6   import com.atlassian.vcache.PutPolicy;
7   import com.atlassian.vcache.internal.MetricLabel;
8   import com.atlassian.vcache.internal.RequestContext;
9   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10  import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11  import com.atlassian.vcache.internal.core.metrics.CacheType;
12  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13  import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14  import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  import redis.clients.jedis.Jedis;
18  
19  import java.time.Duration;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.concurrent.CompletionStage;
25  import java.util.concurrent.ExecutionException;
26  import java.util.function.Function;
27  import java.util.function.Supplier;
28  import java.util.stream.Collectors;
29  import java.util.stream.StreamSupport;
30  
31  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
32  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
33  import static java.util.Objects.requireNonNull;
34  
35  /**
36   * Redis implementation of {@link com.atlassian.vcache.StableReadExternalCache}.
37   *
38   * @param <V> the value type
39   * @since 1.0.0
40   */
41  class RedisStableReadExternalCache<V>
42          extends AbstractStableReadExternalCache<V> {
43      private static final Logger log = LoggerFactory.getLogger(RedisStableReadExternalCache.class);
44  
45      private final Supplier<Jedis> clientSupplier;
46      private final Supplier<RequestContext> contextSupplier;
47      private final ExternalCacheKeyGenerator keyGenerator;
48      private final MarshallingPair<V> valueMarshalling;
49      private final int defaultTtl;
50  
51      RedisStableReadExternalCache(
52              Supplier<Jedis> clientSupplier,
53              Supplier<RequestContext> contextSupplier,
54              ExternalCacheKeyGenerator keyGenerator,
55              String name,
56              MarshallingPair<V> valueMarshalling,
57              ExternalCacheSettings settings,
58              MetricsRecorder metricsRecorder,
59              Duration lockTimeout) {
60          super(name, metricsRecorder, lockTimeout);
61          this.clientSupplier = requireNonNull(clientSupplier);
62          this.contextSupplier = requireNonNull(contextSupplier);
63          this.keyGenerator = requireNonNull(keyGenerator);
64          this.valueMarshalling = requireNonNull(valueMarshalling);
65          //noinspection OptionalGetWithoutIsPresent
66          this.defaultTtl = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
67      }
68  
69      @Override
70      public boolean internalPut(String internalKey, V value, PutPolicy policy) {
71          final String externalKey = ensureCacheContext().externalEntryKeyFor(internalKey);
72          final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
73          return RedisUtils.putOperationForPolicy(
74                  policy, clientSupplier, externalKey, defaultTtl, valueBytes);
75      }
76  
77      @Override
78      protected void internalRemove(Iterable<String> internalKeys) {
79          if (isEmpty(internalKeys)) {
80              return;
81          }
82  
83          // Lodge all the requests for delete
84          try (Jedis client = clientSupplier.get()) {
85              final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
86              final List<byte[]> externalKeysList = StreamSupport.stream(internalKeys.spliterator(), false)
87                      .map(cacheContext::externalEntryKeyFor)
88                      .map(String::getBytes)
89                      .collect(Collectors.toList());
90              final byte[][] externalKeysAsBytes = externalKeysList.toArray(new byte[externalKeysList.size()][]);
91  
92              final long numDeleted = client.del(externalKeysAsBytes);
93              if (numDeleted != externalKeysAsBytes.length) {
94                  log.info("Cache {}: only able to delete {} of {} keys", name, numDeleted, externalKeysAsBytes.length);
95              }
96  
97              // Forget the values removed
98              StreamSupport.stream(internalKeys.spliterator(), false)
99                      .forEach(k -> cacheContext.recordValue(k, Optional.empty()));
100         }
101     }
102 
103     @Override
104     protected void internalRemoveAll() {
105         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
106         cacheContext.updateCacheVersion(
107                 RedisUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
108     }
109 
110     @Override
111     protected Logger getLogger() {
112         return log;
113     }
114 
115     @Override
116     protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
117         final RequestContext requestContext = contextSupplier.get();
118 
119         return requestContext.computeIfAbsent(this, () -> {
120             // Need to build a new context, which involves getting the current cache version, or setting it if it does
121             // not exist.
122             log.trace("Cache {}: Setting up a new context", name);
123             final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
124                     keyGenerator,
125                     name,
126                     requestContext::partitionIdentifier,
127                     lockTimeout);
128             newCacheContext.updateCacheVersion(
129                     RedisUtils.obtainCacheVersion(
130                             clientSupplier,
131                             newCacheContext.externalCacheVersionKey(),
132                             defaultTtl + 1));
133             return newCacheContext;
134         });
135     }
136 
137     @Override
138     protected V handleCreation(String internalKey, V candidateValue)
139             throws ExecutionException, InterruptedException {
140         final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
141         final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
142         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
143 
144         // Loop until either able to add the candidate value, or retrieve one that has been added by another thread
145         try (Jedis client = clientSupplier.get()) {
146             for (; ; ) {
147                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
148                 final long addOp = client.setnx(externalKey.getBytes(), candidateValueBytes);
149                 if (addOp == 1) {
150                     // I break here, rather than just return, due to battling with the compiler. Unless written
151                     // this way, the compiler will not allow the lambda structure.
152                     break;
153                 }
154 
155                 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
156                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
157                 final Optional<V> otherAddedValue = unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
158                 if (otherAddedValue.isPresent()) {
159                     return otherAddedValue.get();
160                 }
161 
162                 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
163             }
164         }
165         return candidateValue;
166     }
167 
168     @Override
169     public CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, String... keys) {
170         return super.getBulk(factory, keys);
171     }
172 
173     @Override
174     protected final ExternalCacheException mapException(Exception ex) {
175         return RedisUtils.mapException(ex);
176     }
177 
178     @Override
179     protected final Optional<V> directGet(String externalKey) {
180         try (Jedis client = clientSupplier.get()) {
181             return unmarshall(client.get(externalKey.getBytes()), valueMarshalling);
182         }
183     }
184 
185     @Override
186     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
187         return RedisUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
188     }
189 }