View Javadoc
1   package com.atlassian.cache.ehcache;
2   
3   import java.io.Serializable;
4   import java.util.Collection;
5   import java.util.Map;
6   import java.util.concurrent.locks.Lock;
7   import java.util.concurrent.locks.ReadWriteLock;
8   import java.util.concurrent.locks.ReentrantReadWriteLock;
9   
10  import com.atlassian.cache.CacheLoader;
11  
12  import com.google.common.base.Function;
13  import com.google.common.base.Throwables;
14  import com.google.common.collect.Multimap;
15  import com.google.common.collect.Multimaps;
16  
17  import net.sf.ehcache.CacheException;
18  import net.sf.ehcache.Element;
19  import net.sf.ehcache.concurrent.LockType;
20  import net.sf.ehcache.concurrent.Sync;
21  import net.sf.ehcache.constructs.blocking.BlockingCache;
22  import net.sf.ehcache.constructs.blocking.SelfPopulatingCache;
23  
24  import static java.util.Objects.requireNonNull;
25  
26  /**
27   * A rewrite of {@link SelfPopulatingCache} with stronger concurrency guarantees.
28   *
29   * We don't lock on {@link LoadingCache#loadValueAndReleaseLock(Object)}, instead we put a flag for each loading operation.
30   * If a {@link LoadingCache#remove(Object)} operation happens during load operation and before putting the value in to the cache,
31   * newly loaded value will not be put into the cache and only returned to the caller.
32   * Subsequent calls with the same key will trigger a new load operation thus ensuring eventual consistency.
33   * We only lock for actually putting the value into the cache, not loading.
34   *
35   * @see BlockingCache
36   * @see SynchronizedLoadingCacheDecorator
37   * @param <K> the cache's key type
38   * @param <V> the cache's value type
39   */
40  public class LoadingCache<K,V> extends BlockingCache
41  {
42      /**
43       * Default value for number of mutexes in StripedReadWriteLockSync.DEFAULT_NUMBER_OF_MUTEXES is 2048,
44       * however it's an overkill for Cloud where cache access is mostly request based and does not have
45       * high concurrent usage. Reducing to 64 as default with system property override option.
46       */
47      private static final int DEFAULT_NUMBER_OF_MUTEXES = Integer.getInteger(LoadingCache.class.getName() + '.' + "DEFAULT_NUMBER_OF_MUTEXES", 64);
48  
49      private final CacheLoader<K, V> loader;
50      private final SynchronizedLoadingCacheDecorator delegate;
51  
52      // This needs to be fair to ensure that removeAll does not starve for a busy cache
53      private final ReadWriteLock loadVsRemoveAllLock = new ReentrantReadWriteLock(true);
54  
55      public LoadingCache(final SynchronizedLoadingCacheDecorator cache, final CacheLoader<K, V> loader) throws CacheException
56      {
57          super(cache, DEFAULT_NUMBER_OF_MUTEXES);
58          this.loader = requireNonNull(loader, "loader");
59          this.delegate = cache;
60      }
61  
62      Lock loadLock()
63      {
64          return loadVsRemoveAllLock.readLock();
65      }
66  
67      Lock removeAllLock()
68      {
69          return loadVsRemoveAllLock.writeLock();
70      }
71  
72      @Override
73      public Element get(final Object key)
74      {
75          if (key == null)
76          {
77              throw new NullPointerException("null keys are not permitted");
78          }
79          Element element = super.get(key);
80          return (element != null) ? element : loadValueAndReleaseLock(key);
81      }
82  
83      /**
84       * Handle a cache miss by loading the value and releasing the write lock.
85       * <p>
86       * On a cache miss, {@link BlockingCache#get(Object) super.get(key)} returns {@code null} with the write
87       * lock still held.  It is the caller's (our) responsibility to load the value and
88       * {@link BlockingCache#put(Element) put} it into the cache, which will implicitly release the lock.
89       * The lock must be released regardless of whether or not the load operation throws an exception.
90       * </p>
91       *
92       * @param key the key for which we must load the value
93       * @return a cache element representing the key and the corresponding value that was loaded for it
94       */
95      private Element loadValueAndReleaseLock(final Object key)
96      {
97          Element result;
98          loadLock().lock();
99          try
100         {
101             result = delegate.synchronizedLoad(key, this::getFromLoader, loaded -> {
102                 if (loaded.getObjectValue() != null)
103                 {
104                     getCache().put(loaded);
105                 }
106             });
107         }
108         finally
109         {
110             // If the value is null, then loadValueAndReleaseLock is throwing an exception.  We still need to
111             // release the lock, but the actual return value does not matter.  Note that unlike SelfPopulatingCache,
112             // we are not using put(new Element(key, null)) to do this.  That would do an explicit removal (and with
113             // replication!) which seems pretty pointless.
114             final Sync lock = getLockForKey(key);
115             if (lock.isHeldByCurrentThread(LockType.WRITE))
116             {
117                 lock.unlock(LockType.WRITE);
118             }
119 
120             // It isn't safe to let removeAll proceed until after we have done our put, so this
121             // coarser read lock has to be held until the very end. :(
122             loadLock().unlock();
123         }
124         return result;
125     }
126 
127     @SuppressWarnings({ "ConstantConditions", "unchecked" })
128     private V getFromLoader(Object key)
129     {
130         final V value;
131         try
132         {
133             value = loader.load((K)key);
134         }
135         catch (final RuntimeException re)
136         {
137             put(new Element(key, null));
138             throw propagate(key, re);
139         }
140         catch (final Error err)
141         {
142             put(new Element(key, null));
143             throw propagate(key, err);
144         }
145 
146         if (value == null)
147         {
148             throw new CacheException("CacheLoader returned null for key " + key);
149         }
150         return value;
151     }
152 
153     // Make sure we acquire the write lock when removing a key.  BlockingCache should really be
154     // doing this itself, but it isn't.  This prevents remove from overlapping with a lazy load
155     // for the same key.
156 
157     @Override
158     public boolean remove(Serializable key, boolean doNotNotifyCacheReplicators)
159     {
160         final Sync sync = getLockForKey(key);
161         sync.lock(LockType.WRITE);
162         try
163         {
164             return super.remove(key, doNotNotifyCacheReplicators);
165         }
166         finally
167         {
168             sync.unlock(LockType.WRITE);
169         }
170     }
171 
172     @Override
173     public boolean remove(Serializable key)
174     {
175         final Sync sync = getLockForKey(key);
176         sync.lock(LockType.WRITE);
177         try
178         {
179             return super.remove(key);
180         }
181         finally
182         {
183             sync.unlock(LockType.WRITE);
184         }
185     }
186 
187     @Override
188     public boolean remove(Object key)
189     {
190         final Sync sync = getLockForKey(key);
191         sync.lock(LockType.WRITE);
192         try
193         {
194             return super.remove(key);
195         }
196         finally
197         {
198             sync.unlock(LockType.WRITE);
199         }
200     }
201 
202     @Override
203     public boolean remove(Object key, final boolean doNotNotifyCacheReplicators)
204     {
205         final Sync sync = getLockForKey(key);
206         sync.lock(LockType.WRITE);
207         try
208         {
209             return super.remove(key, doNotNotifyCacheReplicators);
210         }
211         finally
212         {
213             sync.unlock(LockType.WRITE);
214         }
215     }
216 
217 
218 
219     // removeAll(Collection) and removeAll(Collection, boolean) can specify multiple keys.  As with the
220     // single-key remove methods, these need to acquire write locks to prevent them from overlapping with
221     // lazy loads.  We could take the simple approach of iterating and delegating each one to remove(Object),
222     // but it is easy enough to group them into lock stripes first, and this seems like a wise precaution
223     // against a large cache removing several values at once (say, through expiration).
224 
225     @Override
226     public void removeAll(Collection<?> keys)
227     {
228         removeGroupedBySync(keys, new RemoveCallback()
229         {
230             @Override
231             public void removeUnderLock(Collection<?> keysForSync)
232             {
233                 underlyingCache.removeAll(keysForSync);
234             }
235         });
236     }
237 
238     @Override
239     public void removeAll(Collection<?> keys, final boolean doNotNotifyCacheReplicators)
240     {
241         removeGroupedBySync(keys, new RemoveCallback()
242         {
243             @Override
244             public void removeUnderLock(Collection<?> keysForSync)
245             {
246                 underlyingCache.removeAll(keysForSync, doNotNotifyCacheReplicators);
247             }
248         });
249     }
250 
251     /**
252      * Partitions the supplied keys into subsets that share the same lock, then calls
253      * {@link #removeGroupedBySync(Multimap, RemoveCallback)} with the results so that the callback
254      * can be applied to each subset with the corresponding write lock held.
255      *
256      * @param allKeys the keys to be grouped into subsets that share a lock, then removed from the cache
257      * @param callback the removal function to apply each lock's subset of keys while that write lock is held
258      */
259     private void removeGroupedBySync(Collection<?> allKeys, RemoveCallback callback)
260     {
261         final Multimap<Sync,?> map = Multimaps.index(allKeys, new Function<Object,Sync>()
262         {
263             @Override
264             public Sync apply(Object key)
265             {
266                 return getLockForKey(key);
267             }
268         });
269         removeGroupedBySync(map, callback);
270     }
271 
272     /**
273      * For each sync, acquires the write lock and calls {@link RemoveCallback#removeUnderLock(Collection)} with
274      * the corresponding list of keys, then releases the lock.
275      *
276      * @param keysBySync the mapping of locks to each lock's corresponding keys
277      * @param callback the removal function to apply each lock's subset of keys while that write lock is held
278      */
279     private static <K> void removeGroupedBySync(Multimap<Sync,K> keysBySync, RemoveCallback callback)
280     {
281         for (Map.Entry<Sync,Collection<K>> entry : keysBySync.asMap().entrySet())
282         {
283             final Sync sync = entry.getKey();
284             final Collection<K> keysUsingThisSync = entry.getValue();
285             sync.lock(LockType.WRITE);
286             try
287             {
288                 callback.removeUnderLock(keysUsingThisSync);
289             }
290             finally
291             {
292                 sync.unlock(LockType.WRITE);
293             }
294         }
295     }
296 
297 
298 
299     // The removeAll functions would have to acquire every single striped lock individually to prevent
300     // overlap with lazy loaders, and this probably isn't acceptable.  Instead, we pay the cost of an
301     // additional R/W lock that loaders can share and removeAll must acquire exclusively.
302 
303     @Override
304     public void removeAll()
305     {
306         removeAllLock().lock();
307         try
308         {
309             super.removeAll();
310         }
311         finally
312         {
313             removeAllLock().unlock();
314         }
315     }
316 
317     @Override
318     public void removeAll(boolean doNotNotifyCacheReplicators)
319     {
320         removeAllLock().lock();
321         try
322         {
323             super.removeAll(doNotNotifyCacheReplicators);
324         }
325         finally
326         {
327             removeAllLock().unlock();
328         }
329     }
330 
331     private static RuntimeException propagate(Object key, Throwable e)
332     {
333         Throwables.propagateIfInstanceOf(e, CacheException.class);
334         throw new CacheException("Could not fetch object for cache entry with key \"" + key + "\".", e);
335     }
336 
337     interface RemoveCallback
338     {
339         void removeUnderLock(Collection<?> keys);
340     }
341 }