View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.vcache.PutPolicy;
4   import com.atlassian.vcache.StableReadExternalCache;
5   import com.atlassian.vcache.VCacheException;
6   import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
7   import org.slf4j.Logger;
8   import org.slf4j.LoggerFactory;
9   
10  import java.util.Map;
11  import java.util.Optional;
12  import java.util.Set;
13  import java.util.concurrent.CompletionStage;
14  import java.util.concurrent.locks.ReentrantLock;
15  import java.util.function.Function;
16  import java.util.function.Supplier;
17  import java.util.stream.Collectors;
18  import java.util.stream.StreamSupport;
19  
20  /**
21   * Provides operations common to {@link com.atlassian.vcache.StableReadExternalCache} instances.
22   * <p>
23   * Locking is provided to synchronise the local and remote caches. There are two locking strategies - a cache wide
24   * read-write lock and a per-key lock. The cache wide lock provides exclusion for multi-key operations such as
25   * removeAll and getBulk. The key lock provides locking for single key get and put whilst honouring the cache lock.
26   *
27   * @param <V> the value type
28   * @since 1.0.0
29   */
30  public abstract class AbstractStableReadExternalCache<V>
31          extends AbstractNonDirectExternalCache<V>
32          implements StableReadExternalCache<V> {
33  
34      private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCache.class);
35  
36      private final ReentrantLock cacheLock = new ReentrantLock();
37  
38      protected AbstractStableReadExternalCache(String name, MetricsRecorder metricsRecorder) {
39          super(name, metricsRecorder);
40      }
41  
42      protected final Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
43          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
44          //noinspection OptionalGetWithoutIsPresent
45          return StreamSupport.stream(internalKeys.spliterator(), false)
46                  .filter(k -> cacheContext.getValueRecorded(k).isPresent())
47                  .distinct()
48                  .collect(Collectors.toMap(
49                          k -> k,
50                          k -> cacheContext.getValueRecorded(k).get())
51                  );
52      }
53  
54      @Override
55      public final CompletionStage<Optional<V>> get(String internalKey) {
56          return withCacheLock(() -> super.get(internalKey));
57      }
58  
59      @Override
60      public final CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
61          return withCacheLock(() -> super.get(internalKey, supplier));
62      }
63  
64      @Override
65      public final CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
66          return withCacheLock(() -> super.getBulk(internalKeys));
67      }
68  
69      @Override
70      public final CompletionStage<Map<String, V>> getBulk(Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
71          return withCacheLock(() -> super.getBulk(factory, internalKeys));
72      }
73  
74      @Override
75      public final CompletionStage<Boolean> put(final String internalKey, final V value, final PutPolicy policy) {
76          return withCacheLock(() -> internalPut(internalKey, value, policy));
77      }
78  
79      protected abstract CompletionStage<Boolean> internalPut(String internalKey, V value, PutPolicy policy);
80  
81      @Override
82      public final CompletionStage<Void> remove(final Iterable<String> keys) {
83          return withCacheLock(() -> internalRemove(keys));
84      }
85  
86      protected abstract CompletionStage<Void> internalRemove(Iterable<String> keys);
87  
88  
89      @Override
90      public CompletionStage<Void> removeAll() {
91          return withCacheLock(this::internalRemoveAll);
92      }
93  
94      protected abstract CompletionStage<Void> internalRemoveAll();
95  
96      private <R> R withCacheLock(final Supplier<R> result) {
97          if (cacheLock.isHeldByCurrentThread()) {
98              log.error("Detected a recursive call to cache: {}", getName());
99              throw new VCacheException("Detected a recursive call to cache: " + getName());
100         }
101         cacheLock.lock();
102         try {
103             return result.get();
104         } finally {
105             cacheLock.unlock();
106         }
107     }
108 }