View Javadoc

1   package com.atlassian.vcache.internal.guava;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.PutPolicy;
6   import com.atlassian.vcache.internal.MetricLabel;
7   import com.atlassian.vcache.internal.RequestContext;
8   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
9   import com.atlassian.vcache.internal.core.cas.IdentifiedData;
10  import com.atlassian.vcache.internal.core.metrics.CacheType;
11  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
12  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
13  import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14  import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
15  import com.google.common.cache.Cache;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.util.Collections;
20  import java.util.Map;
21  import java.util.Optional;
22  import java.util.Set;
23  import java.util.concurrent.CompletableFuture;
24  import java.util.concurrent.CompletionStage;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.function.Function;
28  import java.util.function.Supplier;
29  import java.util.stream.Collectors;
30  
31  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
32  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
33  import static java.util.Objects.requireNonNull;
34  
35  /**
36   * Guava implementation of {@link com.atlassian.vcache.StableReadExternalCache}.
37   *
38   * @param <V> the value type
39   * @since 1.0.0
40   */
41  public class GuavaStableReadExternalCache<V>
42          extends AbstractStableReadExternalCache<V> {
43      private static final Logger log = LoggerFactory.getLogger(GuavaStableReadExternalCache.class);
44  
45      private final Cache<String, IdentifiedData> delegate;
46      private final Supplier<RequestContext> contextSupplier;
47      private final ExternalCacheKeyGenerator keyGenerator;
48      private final Optional<MarshallingPair<V>> valueMarshalling;
49  
50      public GuavaStableReadExternalCache(
51              String name,
52              Cache<String, IdentifiedData> delegate,
53              Supplier<RequestContext> contextSupplier,
54              ExternalCacheKeyGenerator keyGenerator,
55              Optional<MarshallingPair<V>> valueMarshalling,
56              MetricsRecorder metricsRecorder) {
57          super(name, metricsRecorder);
58          this.contextSupplier = requireNonNull(contextSupplier);
59          this.keyGenerator = requireNonNull(keyGenerator);
60          this.valueMarshalling = requireNonNull(valueMarshalling);
61          this.delegate = requireNonNull(delegate);
62      }
63  
64      @Override
65      protected CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
66          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
67          return perform(
68                  () -> {
69                      // This stores the result of the remote operation. If it fails then we can roll back. We use Atomic because in the
70                      // future we may be async.
71                      final AtomicBoolean remotePutResult = new AtomicBoolean(true);
72  
73                      // computeValue allows us to get the current CompletableFuture (if there is one) and chain the next
74                      // put() onto the old future. If the old future is still running then we preserve
75                      // ordering (and so consistency). Also any blocked gets will get the new value when the chain
76                      // completes. The map locks the update to the future so updates are nicely serialized.
77                      cacheContext.computeValue(internalKey, (k, oldValue) -> {
78                          if (oldValue != null) {
79                              return oldValue.thenApply(oldV -> {
80                                  remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
81                                  return Optional.of(value);
82                              });
83                          } else {
84                              remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
85                              return CompletableFuture.completedFuture(Optional.of(value));
86                          }
87                      });
88                      return remotePutResult.get();
89                  },
90                  (result) -> {
91                      if (!result) {
92                          cacheContext.forgetValue(internalKey);
93                      }
94                  });
95      }
96  
97      @Override
98      protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
99          return perform(() -> {
100             // There is no bulk delete in the api, so need to remove each one
101             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
102             for (String key : internalKeys) {
103                 delegate.asMap().remove(cacheContext.externalEntryKeyFor(key));
104                 cacheContext.forgetValue(key);
105             }
106             return null;
107         });
108     }
109 
110     @Override
111     protected CompletionStage<Void> internalRemoveAll() {
112         return perform(() -> {
113             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
114             delegate.asMap().clear();
115             cacheContext.forgetAllValues();
116             return null;
117         });
118     }
119 
120     @Override
121     protected Logger getLogger() {
122         return log;
123     }
124 
125     @Override
126     protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
127         final RequestContext requestContext = contextSupplier.get();
128 
129         return requestContext.computeIfAbsent(this, () -> {
130             log.trace("Cache {}: Setting up a new context", getName());
131             return new UnversionedExternalCacheRequestContext<>(
132                     keyGenerator, getName(), requestContext::partitionIdentifier);
133         });
134     }
135 
136     @Override
137     protected V handleCreation(String internalKey, Supplier<V> supplier)
138             throws ExecutionException, InterruptedException {
139         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
140         final V candidateValue = requireNonNull(supplier.get());
141         final IdentifiedData candidateIdentifiedData = marshall(candidateValue, valueMarshalling);
142 
143         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
144         final Optional<V> otherAddedValue =
145                 unmarshall(delegate.asMap().putIfAbsent(externalKey, candidateIdentifiedData), valueMarshalling);
146 
147         if (otherAddedValue.isPresent()) {
148             getLogger().info("Cache {}, unable to add candidate for key {}, use what was added", name, internalKey);
149             return otherAddedValue.get();
150         }
151 
152         return candidateValue;
153     }
154 
155     @Override
156     protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
157             throws ExecutionException, InterruptedException {
158         // Get the missing values from the external cache.
159         // Returns map of keys that contain values, so need to handle the missing ones
160         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
161         metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
162 
163         final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeys);
164 
165         final Set<String> missingExternalKeys = candidateValues.entrySet().stream()
166                 .filter(e -> !e.getValue().isPresent())
167                 .map(Map.Entry::getKey)
168                 .collect(Collectors.toSet());
169 
170         // Add the retrieved values to the grand result
171         final Map<String, V> grandResult = candidateValues.entrySet().stream()
172                 .filter(e -> e.getValue().isPresent())
173                 .collect(Collectors.toMap(
174                         e -> cacheContext.internalEntryKeyFor(e.getKey()),
175                         e -> e.getValue().get()));
176 
177         if (!missingExternalKeys.isEmpty()) {
178             getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
179                     name, missingExternalKeys.size());
180             // Okay, need to get the missing values and mapping from externalKeys to internalKeys
181             final Set<String> missingInternalKeys = Collections.unmodifiableSet(
182                     missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
183             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
184             if (missingInternalKeys.size() != missingValues.size()) {
185                 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
186                         name, missingInternalKeys.size() + " but got " + missingValues.size());
187                 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
188             }
189 
190             // Okay, got the missing values, now need to add them
191             missingValues.entrySet().stream().forEach(e ->
192                     delegate.put(
193                             cacheContext.externalEntryKeyFor(e.getKey()),
194                             marshall(e.getValue(), valueMarshalling)));
195 
196             grandResult.putAll(missingValues);
197         }
198         return grandResult;
199     }
200 
201     @Override
202     protected final ExternalCacheException mapException(Exception ex) {
203         return GuavaUtils.mapException(ex);
204     }
205 
206     @Override
207     protected final Optional<V> directGet(String externalKey) {
208         return unmarshall(delegate.getIfPresent(externalKey), valueMarshalling);
209     }
210 
211     @Override
212     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
213         return GuavaUtils.directGetBulk(externalKeys, delegate, valueMarshalling);
214     }
215 
216     private boolean remotePut(String externalKey, V value, PutPolicy policy) {
217         final IdentifiedData identifiedData = marshall(value, valueMarshalling);
218         return GuavaUtils.directPut(externalKey, identifiedData, policy, delegate);
219     }
220 }