1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.vcache.RequestCache;
4 import com.atlassian.vcache.VCacheException;
5 import com.atlassian.vcache.internal.RequestContext;
6
7 import javax.annotation.Nullable;
8 import java.time.Duration;
9 import java.util.HashMap;
10 import java.util.Map;
11 import java.util.Objects;
12 import java.util.Optional;
13 import java.util.Set;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.StampedLock;
16 import java.util.function.Function;
17 import java.util.function.Supplier;
18 import java.util.stream.Collectors;
19 import java.util.stream.StreamSupport;
20
21 import static com.atlassian.vcache.internal.NameValidator.requireValidCacheName;
22 import static java.util.Objects.requireNonNull;
23
24
25
26
27
28
29
30
31
32
33
34
35
36 class ReadOptimisedRequestCache<K, V> implements RequestCache<K, V> {
37
38
39 private final ThreadLocal<Boolean> inWriteLock = ThreadLocal.withInitial(() -> false);
40
41 private final String name;
42 private final Supplier<RequestContext> contextSupplier;
43 private final Duration lockTimeout;
44
45 ReadOptimisedRequestCache(String name, Supplier<RequestContext> contextSupplier, Duration lockTimeout) {
46 this.name = requireValidCacheName(name);
47 this.contextSupplier = requireNonNull(contextSupplier);
48 this.lockTimeout = requireNonNull(lockTimeout);
49 }
50
51 @Override
52 public Optional<V> get(K key) {
53 return Optional.ofNullable(withOptimisticReadLock(map -> map.get(key)));
54 }
55
56 @Override
57 public V get(K key, Supplier<? extends V> supplier) {
58 final Optional<V> value = get(key);
59 return value.orElseGet(() -> {
60 final V candidateValue = requireNonNull(supplier.get());
61 final V existing = withWriteLock(map -> map.putIfAbsent(key, candidateValue));
62 return existing == null ? candidateValue : existing;
63 });
64 }
65
66 @Override
67 public Map<K, V> getBulk(final Function<Set<K>, Map<K, V>> factory, final Iterable<K> keys) {
68
69 final Function<Map<K, V>, Map<K, V>> cacheOps = map -> {
70 final Map<K, Optional<V>> existingValues =
71 StreamSupport.stream(keys.spliterator(), false)
72 .distinct()
73 .collect(Collectors.toMap(Objects::requireNonNull, k -> Optional.ofNullable(map.get(k))));
74
75
76 final Map<K, V> grandResult = existingValues.entrySet().stream()
77 .filter(e -> e.getValue().isPresent())
78 .collect(Collectors.toMap(
79 Map.Entry::getKey,
80 e -> e.getValue().get()));
81
82
83 if (grandResult.size() == existingValues.size()) {
84 return grandResult;
85 }
86
87
88 final Set<K> missingKeys = existingValues.entrySet().stream()
89 .filter(e -> !e.getValue().isPresent())
90 .map(Map.Entry::getKey)
91 .collect(Collectors.toSet());
92
93 final Map<K, V> missingValues = factory.apply(missingKeys);
94 FactoryUtils.verifyFactoryResult(missingValues, missingKeys);
95
96 missingValues.forEach((key, value) -> {
97
98 final Optional<V> existing = Optional.ofNullable(map.putIfAbsent(key, value));
99 grandResult.put(key, existing.orElse(value));
100 });
101
102 return grandResult;
103 };
104
105
106 if (inWriteLock.get()) {
107 return cacheOps.apply(ensureDelegate().map);
108 } else {
109 try {
110 inWriteLock.set(true);
111
112 return withWriteLock(cacheOps);
113 } finally {
114 inWriteLock.set(false);
115 }
116 }
117 }
118
119 @Override
120 public void put(K key, V value) {
121 withWriteLock(map -> map.put(key, value));
122 }
123
124 @Override
125 public Optional<V> putIfAbsent(K key, V value) {
126 return Optional.ofNullable(withWriteLock(map -> map.putIfAbsent(key, value)));
127 }
128
129 @Override
130 public boolean replaceIf(K key, V currentValue, V newValue) {
131
132 return withWriteLock(map -> map.replace(requireNonNull(key), requireNonNull(currentValue), requireNonNull(newValue)));
133 }
134
135 @Override
136 public boolean removeIf(K key, V value) {
137
138 return withWriteLock(map -> map.remove(requireNonNull(key), requireNonNull(value)));
139 }
140
141 @Override
142 public void remove(K key) {
143 withWriteLock(map -> map.remove(key));
144 }
145
146 @Override
147 public void removeAll() {
148 withWriteLock(map -> {
149 map.clear();
150 return 0;
151 });
152 }
153
154 @Override
155 public String getName() {
156 return name;
157 }
158
159 private MapAndLock<K, V> ensureDelegate() {
160 final RequestContext requestContext = contextSupplier.get();
161 return requestContext.computeIfAbsent(this, MapAndLock::new);
162 }
163
164 @Nullable
165 private <R> R withOptimisticReadLock(Function<Map<K, V>, R> getV) {
166 final MapAndLock<K, V> mapAndLock = ensureDelegate();
167 final long optimisticStamp = mapAndLock.lock.tryOptimisticRead();
168 final R value = getV.apply(mapAndLock.map);
169 if (mapAndLock.lock.validate(optimisticStamp)) {
170 return value;
171 }
172 final long readLockStamp;
173 try {
174 readLockStamp = mapAndLock.lock.tryReadLock(lockTimeout.toMillis(), TimeUnit.MILLISECONDS);
175 } catch (InterruptedException e) {
176 throw new VCacheException("Lock acquisition on cache interrupted.", e);
177 }
178 if (readLockStamp != 0) {
179 try {
180 return getV.apply(mapAndLock.map);
181 } finally {
182 mapAndLock.lock.unlock(readLockStamp);
183 }
184 }
185
186 throw new VCacheException("Failed to lock cache");
187 }
188
189 @Nullable
190 private <R> R withWriteLock(Function<Map<K, V>, R> putV) {
191 final MapAndLock<K, V> mapAndLock = ensureDelegate();
192 long stamp = 0;
193 try {
194 stamp = mapAndLock.lock.tryWriteLock(lockTimeout.toMillis(), TimeUnit.MILLISECONDS);
195 if (stamp != 0) {
196 return putV.apply(mapAndLock.map);
197 } else {
198 throw new VCacheException("Could not acquire write lock");
199 }
200 } catch (InterruptedException e) {
201 throw new VCacheException("Interrupted acquiring write lock", e);
202 } finally {
203 if (stamp != 0) {
204 mapAndLock.lock.unlockWrite(stamp);
205 }
206 }
207 }
208
209 private static class MapAndLock<K, V> {
210 final Map<K, V> map = new HashMap<>();
211 final StampedLock lock = new StampedLock();
212 }
213 }