View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.vcache.PutPolicy;
4   import com.atlassian.vcache.StableReadExternalCache;
5   import com.atlassian.vcache.VCacheException;
6   import com.atlassian.vcache.internal.MetricLabel;
7   import com.atlassian.vcache.internal.core.metrics.CacheType;
8   import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import java.time.Duration;
13  import java.util.HashMap;
14  import java.util.HashSet;
15  import java.util.Map;
16  import java.util.Optional;
17  import java.util.Set;
18  import java.util.concurrent.CompletionStage;
19  import java.util.concurrent.ExecutionException;
20  import java.util.function.Function;
21  import java.util.function.Supplier;
22  import java.util.stream.Collectors;
23  import java.util.stream.StreamSupport;
24  
25  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
26  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
27  import static java.util.Objects.requireNonNull;
28  
29  /**
30   * Provides operations common to {@link com.atlassian.vcache.StableReadExternalCache} instances.
31   * <p>
32   * Locking is provided to synchronise the local and remote caches. There are two locking strategies - a cache wide
33   * read-write lock and a per-key lock. The cache wide lock provides exclusion for multi-key operations such as
34   * removeAll and getBulk. The key lock provides locking for single key get and put whilst honouring the cache lock.
35   *
36   * @param <V> the value type
37   * @since 1.0.0
38   */
39  public abstract class AbstractStableReadExternalCache<V>
40          extends AbstractExternalCache<V>
41          implements StableReadExternalCache<V> {
42  
43      private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCache.class);
44      protected final MetricsRecorder metricsRecorder;
45  
46      protected AbstractStableReadExternalCache(String name, MetricsRecorder metricsRecorder, Duration lockTimeout) {
47          super(name, lockTimeout);
48          this.metricsRecorder = requireNonNull(metricsRecorder);
49      }
50  
51      protected abstract boolean internalPut(String internalKey, V value, PutPolicy policy);
52  
53      protected abstract void internalRemoveAll();
54  
55      protected abstract void internalRemove(Iterable<String> keys);
56  
57      /**
58       * Handles the creation of an entry, if required.
59       *
60       * @param internalKey    the internal key for the entry.
61       * @param candidateValue the candidate value to add, if required
62       * @return the value associated with the key.
63       */
64      protected abstract V handleCreation(String internalKey, V candidateValue)
65              throws ExecutionException, InterruptedException;
66  
67      /**
68       * Performs a direct get operation against the external cache using the supplied external key.
69       */
70      protected abstract Optional<V> directGet(String externalKey);
71  
72      /**
73       * Performs a direct bulk get operation against the external cache using the supplied external keys.
74       */
75      protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
76  
77      @Override
78      public final CompletionStage<Optional<V>> get(String internalKey) {
79          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
80          return perform(() ->
81                  cacheContext.getGlobalLock().withLock(() -> {
82                      // Check if we have recorded a value already
83                      final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
84  
85                      return recordedValue.orElseGet(() -> {
86                          // Now check externally
87                          final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
88                          metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
89                          final Optional<V> externalValue = directGet(externalKey);
90                          cacheContext.recordValue(internalKey, externalValue);
91  
92                          return externalValue;
93                      });
94                  }));
95      }
96  
97      @Override
98      public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
99          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
100         return perform(() -> {
101             // Check for existing value. Do it under a lock so that we can forget the recorded value (a side-effect
102             // of calling get().
103             final Optional<V> existingValue = cacheContext.getGlobalLock().withLock(() -> {
104                 final Optional<V> value = unsafeJoin(get(internalKey));
105                 if (value.isPresent()) {
106                     return value;
107                 }
108 
109                 // No value existed, so now forget we did the lookup, so we can see if other threads do it.
110                 cacheContext.forgetValue(internalKey);
111                 return Optional.empty();
112             });
113 
114             return existingValue.orElseGet(() -> {
115                 // Calculate a candidate value, not holding a lock
116                 final V candidateValue = requireNonNull(supplier.get());
117 
118                 // Now record the candidate value
119                 return cacheContext.getGlobalLock().withLock(() -> {
120                     // Check for a value being added by another thread in the current request context.
121                     final Optional<Optional<V>> doubleCheck = cacheContext.getValueRecorded(internalKey);
122                     if (doubleCheck.isPresent() && doubleCheck.get().isPresent()) {
123                         //noinspection OptionalGetWithoutIsPresent
124                         return doubleCheck.get().get();
125                     }
126 
127                     // Now attempt to add remotely
128                     try {
129                         final V finalValue = handleCreation(internalKey, candidateValue);
130                         cacheContext.recordValue(internalKey, Optional.of(finalValue));
131                         return finalValue;
132                     } catch (ExecutionException | InterruptedException e) {
133                         throw new VCacheException("Update failure", e);
134                     }
135                 });
136             });
137         });
138     }
139 
140     @Override
141     public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
142         return perform(() -> {
143             if (isEmpty(internalKeys)) {
144                 return new HashMap<>();
145             }
146 
147             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
148             return cacheContext.getGlobalLock().withLock(() -> {
149                 // Get the recorded values first
150                 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
151 
152                 // Calculate the externalKeys for the entries that are missing
153                 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
154                         .filter(k -> !grandResult.containsKey(k))
155                         .map(cacheContext::externalEntryKeyFor)
156                         .collect(Collectors.toSet());
157 
158                 if (missingExternalKeys.isEmpty()) {
159                     getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
160                     return grandResult;
161                 }
162                 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
163 
164                 // Get the missing values.
165                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
166                 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
167 
168                 return candidateValues.entrySet().stream().collect(
169                         () -> grandResult,
170                         (m, e) -> {
171                             final Optional<V> result = e.getValue();
172                             cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
173                             m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
174                         },
175                         Map::putAll
176                 );
177             });
178         });
179     }
180 
181     @Override
182     public final CompletionStage<Map<String, V>> getBulk(
183             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
184         return perform(() -> {
185             if (isEmpty(internalKeys)) {
186                 return new HashMap<>();
187             }
188 
189             final Map<String, V> grandResult = new HashMap<>();
190             final Set<String> missingInternalKeys = new HashSet<>();
191             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
192             // Use the getBulk() to get the known state for each key. Do this logic under lock as the cacheContext
193             // is being updated.
194             cacheContext.getGlobalLock().withLock(() -> {
195                 final Map<String, Optional<V>> knownState = unsafeJoin(getBulk(internalKeys));
196                 knownState.entrySet().forEach(entry -> {
197                     if (entry.getValue().isPresent()) {
198                         //noinspection OptionalGetWithoutIsPresent
199                         grandResult.put(entry.getKey(), entry.getValue().get());
200                     } else {
201                         missingInternalKeys.add(entry.getKey());
202                         cacheContext.forgetValue(entry.getKey()); // to allow future cache operations
203                     }
204                 });
205             });
206 
207             // Bail out if we have all the entries requested
208             if (missingInternalKeys.isEmpty()) {
209                 return grandResult;
210             }
211 
212             // Create the candidate missing values NOT under a lock
213             final Map<String, V> candidateValues = factory.apply(missingInternalKeys);
214             FactoryUtils.verifyFactoryResult(candidateValues, missingInternalKeys);
215 
216             // Now under lock and key, merge the candidate values in to the results and the external cache.
217             cacheContext.getGlobalLock().withLock(() ->
218                     candidateValues.entrySet().forEach(entry -> {
219                         metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
220                         final boolean added = unsafeJoin(put(entry.getKey(), entry.getValue(), PutPolicy.ADD_ONLY));
221                         final V finalValue;
222                         if (added) {
223                             finalValue = entry.getValue();
224                         } else {
225                             log.trace("Was unable to store the candidate value, so needing to retrieve what's there now");
226                             finalValue = unsafeJoin(get(entry.getKey(), entry::getValue));
227                         }
228 
229                         grandResult.put(entry.getKey(), finalValue);
230                         cacheContext.recordValue(entry.getKey(), Optional.of(finalValue));
231                     }));
232 
233             return grandResult;
234         });
235     }
236 
237     @Override
238     public final CompletionStage<Boolean> put(final String internalKey, final V value, final PutPolicy policy) {
239         return perform(() -> {
240             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
241 
242             final boolean successful = cacheContext.getGlobalLock().withLock(() -> internalPut(internalKey, value, policy));
243             if (successful) {
244                 cacheContext.recordValue(internalKey, Optional.of(value));
245             } else {
246                 cacheContext.forgetValue(internalKey);
247             }
248             return successful;
249         });
250     }
251 
252     @Override
253     public final CompletionStage<Void> remove(final Iterable<String> keys) {
254         return perform(() -> {
255             ensureCacheContext().getGlobalLock().withLock(() -> internalRemove(keys));
256             return null;
257         });
258     }
259 
260     @Override
261     public final CompletionStage<Void> removeAll() {
262         return perform(() -> {
263             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
264             cacheContext.getGlobalLock().withLock(() -> {
265                 internalRemoveAll();
266                 cacheContext.forgetAllValues();
267             });
268             return null;
269         });
270     }
271 
272     private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
273         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
274         //noinspection OptionalGetWithoutIsPresent
275         return StreamSupport.stream(internalKeys.spliterator(), false)
276                 .filter(k -> cacheContext.getValueRecorded(k).isPresent())
277                 .distinct()
278                 .collect(Collectors.toMap(
279                         k -> k,
280                         k -> cacheContext.getValueRecorded(k).get())
281                 );
282     }
283 }