View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.vcache.PutPolicy;
4   import com.atlassian.vcache.StableReadExternalCache;
5   import com.atlassian.vcache.internal.ExternalCacheExceptionListener;
6   import com.atlassian.vcache.internal.MetricLabel;
7   import com.atlassian.vcache.internal.core.metrics.CacheType;
8   import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import java.time.Duration;
13  import java.util.HashMap;
14  import java.util.HashSet;
15  import java.util.Map;
16  import java.util.Optional;
17  import java.util.Set;
18  import java.util.concurrent.CompletionStage;
19  import java.util.concurrent.ExecutionException;
20  import java.util.function.Function;
21  import java.util.function.Supplier;
22  import java.util.stream.Collectors;
23  import java.util.stream.StreamSupport;
24  
25  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
26  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
27  import static java.util.Objects.requireNonNull;
28  
29  /**
30   * Provides operations common to {@link com.atlassian.vcache.StableReadExternalCache} instances.
31   * <p>
32   * Locking is provided to synchronise the local and remote caches. There are two locking strategies - a cache wide
33   * read-write lock and a per-key lock. The cache wide lock provides exclusion for multi-key operations such as
34   * removeAll and getBulk. The key lock provides locking for single key get and put whilst honouring the cache lock.
35   *
36   * @param <V> the value type
37   * @since 1.0.0
38   */
39  public abstract class AbstractStableReadExternalCache<V>
40          extends AbstractExternalCache<V>
41          implements StableReadExternalCache<V> {
42  
43      private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCache.class);
44      protected final MetricsRecorder metricsRecorder;
45  
46      protected AbstractStableReadExternalCache(
47              String name,
48              MetricsRecorder metricsRecorder,
49              Duration lockTimeout,
50              ExternalCacheExceptionListener externalCacheExceptionListener) {
51          super(name, lockTimeout, externalCacheExceptionListener);
52          this.metricsRecorder = requireNonNull(metricsRecorder);
53      }
54  
55      protected abstract boolean internalPut(String internalKey, V value, PutPolicy policy);
56  
57      protected abstract void internalRemoveAll();
58  
59      protected abstract void internalRemove(Iterable<String> keys);
60  
61      /**
62       * Handles the creation of an entry, if required.
63       *
64       * @param internalKey    the internal key for the entry.
65       * @param candidateValue the candidate value to add, if required
66       * @return the value associated with the key.
67       */
68      protected abstract V handleCreation(String internalKey, V candidateValue)
69              throws ExecutionException, InterruptedException;
70  
71      /**
72       * Performs a direct get operation against the external cache using the supplied external key.
73       */
74      protected abstract Optional<V> directGet(String externalKey);
75  
76      /**
77       * Performs a direct bulk get operation against the external cache using the supplied external keys.
78       */
79      protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
80  
81      @Override
82      public final CompletionStage<Optional<V>> get(String internalKey) {
83          return perform(() -> {
84              final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
85  
86              return cacheContext.getGlobalLock().withLock(() -> internalGetWithoutLock(internalKey, cacheContext));
87          });
88      }
89  
90      @Override
91      public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
92          return perform(() -> {
93              final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
94  
95              // Check for existing value. Do it under a lock so that we can forget the recorded value (a side-effect
96              // of calling get().
97              final Optional<V> existingValue = cacheContext.getGlobalLock().withLock(() -> {
98                  final Optional<V> value = internalGetWithoutLock(internalKey, cacheContext);
99  
100                 if (value.isPresent()) {
101                     return value;
102                 }
103 
104                 // No value existed, so now forget we did the lookup, so we can see if other threads do it.
105                 cacheContext.forgetValue(internalKey);
106                 return Optional.empty();
107             });
108 
109             return existingValue.orElseGet(() -> {
110                 // Calculate a candidate value, not holding a lock
111                 final V candidateValue = requireNonNull(supplier.get());
112 
113                 // Now record the candidate value
114                 return cacheContext.getGlobalLock().withLock(() -> {
115                     // Check for a value being added by another thread in the current request context.
116                     final Optional<Optional<V>> doubleCheck = cacheContext.getValueRecorded(internalKey);
117                     if (doubleCheck.isPresent() && doubleCheck.get().isPresent()) {
118                         return doubleCheck.get().get();
119                     }
120 
121                     // Now attempt to add remotely
122                     try {
123                         final V finalValue = handleCreation(internalKey, candidateValue);
124                         cacheContext.recordValue(internalKey, Optional.of(finalValue));
125                         return finalValue;
126                     } catch (final Exception e) {
127                         // We have a supplier created value and memcached is not working:- This mitigates flogging the supplier
128                         cacheContext.recordValue(internalKey, Optional.of(candidateValue));
129                         return candidateValue;
130                     }
131                 });
132             });
133         });
134     }
135 
136     private Optional<V> internalGetWithoutLock(String internalKey, AbstractExternalCacheRequestContext<V> cacheContext) {
137         // Introduced so we could avoid duplicate locking and retrieval of AbstractExternalCacheRequestContext.
138         // Provides performance boost for get with supplier operation.
139 
140         // Check if we have recorded a value already
141         final Optional<Optional<V>> recordedValue = cacheContext.getValueRecorded(internalKey);
142 
143         return recordedValue.orElseGet(() -> {
144             // Now check externally
145             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
146             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
147             final Optional<V> externalValue = directGet(externalKey);
148             cacheContext.recordValue(internalKey, externalValue);
149 
150             return externalValue;
151         });
152     }
153 
154 
155     @Override
156     public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
157         return perform(() -> {
158             if (isEmpty(internalKeys)) {
159                 return new HashMap<>();
160             }
161 
162             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
163             return cacheContext.getGlobalLock().withLock(() -> {
164                 // Get the recorded values first
165                 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
166 
167                 // Calculate the externalKeys for the entries that are missing
168                 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
169                         .filter(k -> !grandResult.containsKey(k))
170                         .map(cacheContext::externalEntryKeyFor)
171                         .collect(Collectors.toSet());
172 
173                 if (missingExternalKeys.isEmpty()) {
174                     getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
175                     return grandResult;
176                 }
177                 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
178 
179                 // Get the missing values.
180                 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
181                 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
182 
183                 return candidateValues.entrySet().stream().collect(
184                         () -> grandResult,
185                         (m, e) -> {
186                             final Optional<V> result = e.getValue();
187                             cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
188                             m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
189                         },
190                         Map::putAll
191                 );
192             });
193         });
194     }
195 
196     @Override
197     public final CompletionStage<Map<String, V>> getBulk(
198             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
199         return perform(() -> {
200             if (isEmpty(internalKeys)) {
201                 return new HashMap<>();
202             }
203 
204             final Map<String, V> grandResult = new HashMap<>();
205             final Set<String> missingInternalKeys = new HashSet<>();
206             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
207             // Use the getBulk() to get the known state for each key. Do this logic under lock as the cacheContext
208             // is being updated.
209             cacheContext.getGlobalLock().withLock(() -> {
210                 final Map<String, Optional<V>> knownState = unsafeJoin(getBulk(internalKeys));
211                 knownState.forEach((key, value) -> {
212                     if (value.isPresent()) {
213                         grandResult.put(key, value.get());
214                     } else {
215                         missingInternalKeys.add(key);
216                         cacheContext.forgetValue(key); // to allow future cache operations
217                     }
218                 });
219             });
220 
221             // Bail out if we have all the entries requested
222             if (missingInternalKeys.isEmpty()) {
223                 return grandResult;
224             }
225 
226             // Create the candidate missing values NOT under a lock
227             final Map<String, V> candidateValues = factory.apply(missingInternalKeys);
228             FactoryUtils.verifyFactoryResult(candidateValues, missingInternalKeys);
229 
230             // Now under lock and key, merge the candidate values in to the results and the external cache.
231             cacheContext.getGlobalLock().withLock(() ->
232                     candidateValues.entrySet().forEach(entry -> {
233                         metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
234                         V finalValue;
235                         try {
236                             final boolean added = unsafeJoin(put(entry.getKey(), entry.getValue(), PutPolicy.ADD_ONLY));
237 
238                             if (added) {
239                                 finalValue = entry.getValue();
240                             } else {
241                                 log.trace("Was unable to store the candidate value, so needing to retrieve what's there now");
242                                 finalValue = unsafeJoin(get(entry.getKey(), entry::getValue));
243                             }
244                         } catch (final Exception ignore) {
245                             // We could not save the value in Memcached so just put it into the local cache.
246                             finalValue = entry.getValue();
247                         }
248                         grandResult.put(entry.getKey(), finalValue);
249                         cacheContext.recordValue(entry.getKey(), Optional.of(finalValue));
250                     }));
251 
252             return grandResult;
253         });
254     }
255 
256     @Override
257     public final CompletionStage<Boolean> put(final String internalKey, final V value, final PutPolicy policy) {
258         return perform(() -> {
259             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
260 
261             final boolean successful = cacheContext.getGlobalLock().withLock(() -> internalPut(internalKey, value, policy));
262             if (successful) {
263                 cacheContext.recordValue(internalKey, Optional.of(value));
264             } else {
265                 cacheContext.forgetValue(internalKey);
266             }
267             return successful;
268         });
269     }
270 
271     @Override
272     public final CompletionStage<Void> remove(final Iterable<String> keys) {
273         return perform(() -> {
274             ensureCacheContext().getGlobalLock().withLock(() -> internalRemove(keys));
275             return null;
276         });
277     }
278 
279     @Override
280     public final CompletionStage<Void> removeAll() {
281         return perform(() -> {
282             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
283             cacheContext.getGlobalLock().withLock(() -> {
284                 internalRemoveAll();
285                 cacheContext.forgetAllValues();
286             });
287             return null;
288         });
289     }
290 
291     private Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
292         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
293         //noinspection OptionalGetWithoutIsPresent
294         return StreamSupport.stream(internalKeys.spliterator(), false)
295                 .filter(k -> cacheContext.getValueRecorded(k).isPresent())
296                 .distinct()
297                 .collect(Collectors.toMap(
298                         k -> k,
299                         k -> cacheContext.getValueRecorded(k).get())
300                 );
301     }
302 }