1 package com.atlassian.cache.memory;
2
3 import java.util.Collection;
4 import java.util.Objects;
5 import java.util.SortedMap;
6 import java.util.concurrent.ConcurrentHashMap;
7 import java.util.concurrent.ConcurrentMap;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.locks.Lock;
10 import java.util.concurrent.locks.ReadWriteLock;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.concurrent.locks.ReentrantReadWriteLock;
13 import javax.annotation.Nonnull;
14 import javax.annotation.Nullable;
15
16 import com.atlassian.cache.Cache;
17 import com.atlassian.cache.CacheEntryListener;
18 import com.atlassian.cache.CacheException;
19 import com.atlassian.cache.CacheLoader;
20 import com.atlassian.cache.CacheSettings;
21 import com.atlassian.cache.CacheStatisticsKey;
22 import com.atlassian.cache.impl.CacheEntryListenerSupport;
23 import com.atlassian.cache.impl.DefaultCacheEntryListenerSupport;
24 import com.atlassian.instrumentation.DefaultInstrumentRegistry;
25 import com.atlassian.instrumentation.SimpleTimer;
26 import com.atlassian.instrumentation.caches.CacheCollector;
27 import com.atlassian.instrumentation.caches.CacheKeys;
28
29 import com.google.common.base.Throwables;
30 import com.google.common.cache.RemovalListener;
31 import com.google.common.cache.RemovalNotification;
32 import com.google.common.collect.ImmutableSortedMap;
33 import com.google.common.util.concurrent.UncheckedExecutionException;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import static com.atlassian.cache.memory.DelegatingCacheStatistics.toStatistics;
38 import static java.util.Objects.requireNonNull;
39
40
41
42
43
44
45 class DelegatingCache<K, V> extends ManagedCacheSupport implements Cache<K, V>
46 {
47 static final CreateFunction DEFAULT_CREATE_FUNCTION = new DefaultCreateFunction();
48
49 private final Logger eventLogger;
50 private final Logger stacktraceLogger;
51
52 private final com.google.common.cache.Cache<K, V> internalCache;
53 private final CacheEntryListenerSupport<K, V> listenerSupport;
54 private final CacheCollector collector;
55 private final CacheLoader<K, V> theLoader;
56
57 private final ConcurrentMap<K, ReentrantLock> barriers = new ConcurrentHashMap<>(16);
58 private final Lock loadLock;
59 private final Lock removeAllLock;
60 {
61
62 final ReadWriteLock loadVsRemoveAllLock = new ReentrantReadWriteLock(true);
63 loadLock = loadVsRemoveAllLock.readLock();
64 removeAllLock = loadVsRemoveAllLock.writeLock();
65 }
66
67 protected DelegatingCache(final com.google.common.cache.Cache<K, V> internalCache,
68 String name,
69 CacheSettings settings,
70 @Nullable CacheLoader<K, V> theLoader)
71 {
72 super(name, settings);
73 this.internalCache = internalCache;
74 this.listenerSupport = new DefaultCacheEntryListenerSupport<>();
75 this.theLoader = theLoader;
76 this.collector = new DefaultInstrumentRegistry().pullCacheCollector(name, internalCache::size);
77
78 this.eventLogger = LoggerFactory.getLogger("com.atlassian.cache.event." + name);
79 this.stacktraceLogger = LoggerFactory.getLogger("com.atlassian.cache.stacktrace." + name);
80 }
81
82 static <K, V> DelegatingCache<K, V> create(final com.google.common.cache.Cache<K, V> internalCache,
83 String name,
84 CacheSettings settings,
85 CacheLoader<K, V> cacheLoader)
86 {
87 return DEFAULT_CREATE_FUNCTION.create(internalCache, name, settings, cacheLoader);
88 }
89
90 @Override
91 public CacheCollector getCacheCollector()
92 {
93 return collector;
94 }
95
96 @Override
97 public boolean containsKey(@Nonnull K key)
98 {
99 return null != internalCache.getIfPresent(key);
100 }
101
102 @Nonnull
103 @Override
104 public Collection<K> getKeys()
105 {
106 try
107 {
108 return internalCache.asMap().keySet();
109 }
110 catch (Exception e)
111 {
112 throw new CacheException(e);
113 }
114 }
115
116 @Override
117 public void put(@Nonnull final K key, @Nonnull final V value)
118 {
119 try
120 {
121 V oldValue = internalCache.asMap().put(key, value);
122 if (isCollectorStatisticsEnabled())
123 {
124 collector.put();
125 }
126 if (oldValue == null)
127 {
128
129
130 listenerSupport.notifyAdd(key, value);
131 }
132 }
133 catch (Exception e)
134 {
135 throw new CacheException(e);
136 }
137 }
138
139 @Override
140 public V get(@Nonnull final K key)
141 {
142 rejectNullKey(key);
143
144 if (theLoader == null)
145 {
146 V value = internalCache.getIfPresent(key);
147 if (isCollectorStatisticsEnabled())
148 {
149 if (value == null)
150 {
151 collector.miss();
152 }
153 else
154 {
155 collector.hit();
156 }
157 }
158 return value;
159 }
160 else
161 {
162 return get(key, () -> theLoader.load(key));
163 }
164 }
165
166 @Nonnull
167 @Override
168 public V get(@Nonnull final K key, @Nonnull final com.atlassian.cache.Supplier<? extends V> valueLoader)
169 {
170 rejectNullKey(key);
171
172
173
174 final boolean[] missed = new boolean[1];
175 try
176 {
177 return internalCache.get(key, () -> {
178 missed[0] = true;
179 acquireLockFor(key);
180 loadLock.lock();
181
182 final SimpleTimer timer = isCollectorStatisticsEnabled() ? new SimpleTimer(CacheKeys.LOAD_TIME.getName()) : null;
183 if (timer != null)
184 {
185 timer.start();
186 }
187
188 try
189 {
190 return Objects.requireNonNull(valueLoader.get());
191 }
192 finally
193 {
194 if (timer != null)
195 {
196 timer.end();
197 collector.put();
198 collector.getSplits().add(timer);
199 }
200 }
201 });
202 }
203 catch (ExecutionException e)
204 {
205 throw new CacheException("Unknown failure", e);
206 }
207 catch (UncheckedExecutionException e)
208 {
209 Throwable cause = e.getCause();
210 Throwables.propagateIfInstanceOf(cause, CacheException.class);
211 throw new CacheException(cause);
212 }
213 finally
214 {
215 if (missed[0])
216 {
217 loadLock.unlock();
218 releaseLockFor(key);
219 }
220
221 if (isCollectorStatisticsEnabled())
222 {
223 if (missed[0])
224 {
225 collector.miss();
226 }
227 else
228 {
229 collector.hit();
230 }
231 }
232 }
233 }
234
235 @Override
236 public void remove(@Nonnull final K key)
237 {
238 acquireLockFor(key);
239 try
240 {
241 internalCache.invalidate(key);
242 if (isCollectorStatisticsEnabled())
243 {
244 collector.remove();
245 }
246 }
247 catch (Exception e)
248 {
249 throw new CacheException(e);
250 }
251 finally
252 {
253 releaseLockFor(key);
254 }
255 }
256
257 @Override
258 public void removeAll()
259 {
260 removeAllLock.lock();
261 try
262 {
263 internalCache.invalidateAll();
264 eventLogger.info("Cache {} was flushed", getName());
265 if (stacktraceLogger.isInfoEnabled()) {
266 stacktraceLogger.info("Cache {} was flushed. Stacktrace:", getName(), new Exception());
267 }
268 }
269 catch (Exception e)
270 {
271 throw new CacheException(e);
272 }
273 finally
274 {
275 removeAllLock.unlock();
276 }
277 }
278
279 @Override
280 public V putIfAbsent(@Nonnull K key, @Nonnull V value)
281 {
282 try
283 {
284 V oldValue = internalCache.asMap().putIfAbsent(key, value);
285 if (oldValue == null)
286 {
287
288
289 listenerSupport.notifyAdd(key, value);
290 }
291 else
292 {
293 if (isCollectorStatisticsEnabled())
294 {
295 collector.put();
296 }
297 }
298 return oldValue;
299 }
300 catch (Exception e)
301 {
302 throw new CacheException(e);
303 }
304 }
305
306 @Override
307 public boolean remove(@Nonnull K key, @Nonnull V value)
308 {
309 try
310 {
311 return internalCache.asMap().remove(key, value);
312 }
313 catch (Exception e)
314 {
315 throw new CacheException(e);
316 }
317 finally
318 {
319 if (isCollectorStatisticsEnabled())
320 {
321 collector.remove();
322 }
323 }
324 }
325
326 @Override
327 public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
328 {
329 try
330 {
331 return internalCache.asMap().replace(key, oldValue, newValue);
332 }
333 catch (Exception e)
334 {
335 throw new CacheException(e);
336 }
337 }
338
339 @Nonnull
340 @Override
341 public SortedMap<CacheStatisticsKey, java.util.function.Supplier<Long>> getStatistics()
342 {
343 if (isStatisticsEnabled())
344 {
345 return toStatistics(internalCache);
346 }
347 else
348 {
349 return ImmutableSortedMap.of();
350 }
351 }
352
353 @Override
354 public void clear()
355 {
356 removeAll();
357 }
358
359 private boolean isCollectorStatisticsEnabled()
360 {
361 return collector.isEnabled();
362 }
363
364 @Override
365 public boolean isStatisticsEnabled() {
366 return settings.getStatisticsEnabled();
367 }
368
369 @Override
370 public boolean equals(@Nullable final Object other)
371 {
372 if (other instanceof DelegatingCache)
373 {
374 DelegatingCache<?, ?> otherDelegatingCache = (DelegatingCache<?, ?>) other;
375 if (internalCache.equals(otherDelegatingCache.internalCache))
376 {
377 return true;
378 }
379 }
380 return false;
381 }
382
383 @Override
384 public int hashCode()
385 {
386 return 3 + internalCache.hashCode();
387 }
388
389 @Override
390 public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
391 {
392 listenerSupport.add(listener, includeValues);
393 }
394
395 @Override
396 public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
397 {
398 listenerSupport.remove(listener);
399 }
400
401 protected static class DelegatingRemovalListener<K, V> implements RemovalListener<K, V>
402 {
403 private DelegatingCache<K, V> cache;
404
405 protected void onSupply(K key, V value)
406 {
407 cache.listenerSupport.notifyAdd(key, value);
408 }
409
410 @Override
411 public void onRemoval(@Nonnull RemovalNotification<K, V> notification)
412 {
413 switch (notification.getCause())
414 {
415 case COLLECTED:
416 case EXPIRED:
417 case SIZE:
418 cache.listenerSupport.notifyEvict(notification.getKey(), notification.getValue());
419 break;
420 case EXPLICIT:
421 cache.listenerSupport.notifyRemove(notification.getKey(), notification.getValue());
422 break;
423 case REPLACED:
424 K key = requireNonNull(notification.getKey());
425 cache.listenerSupport.notifyUpdate(key, cache.internalCache.getIfPresent(key),
426 notification.getValue());
427 break;
428 }
429 }
430
431 public void setCache(DelegatingCache<K, V> cache)
432 {
433 this.cache = cache;
434 }
435 }
436
437 void rejectNullKey(K key)
438 {
439 if (key == null)
440 {
441 throw new CacheException(new NullPointerException("Null keys are not supported"));
442 }
443 }
444
445 private ReentrantLock acquireLockFor(@Nonnull K key)
446 {
447 final ReentrantLock barrier = new ReentrantLock();
448 barrier.lock();
449 while (true)
450 {
451 final ReentrantLock existing = barriers.putIfAbsent(key, barrier);
452
453 if (existing == null)
454 {
455
456 return barrier;
457 }
458 else if (existing.isHeldByCurrentThread())
459 {
460
461 existing.lock();
462 return existing;
463 }
464
465 existing.lock();
466
467
468 existing.unlock();
469 }
470 }
471
472 private void releaseLockFor(K key)
473 {
474 final ReentrantLock barrier = barriers.get(key);
475 if (barrier != null && barrier.isHeldByCurrentThread())
476 {
477 if (barrier.getHoldCount() <= 1)
478 {
479 barriers.remove(key);
480 }
481 barrier.unlock();
482 }
483 }
484
485 public interface CreateFunction
486 {
487 <K,V> DelegatingCache<K, V> create(final com.google.common.cache.Cache<K, V> internalCache,
488 String name,
489 CacheSettings settings,
490 CacheLoader<K, V> cacheLoader);
491 }
492
493 public static class DefaultCreateFunction implements CreateFunction
494 {
495
496 @Override
497 public <K,V> DelegatingCache<K, V> create(com.google.common.cache.Cache<K, V> internalCache, String name,
498 CacheSettings settings, CacheLoader<K, V> cacheLoader) {
499 return new DelegatingCache<>(internalCache, name, settings, cacheLoader);
500 }
501 }
502 }
503