View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.vcache.RequestCache;
4   import com.atlassian.vcache.VCacheException;
5   import com.atlassian.vcache.internal.RequestContext;
6   
7   import javax.annotation.Nullable;
8   import java.time.Duration;
9   import java.util.HashMap;
10  import java.util.Map;
11  import java.util.Objects;
12  import java.util.Optional;
13  import java.util.Set;
14  import java.util.concurrent.TimeUnit;
15  import java.util.concurrent.locks.StampedLock;
16  import java.util.function.Function;
17  import java.util.function.Supplier;
18  import java.util.stream.Collectors;
19  import java.util.stream.StreamSupport;
20  
21  import static com.atlassian.vcache.internal.NameValidator.requireValidCacheName;
22  import static java.util.Objects.requireNonNull;
23  
24  /**
25   * Read optimised version of a {@link RequestCache}. This cache is locked with a {@link StampedLock} which
26   * uses optimistic locking on the read path to improve performance when reading. Note that using this cache
27   * in a write heavy use case will cause poorer performance than {@link DefaultRequestCache}.
28   * <p>
29   * For a good description of the semantics of a {@link StampedLock}
30   * see: https://www.javaspecialists.eu/archive/Issue215.html
31   *
32   * @param <K> The key type
33   * @param <V> The value type
34   * @since 1.13.0
35   */
36  class ReadOptimisedRequestCache<K, V> implements RequestCache<K, V> {
37  
38      // This ThreadLocal is used to implement lock re-entrance during a bulk get.
39      private final ThreadLocal<Boolean> inWriteLock = ThreadLocal.withInitial(() -> false);
40  
41      private final String name;
42      private final Supplier<RequestContext> contextSupplier;
43      private final Duration lockTimeout;
44  
45      ReadOptimisedRequestCache(String name, Supplier<RequestContext> contextSupplier, Duration lockTimeout) {
46          this.name = requireValidCacheName(name);
47          this.contextSupplier = requireNonNull(contextSupplier);
48          this.lockTimeout = requireNonNull(lockTimeout);
49      }
50  
51      @Override
52      public Optional<V> get(K key) {
53          return Optional.ofNullable(withOptimisticReadLock(map -> map.get(key)));
54      }
55  
56      @Override
57      public V get(K key, Supplier<? extends V> supplier) {
58          final Optional<V> value = get(key);
59          return value.orElseGet(() -> {
60              final V candidateValue = requireNonNull(supplier.get());
61              final V existing = withWriteLock(map -> map.putIfAbsent(key, candidateValue));
62              return existing == null ? candidateValue : existing;
63          });
64      }
65  
66      @Override
67      public Map<K, V> getBulk(final Function<Set<K>, Map<K, V>> factory, final Iterable<K> keys) {
68          // Function that will be applied with or without lock as required.
69          final Function<Map<K, V>, Map<K, V>> cacheOps = map -> {
70              final Map<K, Optional<V>> existingValues =
71                      StreamSupport.stream(keys.spliterator(), false)
72                              .distinct()
73                              .collect(Collectors.toMap(Objects::requireNonNull, k -> Optional.ofNullable(map.get(k))));
74  
75              // Add known values to the grand result
76              final Map<K, V> grandResult = existingValues.entrySet().stream()
77                      .filter(e -> e.getValue().isPresent())
78                      .collect(Collectors.toMap(
79                              Map.Entry::getKey,
80                              e -> e.getValue().get()));
81  
82              // Bail out if we have all the values
83              if (grandResult.size() == existingValues.size()) {
84                  return grandResult;
85              }
86  
87              // Sadly we now need to call the factory to create the missing values and then merge into the grand result.
88              final Set<K> missingKeys = existingValues.entrySet().stream()
89                      .filter(e -> !e.getValue().isPresent())
90                      .map(Map.Entry::getKey)
91                      .collect(Collectors.toSet());
92  
93              final Map<K, V> missingValues = factory.apply(missingKeys);
94              FactoryUtils.verifyFactoryResult(missingValues, missingKeys);
95  
96              missingValues.forEach((key, value) -> {
97                  // Handle that another thread may have beaten us to the punch.
98                  final Optional<V> existing = Optional.ofNullable(map.putIfAbsent(key, value));
99                  grandResult.put(key, existing.orElse(value));
100             });
101 
102             return grandResult;
103         };
104 
105         // Allow bulk get lock to be re-entrant for the same cache on the same thread.
106         if (inWriteLock.get()) {
107             return cacheOps.apply(ensureDelegate().map);
108         } else {
109             try {
110                 inWriteLock.set(true);
111                 //noinspection ConstantConditions
112                 return withWriteLock(cacheOps);
113             } finally {
114                 inWriteLock.set(false);
115             }
116         }
117     }
118 
119     @Override
120     public void put(K key, V value) {
121         withWriteLock(map -> map.put(key, value));
122     }
123 
124     @Override
125     public Optional<V> putIfAbsent(K key, V value) {
126         return Optional.ofNullable(withWriteLock(map -> map.putIfAbsent(key, value)));
127     }
128 
129     @Override
130     public boolean replaceIf(K key, V currentValue, V newValue) {
131         //noinspection ConstantConditions
132         return withWriteLock(map -> map.replace(requireNonNull(key), requireNonNull(currentValue), requireNonNull(newValue)));
133     }
134 
135     @Override
136     public boolean removeIf(K key, V value) {
137         //noinspection ConstantConditions
138         return withWriteLock(map -> map.remove(requireNonNull(key), requireNonNull(value)));
139     }
140 
141     @Override
142     public void remove(K key) {
143         withWriteLock(map -> map.remove(key));
144     }
145 
146     @Override
147     public void removeAll() {
148         withWriteLock(map -> {
149             map.clear();
150             return 0;
151         });
152     }
153 
154     @Override
155     public String getName() {
156         return name;
157     }
158 
159     private MapAndLock<K, V> ensureDelegate() {
160         final RequestContext requestContext = contextSupplier.get();
161         return requestContext.computeIfAbsent(this, MapAndLock::new);
162     }
163 
164     @Nullable
165     private <R> R withOptimisticReadLock(Function<Map<K, V>, R> getV) {
166         final MapAndLock<K, V> mapAndLock = ensureDelegate();
167         final long optimisticStamp = mapAndLock.lock.tryOptimisticRead();
168         final R value = getV.apply(mapAndLock.map);
169         if (mapAndLock.lock.validate(optimisticStamp)) {
170             return value;
171         }
172         final long readLockStamp;
173         try {
174             readLockStamp = mapAndLock.lock.tryReadLock(lockTimeout.toMillis(), TimeUnit.MILLISECONDS);
175         } catch (InterruptedException e) {
176             throw new VCacheException("Lock acquisition on cache interrupted.", e);
177         }
178         if (readLockStamp != 0) {
179             try {
180                 return getV.apply(mapAndLock.map);
181             } finally {
182                 mapAndLock.lock.unlock(readLockStamp);
183             }
184         }
185         // The lock acquisition failed.
186         throw new VCacheException("Failed to lock cache");
187     }
188 
189     @Nullable
190     private <R> R withWriteLock(Function<Map<K, V>, R> putV) {
191         final MapAndLock<K, V> mapAndLock = ensureDelegate();
192         long stamp = 0;
193         try {
194             stamp = mapAndLock.lock.tryWriteLock(lockTimeout.toMillis(), TimeUnit.MILLISECONDS);
195             if (stamp != 0) {
196                 return putV.apply(mapAndLock.map);
197             } else {
198                 throw new VCacheException("Could not acquire write lock");
199             }
200         } catch (InterruptedException e) {
201             throw new VCacheException("Interrupted acquiring write lock", e);
202         } finally {
203             if (stamp != 0) {
204                 mapAndLock.lock.unlockWrite(stamp);
205             }
206         }
207     }
208 
209     private static class MapAndLock<K, V> {
210         final Map<K, V> map = new HashMap<>();
211         final StampedLock lock = new StampedLock();
212     }
213 }