View Javadoc

1   package com.atlassian.vcache.internal.legacy;
2   
3   import com.atlassian.cache.Cache;
4   import com.atlassian.marshalling.api.MarshallingPair;
5   import com.atlassian.vcache.ExternalCacheException;
6   import com.atlassian.vcache.PutPolicy;
7   import com.atlassian.vcache.internal.MetricLabel;
8   import com.atlassian.vcache.internal.RequestContext;
9   import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10  import com.atlassian.vcache.internal.core.cas.IdentifiedData;
11  import com.atlassian.vcache.internal.core.metrics.CacheType;
12  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
14  import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
15  import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.util.Collections;
20  import java.util.Map;
21  import java.util.Optional;
22  import java.util.Set;
23  import java.util.concurrent.CompletableFuture;
24  import java.util.concurrent.CompletionStage;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.function.Function;
28  import java.util.function.Supplier;
29  import java.util.stream.Collectors;
30  
31  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
32  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
33  import static java.util.Objects.requireNonNull;
34  
35  /**
36   * Implementation that backs onto Atlassian Cache.
37   *
38   * @param <V> the value type
39   * @since 1.0.0
40   */
41  class LegacyStableReadExternalCache<V>
42          extends AbstractStableReadExternalCache<V> {
43      private static final Logger log = LoggerFactory.getLogger(LegacyStableReadExternalCache.class);
44  
45      private final Cache<String, IdentifiedData> delegate;
46      private final Supplier<RequestContext> contextSupplier;
47      private final ExternalCacheKeyGenerator keyGenerator;
48      private final Optional<MarshallingPair<V>> valueMarshalling;
49      private final LegacyServiceSettings serviceSettings;
50  
51      LegacyStableReadExternalCache(
52              Cache<String, IdentifiedData> delegate,
53              Supplier<RequestContext> contextSupplier,
54              ExternalCacheKeyGenerator keyGenerator,
55              Optional<MarshallingPair<V>> valueMarshalling,
56              LegacyServiceSettings serviceSettings,
57              MetricsRecorder metricsRecorder) {
58          super(delegate.getName(), metricsRecorder);
59          this.delegate = requireNonNull(delegate);
60          this.contextSupplier = requireNonNull(contextSupplier);
61          this.keyGenerator = requireNonNull(keyGenerator);
62          this.valueMarshalling = requireNonNull(valueMarshalling);
63          this.serviceSettings = requireNonNull(serviceSettings);
64      }
65  
66      @Override
67      public CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy) {
68          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
69          return perform(
70                  () -> {
71                      // This stores the result of the remote operation. If it fails then we can roll back. We use Atomic because in the
72                      // future we may be async.
73                      final AtomicBoolean remotePutResult = new AtomicBoolean(true);
74  
75                      // computeValue allows us to get the current CompletableFuture (if there is one) and chain the next
76                      // put() onto the old future. If the old future is still running then we preserve
77                      // ordering (and so consistency). Also any blocked gets will get the new value when the chain
78                      // completes. The map locks the update to the future so updates are nicely serialized.
79                      cacheContext.computeValue(internalKey, (key, oldValue) -> {
80                          if (oldValue != null) {
81                              return oldValue.thenApply(oldV -> {
82                                  remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
83                                  return Optional.of(value);
84                              });
85                          } else {
86                              remotePutResult.set(remotePut(cacheContext.externalEntryKeyFor(internalKey), value, policy));
87                              return CompletableFuture.completedFuture(Optional.of(value));
88                          }
89                      });
90                      return remotePutResult.get();
91                  },
92                  (result) -> {
93                      if (!result) {
94                          cacheContext.forgetValue(internalKey);
95                      }
96                  });
97      }
98  
99      private Boolean remotePut(String externalKey, V value, PutPolicy policy) {
100         final IdentifiedData identifiedData = marshall(value, valueMarshalling);
101         return LegacyUtils.directPut(externalKey, identifiedData, policy, delegate, serviceSettings.isAvoidCasOps());
102     }
103 
104     @Override
105     protected CompletionStage<Void> internalRemove(Iterable<String> internalKeys) {
106         return perform(() -> {
107             // There is no bulk delete in the api, so need to remove each one
108             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
109             for (String key : internalKeys) {
110                 delegate.remove(cacheContext.externalEntryKeyFor(key));
111                 cacheContext.forgetValue(key);
112             }
113 
114             return null;
115         });
116     }
117 
118     @Override
119     protected CompletionStage<Void> internalRemoveAll() {
120         return perform(() -> {
121             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
122             delegate.removeAll();
123             cacheContext.forgetAllValues();
124             return null;
125         });
126     }
127 
128     @Override
129     protected Logger getLogger() {
130         return log;
131     }
132 
133     @Override
134     protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
135         final RequestContext requestContext = contextSupplier.get();
136 
137         return requestContext.computeIfAbsent(this, () -> {
138             log.trace("Cache {}: Setting up a new context", delegate.getName());
139             return new UnversionedExternalCacheRequestContext<>(
140                     keyGenerator, delegate.getName(), requestContext::partitionIdentifier);
141         });
142     }
143 
144     @Override
145     protected V handleCreation(String internalKey, Supplier<V> supplier)
146             throws ExecutionException, InterruptedException {
147         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
148         final V candidateValue = requireNonNull(supplier.get());
149         final IdentifiedData candidateIdentifiedData = marshall(candidateValue, valueMarshalling);
150         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
151 
152         if (serviceSettings.isAvoidCasOps()) {
153             delegate.put(externalKey, candidateIdentifiedData);
154         } else {
155             final Optional<V> otherAddedValue =
156                     unmarshall(delegate.putIfAbsent(externalKey, candidateIdentifiedData), valueMarshalling);
157 
158             if (otherAddedValue.isPresent()) {
159                 getLogger().info("Cache {}, unable to add candidate for key {}, use what was added", name, internalKey);
160                 return otherAddedValue.get();
161             }
162         }
163         return candidateValue;
164     }
165 
166     @Override
167     protected Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
168             throws ExecutionException, InterruptedException {
169         // Get the missing values from the external cache.
170         // Returns map of keys that contain values, so need to handle the missing ones
171         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
172         metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
173         final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeys);
174 
175         final Set<String> missingExternalKeys = candidateValues.entrySet().stream()
176                 .filter(e -> !e.getValue().isPresent())
177                 .map(Map.Entry::getKey)
178                 .collect(Collectors.toSet());
179 
180         // Add the retrieved values to the grand result
181         final Map<String, V> grandResult = candidateValues.entrySet().stream()
182                 .filter(e -> e.getValue().isPresent())
183                 .collect(Collectors.toMap(
184                         e -> cacheContext.internalEntryKeyFor(e.getKey()),
185                         e -> e.getValue().get()));
186 
187         if (!missingExternalKeys.isEmpty()) {
188             getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
189                     name, missingExternalKeys.size());
190             // Okay, need to get the missing values and mapping from externalKeys to internalKeys
191             final Set<String> missingInternalKeys = Collections.unmodifiableSet(
192                     missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
193             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
194             if (missingInternalKeys.size() != missingValues.size()) {
195                 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
196                         name, missingInternalKeys.size() + " but got " + missingValues.size());
197                 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
198             }
199 
200             // Okay, got the missing values, now need to add them
201             missingValues.entrySet().stream().forEach(e ->
202                     delegate.put(
203                             cacheContext.externalEntryKeyFor(e.getKey()),
204                             marshall(e.getValue(), valueMarshalling)));
205 
206             grandResult.putAll(missingValues);
207         }
208 
209         return grandResult;
210     }
211 
212     @Override
213     protected final ExternalCacheException mapException(Exception ex) {
214         return LegacyUtils.mapException(ex);
215     }
216 
217     @Override
218     protected final Optional<V> directGet(String externalKey) {
219         return unmarshall(delegate.get(externalKey), valueMarshalling);
220     }
221 
222     @Override
223     protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
224         return LegacyUtils.directGetBulk(externalKeys, delegate, valueMarshalling);
225     }
226 }