1 package com.atlassian.cache.memory;
2
3 import java.util.concurrent.ConcurrentHashMap;
4 import java.util.concurrent.ConcurrentMap;
5 import java.util.concurrent.locks.Lock;
6 import java.util.concurrent.locks.ReadWriteLock;
7 import java.util.concurrent.locks.ReentrantReadWriteLock;
8
9 import javax.annotation.Nonnull;
10
11 import com.atlassian.cache.impl.OneShotLatch;
12
13 import com.google.common.cache.CacheLoader;
14
15 import static com.atlassian.util.concurrent.Assertions.notNull;
16
17
18
19
20
21 public class BlockingCacheLoader<K,V> extends CacheLoader<K,V>
22 {
23 private final ConcurrentMap<K, OneShotLatch> barriers = new ConcurrentHashMap<K, OneShotLatch>(16);
24 private final CacheLoader<K, V> delegate;
25
26
27 private final ReadWriteLock loadVsRemoveAllLock = new ReentrantReadWriteLock(true);
28
29 BlockingCacheLoader(final CacheLoader<K, V> delegate)
30 {
31 this.delegate = notNull("delegate", delegate);
32 }
33
34 Lock loadLock()
35 {
36 return loadVsRemoveAllLock.readLock();
37 }
38
39 Lock removeAllLock()
40 {
41 return loadVsRemoveAllLock.writeLock();
42 }
43
44 @SuppressWarnings("LockAcquiredButNotSafelyReleased")
45 @Override
46 public V load(@Nonnull K key) throws Exception
47 {
48
49 acquire(key);
50 loadLock().lock();
51 return delegate.load(key);
52 }
53
54 void postGetCleanup(@Nonnull K key)
55 {
56 final OneShotLatch barrier = barriers.get(key);
57 if (barrier != null && barrier.isHeldByCurrentThread())
58 {
59
60
61 loadLock().unlock();
62 barriers.remove(key);
63 barrier.release();
64 }
65 }
66
67 OneShotLatch acquire(@Nonnull K key)
68 {
69 final OneShotLatch barrier = new OneShotLatch();
70 while (true)
71 {
72 final OneShotLatch existing = barriers.putIfAbsent(key, barrier);
73
74
75 if (existing == null)
76 {
77 return barrier;
78 }
79
80
81
82
83 existing.await();
84 }
85 }
86
87 void release(@Nonnull K key)
88 {
89 final OneShotLatch existing = barriers.get(key);
90 if (existing == null || !existing.isHeldByCurrentThread())
91 {
92 throw new IllegalMonitorStateException("existing=" + existing);
93 }
94 barriers.remove(key);
95 existing.release();
96 }
97
98
99 }