View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
import com.atlassian.vcache.PutPolicy;
import com.atlassian.vcache.TransactionalExternalCache;
import com.atlassian.vcache.internal.ExternalCacheExceptionListener;
import com.atlassian.vcache.internal.MetricLabel;
import com.atlassian.vcache.internal.RequestContext;
import com.atlassian.vcache.internal.core.TransactionControl;
import com.atlassian.vcache.internal.core.metrics.CacheType;
import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
import static java.util.Objects.requireNonNull;
27  
28  /**
29   * Provides operations common for {@link com.atlassian.vcache.TransactionalExternalCache} instances.
30   *
31   * @param <V> the value type
32   * @since 1.0.0
33   */
34  public abstract class AbstractTransactionalExternalCache<V>
35          extends AbstractExternalCache<V>
36          implements TransactionalExternalCache<V>, TransactionControl {
37  
38      protected final Supplier<RequestContext> contextSupplier;
39      protected final MetricsRecorder metricsRecorder;
40  
41      protected AbstractTransactionalExternalCache(
42              String name,
43              Supplier<RequestContext> contextSupplier,
44              MetricsRecorder metricsRecorder,
45              Duration lockTimeout,
46              ExternalCacheExceptionListener externalCacheExceptionListener) {
47          super(name, lockTimeout, externalCacheExceptionListener);
48          this.contextSupplier = requireNonNull(contextSupplier);
49          this.metricsRecorder = requireNonNull(metricsRecorder);
50      }
51  
52      /**
53       * Performs a direct get operation against the external cache using the supplied external key.
54       */
55      protected abstract Optional<V> directGet(String externalKey);
56  
57      /**
58       * Performs a direct bulk get operation against the external cache using the supplied external keys.
59       */
60      protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
61  
62      @Override
63      public final CompletionStage<Optional<V>> get(String internalKey) {
64          return perform(() -> {
65              final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
66  
67              // Check if we have recorded a value already
68              final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
69  
70              return recordedValue.orElseGet(() -> {
71                  // Check if a removeAll() has happened
72                  if (cacheContext.hasRemoveAll()) {
73                      return Optional.empty();
74                  }
75  
76                  // Now check externally
77                  final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
78                  metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
79                  final Optional<V> externalValue = directGet(externalKey);
80                  cacheContext.recordValue(internalKey, externalValue);
81  
82                  return externalValue;
83              });
84          });
85      }
86  
87      @Override
88      public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
89          return perform(() -> {
90              final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
91  
92              final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
93  
94              // Check if we have recorded a value already
95              final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
96              if (recordedValue.isPresent()) {
97                  if (recordedValue.get().isPresent()) {
98                      return recordedValue.get().get();
99                  }
100                 // There was a remove, so need to re-create
101                 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
102                 return handleCreation(internalKey, supplier);
103             }
104 
105             // If a transactional cache has fired removeAll, then we always consider the remote to be empty.
106             if (!cacheContext.hasRemoveAll()) {
107                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
108                 final Optional<V> result = directGet(externalKey);
109                 if (result.isPresent()) {
110                     // A valid value exists in the external cache
111                     cacheContext.recordValue(internalKey, result);
112                     return result.get();
113                 }
114             }
115             getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
116             return handleCreation(internalKey, supplier);
117         });
118     }
119 
120     @Override
121     public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
122         return perform(() -> {
123             if (isEmpty(internalKeys)) {
124                 return new HashMap<>();
125             }
126 
127             // Get the recorded values first
128             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
129             final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
130 
131             // Calculate the externalKeys for the entries that are missing
132             final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
133                     .filter(k -> !grandResult.containsKey(k))
134                     .map(cacheContext::externalEntryKeyFor)
135                     .collect(Collectors.toSet());
136 
137             if (missingExternalKeys.isEmpty()) {
138                 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
139                 return grandResult;
140             }
141             getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
142 
143             // Get the missing values.
144             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
145             final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
146 
147             return candidateValues.entrySet().stream().collect(
148                     () -> grandResult,
149                     (m, e) -> {
150                         final Optional<V> result = e.getValue();
151                         cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
152                         m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
153                     },
154                     Map::putAll
155             );
156         });
157     }
158 
159     @Override
160     public final CompletionStage<Map<String, V>> getBulk(
161             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
162         return perform(() -> {
163             if (isEmpty(internalKeys)) {
164                 return new HashMap<>();
165             }
166 
167             // Get the recorded values first and collect the ones that have values.
168             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
169             //noinspection OptionalGetWithoutIsPresent
170             final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
171                     .filter(e -> e.getValue().isPresent())
172                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
173 
174             // Calculate the candidate externalKeys for the entries that are missing
175             final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
176                     .filter(k -> !grandResult.containsKey(k))
177                     .map(cacheContext::externalEntryKeyFor)
178                     .collect(Collectors.toSet());
179 
180             // Bail out if we have all the entries requested
181             if (candidateMissingExternalKeys.isEmpty()) {
182                 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
183                 return grandResult;
184             }
185             getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
186                     name, candidateMissingExternalKeys.size());
187 
188             final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
189             cacheContext.recordValues(missingValues);
190             grandResult.putAll(missingValues);
191 
192             return grandResult;
193         });
194     }
195 
196     @Override
197     public final void put(String internalKey, V value, PutPolicy policy) {
198         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
199         cacheContext.recordPut(internalKey, value, policy);
200     }
201 
202     @Override
203     public final void remove(Iterable<String> internalKeys) {
204         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
205         cacheContext.recordRemove(internalKeys);
206     }
207 
208     @Override
209     public final void removeAll() {
210         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
211         cacheContext.recordRemoveAll();
212     }
213 
214     @Override
215     public final boolean transactionDiscard() {
216         final RequestContext requestContext = contextSupplier.get();
217         final Optional<AbstractExternalCacheRequestContext<V>> cacheRequestContext = requestContext.get(this);
218 
219         if (!cacheRequestContext.isPresent()) {
220             // there are no pending operations
221             return false;
222         }
223 
224         final boolean hasPendingOperations = cacheRequestContext.get().hasPendingOperations();
225         cacheRequestContext.get().forgetAll();
226         return hasPendingOperations;
227     }
228 
229     private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
230         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
231 
232         final Map<String, Optional<V>> result = new HashMap<>();
233 
234         internalKeys.forEach(k -> {
235             final Optional<Optional<V>> valueRecorded = cacheContext.getValueRecorded(k);
236             if (valueRecorded.isPresent()) {
237                 result.put(k, valueRecorded.get());
238             } else if (cacheContext.hasRemoveAll()) {
239                 result.put(k, Optional.empty());
240             }
241         });
242 
243         return result;
244     }
245 
246     private V handleCreation(String internalKey, Supplier<V> supplier) throws ExecutionException, InterruptedException {
247         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
248         final V suppliedValue = requireNonNull(supplier.get());
249         cacheContext.recordPutPolicy(internalKey, suppliedValue, PutPolicy.ADD_ONLY);
250         cacheContext.recordValue(internalKey, Optional.of(suppliedValue));
251         return suppliedValue;
252     }
253 
254     private Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
255             throws ExecutionException, InterruptedException {
256         // Get the missing values from the external cache.
257         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
258 
259         // Need to handle if removeAll has been performed OR a remove has been done, and hence not check remotely.
260         // Otherwise, need to check remotely.
261         final Map<String, V> grandResult = new HashMap<>();
262         final Set<String> missingExternalKeys = fillInKnownValuesFromBackingCache(cacheContext, externalKeys, grandResult);
263 
264         if (!missingExternalKeys.isEmpty()) {
265             getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
266                     name, missingExternalKeys.size());
267             // Okay, need to get the missing values and mapping from externalKeys to internalKeys
268             final Set<String> missingInternalKeys = Collections.unmodifiableSet(
269                     missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
270             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
271             FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
272 
273             // Okay, got the missing values, now need to record adding them
274             missingValues.entrySet().forEach(e -> put(e.getKey(), e.getValue(), PutPolicy.ADD_ONLY));
275 
276             grandResult.putAll(missingValues);
277         }
278 
279         return grandResult;
280     }
281 
282     private Set<String> fillInKnownValuesFromBackingCache(
283             AbstractExternalCacheRequestContext<V> cacheContext, Set<String> externalKeys, Map<String, V> grandResult) {
284         final Set<String> missingExternalKeys;
285 
286         if (cacheContext.hasRemoveAll()) {
287             missingExternalKeys = externalKeys;
288         } else {
289             // Initial list of missing keys are the keys that have a recorded remove operation against them.
290             missingExternalKeys = externalKeys.stream()
291                     .filter(k -> {
292                         final Optional<Optional<V>> valueRecorded =
293                                 cacheContext.getValueRecorded(cacheContext.internalEntryKeyFor(k));
294                         // If a value is recorded, it has to be a remove, otherwise not be passed to this method.
295                         return valueRecorded.isPresent();
296                     })
297                     .collect(Collectors.toSet());
298 
299             // Calculate list of keys we need to check for, as they may exist remotely
300             final Set<String> externalKeysNotRemoved = externalKeys.stream()
301                     .filter(k -> !missingExternalKeys.contains(k))
302                     .collect(Collectors.toSet());
303 
304             if (!externalKeysNotRemoved.isEmpty()) {
305                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
306                 final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeysNotRemoved);
307 
308                 candidateValues.entrySet().forEach(e -> {
309                     if (e.getValue().isPresent()) {
310                         grandResult.put(
311                                 cacheContext.internalEntryKeyFor(e.getKey()),
312                                 e.getValue().get());
313                     } else {
314                         missingExternalKeys.add(e.getKey());
315                     }
316                 });
317             }
318         }
319 
320         return missingExternalKeys;
321     }
322 }