View Javadoc
1   package com.atlassian.cache.hazelcast;
2   
3   import java.util.Collection;
4   import java.util.concurrent.TimeUnit;
5   
6   import javax.annotation.Nonnull;
7   
8   import com.atlassian.cache.Cache;
9   import com.atlassian.cache.CacheEntryEvent;
10  import com.atlassian.cache.CacheEntryListener;
11  import com.atlassian.cache.CacheException;
12  import com.atlassian.cache.CacheFactory;
13  import com.atlassian.cache.CacheLoader;
14  import com.atlassian.cache.CacheSettings;
15  import com.atlassian.cache.CacheSettingsBuilder;
16  import com.atlassian.cache.ManagedCache;
17  import com.atlassian.cache.Supplier;
18  import com.atlassian.cache.impl.CacheEntryListenerSupport;
19  import com.atlassian.cache.impl.CacheLoaderSupplier;
20  import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
21  
22  import com.google.common.base.Throwables;
23  import com.hazelcast.core.IMap;
24  
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import static com.google.common.base.Objects.equal;
29  
30  /**
31   * Implementation of {@link ManagedCache} and {@link Cache} that can be used when the cached values do not implement
32   * {@code Serializable} but cache invalidation must work cluster-wide.
33   * <p>
34   * This implementation tracks versions of entries in Hazelcast {@code IMap}, but stores the actual values in a local
35   * cache. Caches entries are invalidated when the cache is modified through one of the {@code put}, {@code clear},
36   * {@code remove} or {@code replace} methods. Cache entries are never invalidated when {@link #get(Object)} is called,
37   * not even when a value is lazily created and added to the cache.
38   * <p>
39   * These semantics mean that hybrid caches should only be used as computing caches (by providing a {@link
40   * com.atlassian.cache.CacheLoader}) and that values should not be manually {@link #put(Object, Object)} in the cache.
41   * Doing so will lead to frequent cache evictions.
42   *
43   * @since 2.4.0
44   */
45  public class HazelcastHybridCache<K, V> extends ManagedHybridCacheSupport implements Cache<K, V>
46  {
47  
48      private static final Logger log = LoggerFactory.getLogger(HazelcastHybridCache.class);
49  
50      private final Cache<K, Versioned<V>> localCache;
51      private final boolean selfLoading;
52      private final IMap<K, Long> versionMap;
53  
54      private final CacheEntryListenerSupport<K, V> listenerSupport = new ValueCacheEntryListenerSupport<K, V>()
55      {
56          @Override
57          protected void initValue(final CacheEntryListenerSupport<K, V> actualListenerSupport)
58          {
59              localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), true);
60          }
61  
62          @Override
63          protected void initValueless(final CacheEntryListenerSupport<K, V> actualListenerSupport)
64          {
65              localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), false);
66          }
67      };
68  
69      public HazelcastHybridCache(String name, CacheFactory localCacheFactory, IMap<K, Long> versionMap,
70              final CacheLoader<K, V> cacheLoader, HazelcastCacheManager cacheManager)
71      {
72          super(name, cacheManager);
73          this.selfLoading = cacheLoader != null;
74  
75          CacheLoader<K, Versioned<V>> versionedCacheLoader = selfLoading ? new CacheLoader<K, Versioned<V>>()
76          {
77              @Nonnull
78              @Override
79              public Versioned<V> load(@Nonnull K key)
80              {
81                  return loadAndVersion(key, new CacheLoaderSupplier<K, V>(key, cacheLoader));
82              }
83          } : null;
84  
85          this.versionMap = versionMap;
86          this.localCache = localCacheFactory.getCache(name, versionedCacheLoader, getCacheSettings());
87      }
88  
89      @Override
90      public void clear()
91      {
92          removeAll();
93      }
94  
95      @Override
96      public boolean containsKey(@Nonnull final K k)
97      {
98          return localCache.containsKey(k);
99      }
100 
101     @Override
102     public V get(@Nonnull K key)
103     {
104         return getInternal(key).getValue();
105     }
106 
107     @Nonnull
108     @Override
109     public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
110     {
111         return getInternal(key, new Supplier<Versioned<V>>()
112         {
113             @Override
114             public Versioned<V> get()
115             {
116                 return loadAndVersion(key, valueSupplier);
117             }
118         }).getValue();
119     }
120 
121     @Nonnull
122     @Override
123     public Collection<K> getKeys()
124     {
125         return localCache.getKeys();
126     }
127 
128     @Nonnull
129     @Override
130     public String getName()
131     {
132         return localCache.getName();
133     }
134 
135     @Override
136     public boolean isReplicateAsynchronously()
137     {
138         return false;
139     }
140 
141     @Override
142     public void put(@Nonnull K key, @Nonnull V value)
143     {
144         Long version = incrementVersion(key);
145         localCache.put(key, new Versioned<V>(value, version));
146     }
147 
148     /**
149      * {@inheritDoc}
150      * <p>
151      * This implementation has a weak spot: When there is an existing value, this method must return it. However, if
152      * that value is stored in a remote node, it cannot be returned. <tt>null</tt> is returned instead, which signals
153      * success. In order to not break any contracts, this implementation will invalidate the value on all nodes in this
154      * situation.
155      */
156     @Override
157     public V putIfAbsent(@Nonnull K key, @Nonnull V value)
158     {
159         Long nextVersion = getNextVersion(key);
160         Versioned<V> versioned = new Versioned<V>(value, nextVersion);
161         Versioned<V> oldValue = localCache.putIfAbsent(key, versioned);
162         if (oldValue == null)
163         {
164             // entry was missing in the local cache. Increment the version to invalidate the entry across the cluster.
165             // if no concurrent change to the entry was made, this should bring the version to nextVersion which
166             // matches the version in the local cache. If a concurrent change was made, the version in the versionMap
167             // will not match nextVersion and the local cache entry will be invalidated on the next get.
168             incrementVersion(key);
169 
170             return null;
171         }
172         return oldValue.getValue();
173     }
174 
175     @Override
176     public void remove(@Nonnull K key)
177     {
178         // increment the version to trigger a cluster-wide invalidation
179         incrementVersion(key);
180         localCache.remove(key);
181     }
182 
183     @Override
184     public boolean remove(@Nonnull K key, @Nonnull V value)
185     {
186         Versioned<V> currentValue = null;
187         try
188         {
189             currentValue = getInternal(key);
190         }
191         catch (CacheException e)
192         {
193             //if the get throws a CacheException we should not care it is only if the remove throws an
194             //exception that we should bubble the failure
195             log.debug("Swallowing exception thrown during call to remove, when looking up cache key: " + key, e);
196         }
197         if (currentValue != null && equal(value, currentValue.getValue()))
198         {
199             if (localCache.remove(key, currentValue))
200             {
201                 // increment the version to trigger a cluster-wide invalidation
202                 incrementVersion(key);
203                 return true;
204             }
205         }
206         return false;
207     }
208 
209     @Override
210     public void removeAll()
211     {
212         // increment all entry versions to trigger a cluster-wide invalidation
213         versionMap.executeOnEntries(IncrementVersionEntryProcessor.getInstance());
214         localCache.removeAll();
215     }
216 
217     @Override
218     public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
219     {
220         Versioned<V> currentValue = getInternal(key);
221         if (equal(oldValue, currentValue.getValue()))
222         {
223             Long nextVersion = getNextVersion(key);
224             if (localCache.replace(key, currentValue, new Versioned<V>(newValue, nextVersion)))
225             {
226                 // entry was replaced in the local cache. Increment the version to invalidate the entry across the cluster.
227                 // if no concurrent change to the entry was made, this should bring the version to nextVersion which
228                 // matches the version in the local cache. If a concurrent change was made, the version in the versionMap
229                 // will not match nextVersion and the local cache entry will be invalidated on the next get.
230                 incrementVersion(key);
231                 return true;
232             }
233         }
234 
235         return false;
236     }
237 
238     @Override
239     public boolean updateExpireAfterAccess(long expireAfter, @Nonnull TimeUnit timeUnit)
240     {
241         if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
242         {
243             return false;
244         }
245         CacheSettings settings = new CacheSettingsBuilder(getCacheSettings())
246                 .expireAfterAccess(expireAfter * HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER, timeUnit)
247                 .build();
248         cacheManager.updateCacheSettings(getHazelcastMapName(), settings);
249         return true;
250     }
251 
252     @Override
253     public boolean updateExpireAfterWrite(long expireAfter, @Nonnull TimeUnit timeUnit)
254     {
255         if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
256         {
257             return false;
258         }
259         CacheSettings settings = new CacheSettingsBuilder(getCacheSettings())
260                 .expireAfterAccess(expireAfter * HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER, timeUnit)
261                 .build();
262         cacheManager.updateCacheSettings(getHazelcastMapName(), settings);
263         return true;
264     }
265 
266     @Override
267     public boolean updateMaxEntries(int newValue)
268     {
269         if (!super.updateMaxEntries(newValue))
270         {
271             return false;
272         }
273         CacheSettings settings = new CacheSettingsBuilder(getCacheSettings())
274                 .maxEntries(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * newValue)
275                 .build();
276         cacheManager.updateCacheSettings(getHazelcastMapName(), settings);
277         return true;
278     }
279 
280     @Override
281     public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
282     {
283         listenerSupport.add(listener, includeValues);
284     }
285 
286     @Override
287     public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
288     {
289         listenerSupport.remove(listener);
290     }
291 
292     @Override
293     protected ManagedCache getLocalCache()
294     {
295         return (ManagedCache) localCache;
296     }
297 
298     private CacheSettings getCacheSettings()
299     {
300         return cacheManager.getCacheSettings(getHazelcastMapName());
301     }
302 
303     private String getHazelcastMapName()
304     {
305         return versionMap.getName();
306     }
307 
308     @Override
309     public boolean isFlushable()
310     {
311         return getCacheSettings().getFlushable(true);
312     }
313 
314     private Versioned<V> loadAndVersion(final K key, Supplier<? extends V> supplier)
315     {
316         try
317         {
318             // retrieve the current version if it exists and is compatible with the generated value (identical hash),
319             // otherwise generate or increment the tracked version
320             long version = getVersion(key);
321 
322             V value = supplier.get();
323             //noinspection ConstantConditions
324             if (value == null)
325             {
326                 throw new CacheException("The generated value for cache '" + getName() + "' was null for key '" +
327                         key + "'. Null values are not supported.");
328             }
329 
330             log.debug("Generated value '{}' for key '{}' in cache with name '{}'", value, key, localCache.getName());
331             return new Versioned<V>(value, version);
332         }
333         catch (RuntimeException e)
334         {
335             Throwables.propagateIfInstanceOf(e, CacheException.class);
336             throw new CacheException("Error generating a value for key '" + key + "' in cache '" + localCache.getName() + "'", e);
337         }
338     }
339 
340     @Nonnull
341     private Versioned<V> getInternal(K key)
342     {
343         Versioned<V> versioned = localCache.get(key);
344         if (versioned != null)
345         {
346             Long version = versionMap.get(key);
347             if (version != null && version == versioned.getVersion())
348             {
349                 // versions match, cache is up to date
350                 return versioned;
351             }
352 
353             // Value in localCache is outdated, clear it.
354             localCache.remove(key);
355 
356             if (selfLoading)
357             {
358                 // a new value will be recalculated and the version will be properly initialized
359                 //noinspection ConstantConditions
360                 return localCache.get(key);
361             }
362         }
363         return Versioned.empty();
364     }
365 
366     @Nonnull
367     private Versioned<V> getInternal(K key, Supplier<Versioned<V>> valueSupplier)
368     {
369         Versioned<V> versioned = localCache.get(key, valueSupplier);
370         Long version = versionMap.get(key);
371         if (version != null && version == versioned.getVersion())
372         {
373             // versions match, cache is up to date
374             return versioned;
375         }
376 
377         // Value in localCache is outdated, clear it.
378         localCache.remove(key);
379 
380         // a new value will be recalculated and the version will be properly initialized
381         return localCache.get(key, valueSupplier);
382     }
383 
384     private Long getNextVersion(K key)
385     {
386         Long version = versionMap.get(key);
387         return version == null ? 1L : version + 1L;
388     }
389 
390     private Long getVersion(K key)
391     {
392         // try a standard get first to give the near-cache a chance
393         Long version = versionMap.get(key);
394         if (version == null)
395         {
396             version = (Long) versionMap.executeOnKey(key, GetOrInitVersionEntryProcessor.getInstance());
397         }
398         return version;
399     }
400 
401     private Long incrementVersion(K key)
402     {
403         return (Long) versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getInstance());
404     }
405 
406     private static class DelegatingCacheEntryListener<K, V> implements CacheEntryListener<K, Versioned<V>>
407     {
408         private final CacheEntryListenerSupport<K, V> listenerSupport;
409 
410         private DelegatingCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
411         {
412             this.listenerSupport = listenerSupport;
413         }
414 
415         @Override
416         public void onAdd(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
417         {
418             listenerSupport.notifyAdd(event.getKey(), get(event.getValue()));
419         }
420 
421         @Override
422         public void onEvict(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
423         {
424             listenerSupport.notifyEvict(event.getKey(), get(event.getOldValue()));
425         }
426 
427         @Override
428         public void onRemove(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
429         {
430             listenerSupport.notifyRemove(event.getKey(), get(event.getOldValue()));
431         }
432 
433         @Override
434         public void onUpdate(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
435         {
436             listenerSupport.notifyUpdate(event.getKey(), get(event.getValue()), get(event.getOldValue()));
437         }
438 
439         private V get(Versioned<V> versioned)
440         {
441             return versioned != null ? versioned.getValue() : null;
442         }
443     }
444 }