View Javadoc

1   package com.atlassian.cache.memory;
2   
3   import java.util.Collection;
4   import java.util.Objects;
5   import java.util.SortedMap;
6   import java.util.concurrent.ConcurrentHashMap;
7   import java.util.concurrent.ConcurrentMap;
8   import java.util.concurrent.ExecutionException;
9   import java.util.concurrent.locks.Lock;
10  import java.util.concurrent.locks.ReadWriteLock;
11  import java.util.concurrent.locks.ReentrantReadWriteLock;
12  import javax.annotation.Nonnull;
13  import javax.annotation.Nullable;
14  
15  import com.atlassian.cache.Cache;
16  import com.atlassian.cache.CacheEntryListener;
17  import com.atlassian.cache.CacheException;
18  import com.atlassian.cache.CacheLoader;
19  import com.atlassian.cache.CacheSettings;
20  import com.atlassian.cache.CacheStatisticsKey;
21  import com.atlassian.cache.impl.CacheEntryListenerSupport;
22  import com.atlassian.cache.impl.DefaultCacheEntryListenerSupport;
23  import com.atlassian.cache.impl.OneShotLatch;
24  import com.atlassian.instrumentation.DefaultInstrumentRegistry;
25  import com.atlassian.instrumentation.SimpleTimer;
26  import com.atlassian.instrumentation.caches.CacheCollector;
27  import com.atlassian.instrumentation.caches.CacheKeys;
28  
29  import com.google.common.base.Throwables;
30  import com.google.common.cache.RemovalListener;
31  import com.google.common.cache.RemovalNotification;
32  import com.google.common.collect.ImmutableSortedMap;
33  import com.google.common.util.concurrent.UncheckedExecutionException;
34  
35  import static com.atlassian.cache.memory.DelegatingCacheStatistics.toStatistics;
36  import static java.util.Objects.requireNonNull;
37  
38  /**
39   * A Cache that delegates Concurrent Map.
40   *
41   * @since 2.0
42   */
43  class DelegatingCache<K, V> extends ManagedCacheSupport implements Cache<K, V>
44  {
45      private final com.google.common.cache.Cache<K, V> internalCache;
46      private final CacheEntryListenerSupport<K, V> listenerSupport;
47      private final CacheCollector collector;
48      private final CacheLoader<K, V> theLoader;
49  
50      private final ConcurrentMap<K, OneShotLatch> barriers = new ConcurrentHashMap<>(16);
51      private final Lock loadLock;
52      private final Lock removeAllLock;
53      {
54          // This needs to be fair to ensure that removeAll does not starve for a busy cache
55          final ReadWriteLock loadVsRemoveAllLock = new ReentrantReadWriteLock(true);
56          loadLock = loadVsRemoveAllLock.readLock();
57          removeAllLock = loadVsRemoveAllLock.writeLock();
58      }
59  
60      private DelegatingCache(final com.google.common.cache.Cache<K, V> internalCache,
61                              String name,
62                              CacheSettings settings,
63                              @Nullable CacheLoader<K, V> theLoader)
64      {
65          super(name, settings);
66          this.internalCache = internalCache;
67          this.listenerSupport = new DefaultCacheEntryListenerSupport<>();
68          this.theLoader = theLoader;
69          this.collector = new DefaultInstrumentRegistry().pullCacheCollector(name, internalCache::size);
70  
71          // Honor the CacheSettings value.
72          if (settings.getStatisticsEnabled() != null && settings.getStatisticsEnabled())
73          {
74              collector.setEnabled(true);
75          }
76      }
77  
78      static <K, V> DelegatingCache<K, V> create(final com.google.common.cache.Cache<K, V> internalCache,
79                                                 String name,
80                                                 CacheSettings settings,
81                                                 CacheLoader<K, V> cacheLoader)
82      {
83          return new DelegatingCache<>(internalCache, name, settings, cacheLoader);
84      }
85  
86      @Override
87      public CacheCollector getCacheCollector()
88      {
89          return collector;
90      }
91  
92      @Override
93      public boolean containsKey(@Nonnull K key)
94      {
95          return null != internalCache.getIfPresent(key);
96      }
97  
98      @Nonnull
99      @Override
100     public Collection<K> getKeys()
101     {
102         try
103         {
104             return internalCache.asMap().keySet();
105         }
106         catch (Exception e)
107         {
108             throw new CacheException(e);
109         }
110     }
111 
112     @Override
113     public void put(@Nonnull final K key, @Nonnull final V value)
114     {
115         try
116         {
117             V oldValue = internalCache.asMap().put(key, value);
118             if (isStatisticsEnabled())
119             {
120                 collector.put();
121             }
122             if (oldValue == null)
123             {
124                 // Here we care only for the case when oldValue was null, e.g. missing as in the other
125                 // cases DelegatingRemovalListener will be called with REPLACED notification
126                 listenerSupport.notifyAdd(key, value);
127             }
128         }
129         catch (Exception e)
130         {
131             throw new CacheException(e);
132         }
133     }
134 
135     @Override
136     public V get(@Nonnull final K key)
137     {
138         rejectNullKey(key);
139 
140         if (theLoader == null)
141         {
142             V value = internalCache.getIfPresent(key);
143             if (isStatisticsEnabled())
144             {
145                 if (value == null)
146                 {
147                     collector.miss();
148                 }
149                 else
150                 {
151                     collector.hit();
152                 }
153             }
154             return value;
155         }
156         else
157         {
158             return get(key, () -> theLoader.load(key));
159         }
160     }
161 
162     @Nonnull
163     @Override
164     public V get(@Nonnull final K key, @Nonnull final com.atlassian.cache.Supplier<? extends V> valueLoader)
165     {
166         rejectNullKey(key);
167 
168         // Using the following array as a mechanism for the valueLoader to flag whether it has been invoked.
169         // When the valueLoader is invoked, it will record that there was a miss.
170         final boolean[] missed = new boolean[1];
171         try
172         {
173             return internalCache.get(key, () -> {
174                 missed[0] = true;
175                 acquireLockFor(key);
176                 loadLock.lock();
177 
178                 final SimpleTimer timer = isStatisticsEnabled() ? new SimpleTimer(CacheKeys.LOAD_TIME.getName()) : null;
179                 if (timer != null)
180                 {
181                     timer.start();
182                 }
183 
184                 try
185                 {
186                     return Objects.requireNonNull(valueLoader.get());
187                 }
188                 finally
189                 {
190                     if (timer != null)
191                     {
192                         timer.end();
193                         collector.put();
194                         collector.getSplits().add(timer);
195                     }
196                 }
197             });
198         }
199         catch (ExecutionException e)
200         {
201             throw new CacheException("Unknown failure", e);
202         }
203         catch (UncheckedExecutionException e)
204         {
205             Throwable cause = e.getCause();
206             Throwables.propagateIfInstanceOf(cause, CacheException.class);
207             throw new CacheException(cause);
208         }
209         finally
210         {
211             if (missed[0])
212             {
213                 loadLock.unlock();
214                 releaseLockFor(key);
215             }
216 
217             if (isStatisticsEnabled())
218             {
219                 if (missed[0])
220                 {
221                     collector.miss();
222                 }
223                 else
224                 {
225                     collector.hit();
226                 }
227             }
228         }
229     }
230 
231     @Override
232     public void remove(@Nonnull final K key)
233     {
234         acquireLockFor(key);
235         try
236         {
237             internalCache.invalidate(key);
238             if (isStatisticsEnabled())
239             {
240                 collector.remove();
241             }
242         }
243         catch (Exception e)
244         {
245             throw new CacheException(e);
246         }
247         finally
248         {
249             releaseLockFor(key);
250         }
251     }
252 
253     @Override
254     public void removeAll()
255     {
256         removeAllLock.lock();
257         try
258         {
259             internalCache.invalidateAll();
260         }
261         catch (Exception e)
262         {
263             throw new CacheException(e);
264         }
265         finally
266         {
267             removeAllLock.unlock();
268         }
269     }
270 
271     @Override
272     public V putIfAbsent(@Nonnull K key, @Nonnull V value)
273     {
274         try
275         {
276             V oldValue = internalCache.asMap().putIfAbsent(key, value);
277             if (oldValue == null)
278             {
279                 // Here we care only for the case when oldValue was null, e.g. missing as in the other
280                 // cases DelegatingRemovalListener will be called with REPLACED notification
281                 listenerSupport.notifyAdd(key, value);
282             }
283             else
284             {
285                 if (isStatisticsEnabled())
286                 {
287                     collector.put();
288                 }
289             }
290             return oldValue;
291         }
292         catch (Exception e)
293         {
294             throw new CacheException(e);
295         }
296     }
297 
298     @Override
299     public boolean remove(@Nonnull K key, @Nonnull V value)
300     {
301         try
302         {
303             return internalCache.asMap().remove(key, value);
304         }
305         catch (Exception e)
306         {
307             throw new CacheException(e);
308         }
309         finally
310         {
311             if (isStatisticsEnabled())
312             {
313                 collector.remove();
314             }
315         }
316     }
317 
318     @Override
319     public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
320     {
321         try
322         {
323             return internalCache.asMap().replace(key, oldValue, newValue);
324         }
325         catch (Exception e)
326         {
327             throw new CacheException(e);
328         }
329     }
330 
331     @Nonnull
332     @Override
333     public SortedMap<CacheStatisticsKey, com.atlassian.util.concurrent.Supplier<Long>> getStatistics()
334     {
335         if (isStatisticsEnabled())
336         {
337             return toStatistics(collector);
338         }
339         else
340         {
341             return ImmutableSortedMap.of();
342         }
343     }
344 
345     @Override
346     public void clear()
347     {
348         removeAll();
349     }
350 
351     @Override
352     public void setStatistics(final boolean enabled)
353     {
354         collector.setEnabled(enabled);
355     }
356 
357     @Override
358     public boolean isStatisticsEnabled()
359     {
360         // Either we have it enabled from UI or the config of the cache has it set from start up.
361         return collector.isEnabled();
362     }
363 
364     @Override
365     public boolean equals(@Nullable final Object other)
366     {
367         if (other instanceof DelegatingCache)
368         {
369             DelegatingCache<?, ?> otherDelegatingCache = (DelegatingCache<?, ?>) other;
370             if (internalCache.equals(otherDelegatingCache.internalCache))
371             {
372                 return true;
373             }
374         }
375         return false;
376     }
377 
378     @Override
379     public int hashCode()
380     {
381         return 3 + internalCache.hashCode();
382     }
383 
384     @Override
385     public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
386     {
387         listenerSupport.add(listener, includeValues);
388     }
389 
390     @Override
391     public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
392     {
393         listenerSupport.remove(listener);
394     }
395 
396     protected static class DelegatingRemovalListener<K, V> implements RemovalListener<K, V>
397     {
398         private DelegatingCache<K, V> cache;
399 
400         protected void onSupply(K key, V value)
401         {
402             cache.listenerSupport.notifyAdd(key, value);
403         }
404 
405         @Override
406         public void onRemoval(@Nonnull RemovalNotification<K, V> notification)
407         {
408             switch (notification.getCause())
409             {
410                 case COLLECTED:
411                 case EXPIRED:
412                     cache.listenerSupport.notifyEvict(notification.getKey(), notification.getValue());
413                     break;
414                 case EXPLICIT:
415                     cache.listenerSupport.notifyRemove(notification.getKey(), notification.getValue());
416                     break;
417                 case REPLACED:
418                     K key = requireNonNull(notification.getKey());
419                     cache.listenerSupport.notifyUpdate(key, cache.internalCache.getIfPresent(key),
420                             notification.getValue());
421                     break;
422             }
423         }
424 
425         public void setCache(DelegatingCache<K, V> cache)
426         {
427             this.cache = cache;
428         }
429     }
430 
431     void rejectNullKey(K key)
432     {
433         if (key == null)
434         {
435             throw new CacheException(new NullPointerException("Null keys are not supported"));
436         }
437     }
438 
439     private OneShotLatch acquireLockFor(@Nonnull K key)
440     {
441         final OneShotLatch barrier = new OneShotLatch();
442         while (true)
443         {
444             final OneShotLatch existing = barriers.putIfAbsent(key, barrier);
445 
446             // successfully raised a new barrier
447             if (existing == null)
448             {
449                 return barrier;
450             }
451 
452             // There is an existing barrier that happens-before us; wait for it to be released.
453             // There is no need to attempt a remove or replace to evict it from the barriers map;
454             // The thread that owned it would have taken care of that before releasing it.
455             existing.await();
456         }
457     }
458 
459     private void releaseLockFor(K key)
460     {
461         final OneShotLatch barrier = barriers.get(key);
462         if (barrier != null && barrier.isHeldByCurrentThread())
463         {
464             barriers.remove(key);
465             barrier.release();
466         }
467     }
468 }
469