View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.vcache.PutPolicy;
4   import com.atlassian.vcache.TransactionalExternalCache;
5   import com.atlassian.vcache.internal.MetricLabel;
6   import com.atlassian.vcache.internal.RequestContext;
7   import com.atlassian.vcache.internal.core.TransactionControl;
8   import com.atlassian.vcache.internal.core.metrics.CacheType;
9   import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
10  
11  import java.time.Duration;
12  import java.util.Collections;
13  import java.util.HashMap;
14  import java.util.Map;
15  import java.util.Optional;
16  import java.util.Set;
17  import java.util.concurrent.CompletionStage;
18  import java.util.concurrent.ExecutionException;
19  import java.util.function.Function;
20  import java.util.function.Supplier;
21  import java.util.stream.Collectors;
22  import java.util.stream.StreamSupport;
23  
24  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
25  import static java.util.Objects.requireNonNull;
26  
27  /**
28   * Provides operations common for {@link com.atlassian.vcache.TransactionalExternalCache} instances.
29   *
30   * @param <V> the value type
31   * @since 1.0.0
32   */
33  public abstract class AbstractTransactionalExternalCache<V>
34          extends AbstractExternalCache<V>
35          implements TransactionalExternalCache<V>, TransactionControl {
36  
37      protected final Supplier<RequestContext> contextSupplier;
38      protected final MetricsRecorder metricsRecorder;
39  
40      protected AbstractTransactionalExternalCache(
41              String name,
42              Supplier<RequestContext> contextSupplier,
43              MetricsRecorder metricsRecorder,
44              Duration lockTimeout) {
45          super(name, lockTimeout);
46          this.contextSupplier = requireNonNull(contextSupplier);
47          this.metricsRecorder = requireNonNull(metricsRecorder);
48      }
49  
50      /**
51       * Performs a direct get operation against the external cache using the supplied external key.
52       */
53      protected abstract Optional<V> directGet(String externalKey);
54  
55      /**
56       * Performs a direct bulk get operation against the external cache using the supplied external keys.
57       */
58      protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
59  
60      @Override
61      public final CompletionStage<Optional<V>> get(String internalKey) {
62          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
63          return perform(() -> {
64              // Check if we have recorded a value already
65              final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
66  
67              return recordedValue.orElseGet(() -> {
68                  // Check if a removeAll() has happened
69                  if (cacheContext.hasRemoveAll()) {
70                      return Optional.empty();
71                  }
72  
73                  // Now check externally
74                  final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
75                  metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
76                  final Optional<V> externalValue = directGet(externalKey);
77                  cacheContext.recordValue(internalKey, externalValue);
78  
79                  return externalValue;
80              });
81          });
82      }
83  
84      @Override
85      public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
86          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
87  
88          return perform(() -> {
89              final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
90              metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
91  
92              // Check if we have recorded a value already
93              final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
94              if (recordedValue.isPresent()) {
95                  if (recordedValue.get().isPresent()) {
96                      return recordedValue.get().get();
97                  }
98                  // There was a remove, so need to re-create
99                  getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
100                 return handleCreation(internalKey, supplier);
101             }
102 
103             // If a transactional cache has fired removeAll, then we always consider the remote to be empty.
104             if (!cacheContext.hasRemoveAll()) {
105                 final Optional<V> result = directGet(externalKey);
106                 if (result.isPresent()) {
107                     // A valid value exists in the external cache
108                     return result.get();
109                 }
110             }
111             getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
112             return handleCreation(internalKey, supplier);
113         });
114     }
115 
116     @Override
117     public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
118         return perform(() -> {
119             if (isEmpty(internalKeys)) {
120                 return new HashMap<>();
121             }
122 
123             // Get the recorded values first
124             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
125             final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
126 
127             // Calculate the externalKeys for the entries that are missing
128             final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
129                     .filter(k -> !grandResult.containsKey(k))
130                     .map(cacheContext::externalEntryKeyFor)
131                     .collect(Collectors.toSet());
132 
133             if (missingExternalKeys.isEmpty()) {
134                 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
135                 return grandResult;
136             }
137             getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
138 
139             // Get the missing values.
140             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
141             final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
142 
143             return candidateValues.entrySet().stream().collect(
144                     () -> grandResult,
145                     (m, e) -> {
146                         final Optional<V> result = e.getValue();
147                         cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
148                         m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
149                     },
150                     Map::putAll
151             );
152         });
153     }
154 
155     @Override
156     public final CompletionStage<Map<String, V>> getBulk(
157             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
158         return perform(() -> {
159             if (isEmpty(internalKeys)) {
160                 return new HashMap<>();
161             }
162 
163             // Get the recorded values first and collect the ones that have values.
164             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
165             //noinspection OptionalGetWithoutIsPresent
166             final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
167                     .filter(e -> e.getValue().isPresent())
168                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
169 
170             // Calculate the candidate externalKeys for the entries that are missing
171             final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
172                     .filter(k -> !grandResult.containsKey(k))
173                     .map(cacheContext::externalEntryKeyFor)
174                     .collect(Collectors.toSet());
175 
176             // Bail out if we have all the entries requested
177             if (candidateMissingExternalKeys.isEmpty()) {
178                 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
179                 return grandResult;
180             }
181             getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
182                     name, candidateMissingExternalKeys.size());
183 
184             final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
185             cacheContext.recordValues(missingValues);
186             grandResult.putAll(missingValues);
187 
188             return grandResult;
189         });
190     }
191 
192     @Override
193     public final void put(String internalKey, V value, PutPolicy policy) {
194         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
195         cacheContext.recordPut(internalKey, value, policy);
196     }
197 
198     @Override
199     public final void remove(Iterable<String> internalKeys) {
200         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
201         cacheContext.recordRemove(internalKeys);
202     }
203 
204     @Override
205     public final void removeAll() {
206         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
207         cacheContext.recordRemoveAll();
208     }
209 
210     @Override
211     public final boolean transactionDiscard() {
212         final RequestContext requestContext = contextSupplier.get();
213         final Optional<AbstractExternalCacheRequestContext<V>> cacheRequestContext = requestContext.get(this);
214 
215         if (!cacheRequestContext.isPresent()) {
216             // there are no pending operations
217             return false;
218         }
219 
220         final boolean hasPendingOperations = cacheRequestContext.get().hasPendingOperations();
221         cacheRequestContext.get().forgetAll();
222         return hasPendingOperations;
223     }
224 
225     private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
226         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
227 
228         final Map<String, Optional<V>> result = new HashMap<>();
229 
230         internalKeys.forEach(k -> {
231             final Optional<Optional<V>> valueRecorded = cacheContext.getValueRecorded(k);
232             if (valueRecorded.isPresent()) {
233                 result.put(k, valueRecorded.get());
234             } else if (cacheContext.hasRemoveAll()) {
235                 result.put(k, Optional.empty());
236             }
237         });
238 
239         return result;
240     }
241 
242     private V handleCreation(String internalKey, Supplier<V> supplier) throws ExecutionException, InterruptedException {
243         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
244         final V suppliedValue = requireNonNull(supplier.get());
245         cacheContext.recordPutPolicy(internalKey, suppliedValue, PutPolicy.ADD_ONLY);
246         cacheContext.recordValue(internalKey, Optional.of(suppliedValue));
247         return suppliedValue;
248     }
249 
250     private Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
251             throws ExecutionException, InterruptedException {
252         // Get the missing values from the external cache.
253         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
254 
255         // Need to handle if removeAll has been performed OR a remove has been done, and hence not check remotely.
256         // Otherwise, need to check remotely.
257         final Map<String, V> grandResult = new HashMap<>();
258         final Set<String> missingExternalKeys = fillInKnownValuesFromBackingCache(cacheContext, externalKeys, grandResult);
259 
260         if (!missingExternalKeys.isEmpty()) {
261             getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
262                     name, missingExternalKeys.size());
263             // Okay, need to get the missing values and mapping from externalKeys to internalKeys
264             final Set<String> missingInternalKeys = Collections.unmodifiableSet(
265                     missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
266             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
267             FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
268 
269             // Okay, got the missing values, now need to record adding them
270             missingValues.entrySet().forEach(e -> put(e.getKey(), e.getValue(), PutPolicy.ADD_ONLY));
271 
272             grandResult.putAll(missingValues);
273         }
274 
275         return grandResult;
276     }
277 
278     private Set<String> fillInKnownValuesFromBackingCache(
279             AbstractExternalCacheRequestContext<V> cacheContext, Set<String> externalKeys, Map<String, V> grandResult) {
280         final Set<String> missingExternalKeys;
281 
282         if (cacheContext.hasRemoveAll()) {
283             missingExternalKeys = externalKeys;
284         } else {
285             // Initial list of missing keys are the keys that have a recorded remove operation against them.
286             missingExternalKeys = externalKeys.stream()
287                     .filter(k -> {
288                         final Optional<Optional<V>> valueRecorded =
289                                 cacheContext.getValueRecorded(cacheContext.internalEntryKeyFor(k));
290                         // If a value is recorded, it has to be a remove, otherwise not be passed to this method.
291                         return valueRecorded.isPresent();
292                     })
293                     .collect(Collectors.toSet());
294 
295             // Calculate list of keys we need to check for, as they may exist remotely
296             final Set<String> externalKeysNotRemoved = externalKeys.stream()
297                     .filter(k -> !missingExternalKeys.contains(k))
298                     .collect(Collectors.toSet());
299 
300             if (!externalKeysNotRemoved.isEmpty()) {
301                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
302                 final Map<String, Optional<V>> candidateValues = directGetBulk(externalKeysNotRemoved);
303 
304                 candidateValues.entrySet().forEach(e -> {
305                     if (e.getValue().isPresent()) {
306                         grandResult.put(
307                                 cacheContext.internalEntryKeyFor(e.getKey()),
308                                 e.getValue().get());
309                     } else {
310                         missingExternalKeys.add(e.getKey());
311                     }
312                 });
313             }
314         }
315 
316         return missingExternalKeys;
317     }
318 }