View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.marshalling.api.MarshallingException;
4   import com.atlassian.vcache.ExternalCacheException;
5   import com.atlassian.vcache.VCacheUtils;
6   import com.atlassian.vcache.internal.MetricLabel;
7   import com.atlassian.vcache.internal.core.metrics.CacheType;
8   import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
9   
10  import java.util.HashMap;
11  import java.util.Map;
12  import java.util.Optional;
13  import java.util.Set;
14  import java.util.concurrent.CompletableFuture;
15  import java.util.concurrent.CompletionStage;
16  import java.util.concurrent.ExecutionException;
17  import java.util.function.Function;
18  import java.util.function.Supplier;
19  import java.util.stream.Collectors;
20  import java.util.stream.StreamSupport;
21  
22  import static com.atlassian.vcache.ExternalCacheException.Reason.MARSHALLER_FAILURE;
23  import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
24  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.failed;
25  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
26  import static java.util.Objects.requireNonNull;
27  
28  /**
29   * Provides operations common to both {@link com.atlassian.vcache.StableReadExternalCache} and
30   * {@link com.atlassian.vcache.TransactionalExternalCache} instances.
31   *
32   * @param <V> the value type
33   * @since 1.0.0
34   */
35  public abstract class AbstractNonDirectExternalCache<V>
36          extends AbstractExternalCache<V> {
37  
38      protected final MetricsRecorder metricsRecorder;
39  
40      protected AbstractNonDirectExternalCache(String name, MetricsRecorder metricsRecorder) {
41          super(name);
42          this.metricsRecorder = requireNonNull(metricsRecorder);
43      }
44  
45      /**
46       * Handles the creation of an entry, if required.
47       *
48       * @param internalKey the internal key for the entry.
49       * @param supplier    called to create the value, if required
50       * @return the value associated with the key.
51       */
52      protected abstract V handleCreation(String internalKey, Supplier<V> supplier)
53              throws ExecutionException, InterruptedException;
54  
55      /**
56       * Handles the creation of a number of entries
57       *
58       * @param factory      called to create the required values.
59       * @param externalKeys the external keys for the entries.
60       * @return the values associated with the supplied keys (mapped on internal key)
61       */
62      protected abstract Map<String, V> handleCreation(
63              Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
64              throws ExecutionException, InterruptedException;
65  
66      /**
67       * Checks for values recorded for specified internal keys.
68       */
69      protected abstract Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys);
70  
71      /**
72       * Performs a direct get operation against the external cache using the supplied external key.
73       */
74      protected abstract Optional<V> directGet(String externalKey);
75  
76      /**
77       * Performs a direct bulk get operation against the external cache using the supplied external keys.
78       */
79      protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
80  
81      @Override
82      public CompletionStage<Optional<V>> get(String internalKey) {
83          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
84  
85          final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
86  
87          final CompletionStage<Optional<V>> value = cacheContext.computeValue(internalKey,
88                  (key, oldValue) -> {
89  
90                      if (oldValue != null && !oldValue.toCompletableFuture().isCompletedExceptionally()) {
91                          return oldValue;
92                      }
93                      // If a transactional cache has fired removeAll, then we always consider the remote to be empty.
94                      // This is only true fo transactional caches.
95                      if (cacheContext.hasRemoveAll()) {
96                          return CompletableFuture.completedFuture(Optional.empty());
97                      }
98  
99                      // We go to the remote cache. If get get an exception, then we return an Optional.empty but put
100                     // the exceptional value into the map so that we will try again.
101                     return CompletableFuture.<Optional<V>>completedFuture(Optional.empty())
102                             .thenApply((ignore) -> {
103                                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
104                                 return directGet(externalKey);
105                             });
106                 });
107 
108         // Handles the case where the direct failed - we return empty but leave the exceptional future in the mapso we
109         // can retry next time (if there is one).
110         return CompletableFuture.completedFuture(VCacheUtils.fold(value, v -> v, e -> Optional.empty()));
111     }
112 
113     @Override
114     public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
115         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
116 
117         return cacheContext.computeValue(internalKey,
118                 (key, oldValue) -> {
119                     if (oldValue != null && !oldValue.toCompletableFuture().isCompletedExceptionally()) {
120                         return oldValue.thenCompose(value -> {
121                             if (value.isPresent()) {
122                                 return CompletableFuture.completedFuture(value);
123                             } else {
124                                 return getNewValue(internalKey, supplier);
125                             }
126                         });
127                     } else {
128                         getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
129 
130                         final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
131                         metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
132 
133                         // If a transactional cache has fired removeAll, then we always consider the remote to be empty.
134                         // This is only true fo transactional caches.
135                         if (!cacheContext.hasRemoveAll()) {
136                             final Optional<V> result = directGet(externalKey);
137                             if (result.isPresent()) {
138                                 // A valid value exists in the external cache
139                                 return CompletableFuture.completedFuture(result);
140                             }
141                         }
142                         getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
143                         return getNewValue(internalKey, supplier);
144                     }
145                 })
146                 // Strip the Optional that is needed for the cacheContext here without completing the future.
147                 .thenCompose(value -> CompletableFuture.completedFuture(value.get()));
148     }
149 
150     @Override
151     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
152         return perform(() -> {
153             if (isEmpty(internalKeys)) {
154                 return new HashMap<>();
155             }
156 
157             // Get the recorded values first
158             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
159             final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
160 
161             // Calculate the externalKeys for the entries that are missing
162             final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
163                     .filter(k -> !grandResult.containsKey(k))
164                     .map(cacheContext::externalEntryKeyFor)
165                     .collect(Collectors.toSet());
166 
167             if (missingExternalKeys.isEmpty()) {
168                 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
169                 return grandResult;
170             }
171             getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
172 
173             // Get the missing values.
174             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
175             final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
176 
177             return candidateValues.entrySet().stream().collect(
178                     () -> grandResult,
179                     (m, e) -> {
180                         final Optional<V> result = e.getValue();
181                         cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
182                         m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
183                     },
184                     Map::putAll
185             );
186         });
187     }
188 
189     @Override
190     public CompletionStage<Map<String, V>> getBulk(
191             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
192         return perform(() -> {
193             if (isEmpty(internalKeys)) {
194                 return new HashMap<>();
195             }
196 
197             // Get the recorded values first and collect the ones that have values.
198             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
199             final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
200                     .filter(e -> e.getValue().isPresent())
201                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
202 
203             // Calculate the candidate externalKeys for the entries that are missing
204             final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
205                     .filter(k -> !grandResult.containsKey(k))
206                     .map(cacheContext::externalEntryKeyFor)
207                     .collect(Collectors.toSet());
208 
209             // Bail out if we have all the entries requested
210             if (candidateMissingExternalKeys.isEmpty()) {
211                 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
212                 return grandResult;
213             }
214             getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
215                     name, candidateMissingExternalKeys.size());
216 
217             final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
218             cacheContext.recordValues(missingValues);
219             grandResult.putAll(missingValues);
220 
221             return grandResult;
222         });
223     }
224 
225     private CompletionStage<Optional<V>> getNewValue(String internalKey, Supplier<V> supplier) {
226         try {
227             return CompletableFuture.completedFuture(Optional.of(handleCreation(internalKey, supplier)));
228         } catch (MarshallingException ex) {
229             return failed(new CompletableFuture<Optional<V>>(), new ExternalCacheException(MARSHALLER_FAILURE, ex));
230         } catch (ExecutionException | InterruptedException ex) {
231             return failed(new CompletableFuture<Optional<V>>(), new ExternalCacheException(UNCLASSIFIED_FAILURE, ex));
232         } catch (ExternalCacheException ece) {
233             return failed(new CompletableFuture<Optional<V>>(), ece);
234         } catch (Exception ex) {
235             return failed(new CompletableFuture<Optional<V>>(), mapException(ex));
236         }
237     }
238 }