View Javadoc
1   package com.atlassian.cache.memory;
2   
3   import java.util.Collection;
4   import java.util.Objects;
5   import java.util.SortedMap;
6   import java.util.concurrent.ConcurrentHashMap;
7   import java.util.concurrent.ConcurrentMap;
8   import java.util.concurrent.ExecutionException;
9   import java.util.concurrent.locks.Lock;
10  import java.util.concurrent.locks.ReadWriteLock;
11  import java.util.concurrent.locks.ReentrantLock;
12  import java.util.concurrent.locks.ReentrantReadWriteLock;
13  import javax.annotation.Nonnull;
14  import javax.annotation.Nullable;
15  
16  import com.atlassian.cache.Cache;
17  import com.atlassian.cache.CacheEntryListener;
18  import com.atlassian.cache.CacheException;
19  import com.atlassian.cache.CacheLoader;
20  import com.atlassian.cache.CacheSettings;
21  import com.atlassian.cache.CacheStatisticsKey;
22  import com.atlassian.cache.impl.CacheEntryListenerSupport;
23  import com.atlassian.cache.impl.DefaultCacheEntryListenerSupport;
24  import com.atlassian.instrumentation.DefaultInstrumentRegistry;
25  import com.atlassian.instrumentation.SimpleTimer;
26  import com.atlassian.instrumentation.caches.CacheCollector;
27  import com.atlassian.instrumentation.caches.CacheKeys;
28  
29  import com.google.common.base.Throwables;
30  import com.google.common.cache.RemovalListener;
31  import com.google.common.cache.RemovalNotification;
32  import com.google.common.collect.ImmutableSortedMap;
33  import com.google.common.util.concurrent.UncheckedExecutionException;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import static com.atlassian.cache.memory.DelegatingCacheStatistics.toStatistics;
38  import static java.util.Objects.requireNonNull;
39  
40  /**
41   * A Cache that delegates Concurrent Map.
42   *
43   * @since 2.0
44   */
45  class DelegatingCache<K, V> extends ManagedCacheSupport implements Cache<K, V>
46  {
47      static final CreateFunction DEFAULT_CREATE_FUNCTION = new DefaultCreateFunction();
48  
49      private final Logger eventLogger;
50      private final Logger stacktraceLogger;
51  
52      private final com.google.common.cache.Cache<K, V> internalCache;
53      private final CacheEntryListenerSupport<K, V> listenerSupport;
54      private final CacheCollector collector;
55      private final CacheLoader<K, V> theLoader;
56  
57      private final ConcurrentMap<K, ReentrantLock> barriers = new ConcurrentHashMap<>(16);
58      private final Lock loadLock;
59      private final Lock removeAllLock;
60      {
61          // This needs to be fair to ensure that removeAll does not starve for a busy cache
62          final ReadWriteLock loadVsRemoveAllLock = new ReentrantReadWriteLock(true);
63          loadLock = loadVsRemoveAllLock.readLock();
64          removeAllLock = loadVsRemoveAllLock.writeLock();
65      }
66  
67      protected DelegatingCache(final com.google.common.cache.Cache<K, V> internalCache,
68                              String name,
69                              CacheSettings settings,
70                              @Nullable CacheLoader<K, V> theLoader)
71      {
72          super(name, settings);
73          this.internalCache = internalCache;
74          this.listenerSupport = new DefaultCacheEntryListenerSupport<>();
75          this.theLoader = theLoader;
76          this.collector = new DefaultInstrumentRegistry().pullCacheCollector(name, internalCache::size);
77  
78          this.eventLogger = LoggerFactory.getLogger("com.atlassian.cache.event." + name);
79          this.stacktraceLogger = LoggerFactory.getLogger("com.atlassian.cache.stacktrace." + name);
80      }
81  
82      static <K, V> DelegatingCache<K, V> create(final com.google.common.cache.Cache<K, V> internalCache,
83                                                 String name,
84                                                 CacheSettings settings,
85                                                 CacheLoader<K, V> cacheLoader)
86      {
87          return DEFAULT_CREATE_FUNCTION.create(internalCache, name, settings, cacheLoader);
88      }
89  
90      @Override
91      public CacheCollector getCacheCollector()
92      {
93          return collector;
94      }
95  
96      @Override
97      public boolean containsKey(@Nonnull K key)
98      {
99          return null != internalCache.getIfPresent(key);
100     }
101 
102     @Nonnull
103     @Override
104     public Collection<K> getKeys()
105     {
106         try
107         {
108             return internalCache.asMap().keySet();
109         }
110         catch (Exception e)
111         {
112             throw new CacheException(e);
113         }
114     }
115 
116     @Override
117     public void put(@Nonnull final K key, @Nonnull final V value)
118     {
119         try
120         {
121             V oldValue = internalCache.asMap().put(key, value);
122             if (isCollectorStatisticsEnabled())
123             {
124                 collector.put();
125             }
126             if (oldValue == null)
127             {
128                 // Here we care only for the case when oldValue was null, e.g. missing as in the other
129                 // cases DelegatingRemovalListener will be called with REPLACED notification
130                 listenerSupport.notifyAdd(key, value);
131             }
132         }
133         catch (Exception e)
134         {
135             throw new CacheException(e);
136         }
137     }
138 
139     @Override
140     public V get(@Nonnull final K key)
141     {
142         rejectNullKey(key);
143 
144         if (theLoader == null)
145         {
146             V value = internalCache.getIfPresent(key);
147             if (isCollectorStatisticsEnabled())
148             {
149                 if (value == null)
150                 {
151                     collector.miss();
152                 }
153                 else
154                 {
155                     collector.hit();
156                 }
157             }
158             return value;
159         }
160         else
161         {
162             return get(key, () -> theLoader.load(key));
163         }
164     }
165 
166     @Nonnull
167     @Override
168     public V get(@Nonnull final K key, @Nonnull final com.atlassian.cache.Supplier<? extends V> valueLoader)
169     {
170         rejectNullKey(key);
171 
172         // Using the following array as a mechanism for the valueLoader to flag whether it has been invoked.
173         // When the valueLoader is invoked, it will record that there was a miss.
174         final boolean[] missed = new boolean[1];
175         try
176         {
177             return internalCache.get(key, () -> {
178                 missed[0] = true;
179                 acquireLockFor(key);
180                 loadLock.lock();
181 
182                 final SimpleTimer timer = isCollectorStatisticsEnabled() ? new SimpleTimer(CacheKeys.LOAD_TIME.getName()) : null;
183                 if (timer != null)
184                 {
185                     timer.start();
186                 }
187 
188                 try
189                 {
190                     return Objects.requireNonNull(valueLoader.get());
191                 }
192                 finally
193                 {
194                     if (timer != null)
195                     {
196                         timer.end();
197                         collector.put();
198                         collector.getSplits().add(timer);
199                     }
200                 }
201             });
202         }
203         catch (ExecutionException e)
204         {
205             throw new CacheException("Unknown failure", e);
206         }
207         catch (UncheckedExecutionException e)
208         {
209             Throwable cause = e.getCause();
210             Throwables.propagateIfInstanceOf(cause, CacheException.class);
211             throw new CacheException(cause);
212         }
213         finally
214         {
215             if (missed[0])
216             {
217                 loadLock.unlock();
218                 releaseLockFor(key);
219             }
220 
221             if (isCollectorStatisticsEnabled())
222             {
223                 if (missed[0])
224                 {
225                     collector.miss();
226                 }
227                 else
228                 {
229                     collector.hit();
230                 }
231             }
232         }
233     }
234 
235     @Override
236     public void remove(@Nonnull final K key)
237     {
238         acquireLockFor(key);
239         try
240         {
241             internalCache.invalidate(key);
242             if (isCollectorStatisticsEnabled())
243             {
244                 collector.remove();
245             }
246         }
247         catch (Exception e)
248         {
249             throw new CacheException(e);
250         }
251         finally
252         {
253             releaseLockFor(key);
254         }
255     }
256 
257     @Override
258     public void removeAll()
259     {
260         removeAllLock.lock();
261         try
262         {
263             internalCache.invalidateAll();
264             eventLogger.info("Cache {} was flushed", getName());
265             if (stacktraceLogger.isInfoEnabled()) {
266                 stacktraceLogger.info("Cache {} was flushed. Stacktrace:", getName(), new Exception());
267             }
268         }
269         catch (Exception e)
270         {
271             throw new CacheException(e);
272         }
273         finally
274         {
275             removeAllLock.unlock();
276         }
277     }
278 
279     @Override
280     public V putIfAbsent(@Nonnull K key, @Nonnull V value)
281     {
282         try
283         {
284             V oldValue = internalCache.asMap().putIfAbsent(key, value);
285             if (oldValue == null)
286             {
287                 // Here we care only for the case when oldValue was null, e.g. missing as in the other
288                 // cases DelegatingRemovalListener will be called with REPLACED notification
289                 listenerSupport.notifyAdd(key, value);
290             }
291             else
292             {
293                 if (isCollectorStatisticsEnabled())
294                 {
295                     collector.put();
296                 }
297             }
298             return oldValue;
299         }
300         catch (Exception e)
301         {
302             throw new CacheException(e);
303         }
304     }
305 
306     @Override
307     public boolean remove(@Nonnull K key, @Nonnull V value)
308     {
309         try
310         {
311             return internalCache.asMap().remove(key, value);
312         }
313         catch (Exception e)
314         {
315             throw new CacheException(e);
316         }
317         finally
318         {
319             if (isCollectorStatisticsEnabled())
320             {
321                 collector.remove();
322             }
323         }
324     }
325 
326     @Override
327     public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
328     {
329         try
330         {
331             return internalCache.asMap().replace(key, oldValue, newValue);
332         }
333         catch (Exception e)
334         {
335             throw new CacheException(e);
336         }
337     }
338 
339     @Nonnull
340     @Override
341     public SortedMap<CacheStatisticsKey, java.util.function.Supplier<Long>> getStatistics()
342     {
343         if (isStatisticsEnabled())
344         {
345             return toStatistics(internalCache);
346         }
347         else
348         {
349             return ImmutableSortedMap.of();
350         }
351     }
352 
353     @Override
354     public void clear()
355     {
356         removeAll();
357     }
358 
359     private boolean isCollectorStatisticsEnabled()
360     {
361         return collector.isEnabled();
362     }
363 
364     @Override
365     public boolean isStatisticsEnabled() {
366         return settings.getStatisticsEnabled();
367     }
368 
369     @Override
370     public boolean equals(@Nullable final Object other)
371     {
372         if (other instanceof DelegatingCache)
373         {
374             DelegatingCache<?, ?> otherDelegatingCache = (DelegatingCache<?, ?>) other;
375             if (internalCache.equals(otherDelegatingCache.internalCache))
376             {
377                 return true;
378             }
379         }
380         return false;
381     }
382 
383     @Override
384     public int hashCode()
385     {
386         return 3 + internalCache.hashCode();
387     }
388 
389     @Override
390     public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
391     {
392         listenerSupport.add(listener, includeValues);
393     }
394 
395     @Override
396     public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
397     {
398         listenerSupport.remove(listener);
399     }
400 
401     protected static class DelegatingRemovalListener<K, V> implements RemovalListener<K, V>
402     {
403         private DelegatingCache<K, V> cache;
404 
405         protected void onSupply(K key, V value)
406         {
407             cache.listenerSupport.notifyAdd(key, value);
408         }
409 
410         @Override
411         public void onRemoval(@Nonnull RemovalNotification<K, V> notification)
412         {
413             switch (notification.getCause())
414             {
415                 case COLLECTED:
416                 case EXPIRED:
417                 case SIZE:
418                     cache.listenerSupport.notifyEvict(notification.getKey(), notification.getValue());
419                     break;
420                 case EXPLICIT:
421                     cache.listenerSupport.notifyRemove(notification.getKey(), notification.getValue());
422                     break;
423                 case REPLACED:
424                     K key = requireNonNull(notification.getKey());
425                     cache.listenerSupport.notifyUpdate(key, cache.internalCache.getIfPresent(key),
426                             notification.getValue());
427                     break;
428             }
429         }
430 
431         public void setCache(DelegatingCache<K, V> cache)
432         {
433             this.cache = cache;
434         }
435     }
436 
437     void rejectNullKey(K key)
438     {
439         if (key == null)
440         {
441             throw new CacheException(new NullPointerException("Null keys are not supported"));
442         }
443     }
444 
445     private ReentrantLock acquireLockFor(@Nonnull K key)
446     {
447         final ReentrantLock barrier = new ReentrantLock();
448         barrier.lock();
449         while (true)
450         {
451             final ReentrantLock existing = barriers.putIfAbsent(key, barrier);
452 
453             if (existing == null)
454             {
455                 // We successfully acquired the lock for the first time.
456                 return barrier;
457             }
458             else if (existing.isHeldByCurrentThread())
459             {
460                 // We already hold the lock, re-acquire it again in a re-entrant fashion.
461                 existing.lock();
462                 return existing;
463             }
464             // else some other thread holds the lock, wait for it to be released before trying again.
465             existing.lock();
466             // Now we have woken up, release the lock we just acquired so that any other thread(s) waiting on the same
467             // lock will be woken up.
468             existing.unlock();
469         }
470     }
471 
472     private void releaseLockFor(K key)
473     {
474         final ReentrantLock barrier = barriers.get(key);
475         if (barrier != null && barrier.isHeldByCurrentThread())
476         {
477             if (barrier.getHoldCount() <= 1)
478             {
479                 barriers.remove(key);
480             }
481             barrier.unlock();
482         }
483     }
484 
485     public interface CreateFunction
486     {
487         <K,V> DelegatingCache<K, V> create(final com.google.common.cache.Cache<K, V> internalCache,
488                                                                  String name,
489                                                                  CacheSettings settings,
490                                                                  CacheLoader<K, V> cacheLoader);
491     }
492 
493     public static class DefaultCreateFunction implements CreateFunction
494     {
495 
496         @Override
497         public <K,V> DelegatingCache<K, V> create(com.google.common.cache.Cache<K, V> internalCache, String name,
498                                             CacheSettings settings, CacheLoader<K, V> cacheLoader) {
499             return new DelegatingCache<>(internalCache, name, settings, cacheLoader);
500         }
501     }
502 }
503