View Javadoc

1   package com.atlassian.cache.memory;
2   
3   import java.util.concurrent.ConcurrentHashMap;
4   import java.util.concurrent.ConcurrentMap;
5   import java.util.concurrent.locks.Lock;
6   import java.util.concurrent.locks.ReadWriteLock;
7   import java.util.concurrent.locks.ReentrantReadWriteLock;
8   
9   import javax.annotation.Nonnull;
10  
11  import com.atlassian.cache.impl.OneShotLatch;
12  
13  import com.google.common.cache.CacheLoader;
14  
15  import static com.atlassian.util.concurrent.Assertions.notNull;
16  
17  /**
18   *
19   * @since v2.4.5
20   */
21  public class BlockingCacheLoader<K,V> extends CacheLoader<K,V>
22  {
23      private final ConcurrentMap<K, OneShotLatch> barriers = new ConcurrentHashMap<K, OneShotLatch>(16);
24      private final CacheLoader<K, V> delegate;
25  
26      // This needs to be fair to ensure that removeAll does not starve for a busy cache
27      private final ReadWriteLock loadVsRemoveAllLock = new ReentrantReadWriteLock(true);
28  
29      BlockingCacheLoader(final CacheLoader<K, V> delegate)
30      {
31          this.delegate = notNull("delegate", delegate);
32      }
33  
34      Lock loadLock()
35      {
36          return loadVsRemoveAllLock.readLock();
37      }
38  
39      Lock removeAllLock()
40      {
41          return loadVsRemoveAllLock.writeLock();
42      }
43  
44      @SuppressWarnings("LockAcquiredButNotSafelyReleased")
45      @Override
46      public V load(@Nonnull K key) throws Exception
47      {
48          // to be released by postGetCleanup
49          acquire(key);
50          loadLock().lock();
51          return delegate.load(key);
52      }
53  
54      void postGetCleanup(@Nonnull K key)
55      {
56          final OneShotLatch barrier = barriers.get(key);
57          if (barrier != null && barrier.isHeldByCurrentThread())
58          {
59              // At this point, we know that Guava's LoadingValueReference has been replaced and we no longer
60              // need to hold up removeAll, so release the coarser lock first.
61              loadLock().unlock();
62              barriers.remove(key);
63              barrier.release();
64          }
65      }
66  
67      OneShotLatch acquire(@Nonnull K key)
68      {
69          final OneShotLatch barrier = new OneShotLatch();
70          while (true)
71          {
72              final OneShotLatch existing = barriers.putIfAbsent(key, barrier);
73  
74              // successfully raised a new barrier
75              if (existing == null)
76              {
77                  return barrier;
78              }
79  
80              // There is an existing barrier that happens-before us; wait for it to be released.
81              // There is no need to attempt a remove or replace to evict it from the barriers map;
82              // The thread that owned it would have taken care of that before releasing it.
83              existing.await();
84          }
85      }
86  
87      void release(@Nonnull K key)
88      {
89          final OneShotLatch existing = barriers.get(key);
90          if (existing == null || !existing.isHeldByCurrentThread())
91          {
92              throw new IllegalMonitorStateException("existing=" + existing);
93          }
94          barriers.remove(key);
95          existing.release();
96      }
97  
98  
99  }