View Javadoc

1   package com.atlassian.cache.hazelcast;
2   
3   import java.util.Collection;
4   import java.util.concurrent.TimeUnit;
5   
6   import javax.annotation.Nonnull;
7   
8   import com.atlassian.cache.Cache;
9   import com.atlassian.cache.CacheEntryEvent;
10  import com.atlassian.cache.CacheEntryListener;
11  import com.atlassian.cache.CacheException;
12  import com.atlassian.cache.CacheFactory;
13  import com.atlassian.cache.CacheLoader;
14  import com.atlassian.cache.CacheSettings;
15  import com.atlassian.cache.ManagedCache;
16  import com.atlassian.cache.Supplier;
17  import com.atlassian.cache.impl.CacheEntryListenerSupport;
18  import com.atlassian.cache.impl.CacheLoaderSupplier;
19  import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
20  
21  import com.google.common.base.Throwables;
22  import com.hazelcast.config.MapConfig;
23  import com.hazelcast.core.IMap;
24  
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import static com.google.common.base.Objects.equal;
29  
30  /**
31   * Implementation of {@link ManagedCache} and {@link Cache} that can be used when the cached values do not implement
32   * {@code Serializable} but cache invalidation must work cluster-wide.
33   * <p/>
34   * This implementation tracks versions of entries in Hazelcast {@code IMap}, but stores the actual values in a local
35   * cache. Caches entries are invalidated when the cache is modified through one of the {@code put}, {@code clear},
36   * {@code remove} or {@code replace} methods. Cache entries are never invalidated when {@link #get(Object)} is called,
37   * not even when a value is lazily created and added to the cache.
38   * <p/>
39   * These semantics mean that hybrid caches should only be used as computing caches (by providing a {@link
40   * com.atlassian.cache.CacheLoader}) and that values should not be manually {@link #put(Object, Object)} in the cache.
41   * Doing so will lead to frequent cache evictions.
42   *
43   * @since 2.4.0
44   */
45  public class HazelcastHybridCache<K, V> extends ManagedHybridCacheSupport implements Cache<K, V>
46  {
47  
48      private static final Logger log = LoggerFactory.getLogger(HazelcastHybridCache.class);
49  
50      private final Cache<K, Versioned<V>> localCache;
51      private final MapConfig config;
52      private final boolean selfLoading;
53      private final IMap<K, Long> versionMap;
54  
55      private final CacheEntryListenerSupport<K, V> listenerSupport = new ValueCacheEntryListenerSupport<K, V>()
56      {
57          @Override
58          protected void initValue(final CacheEntryListenerSupport<K, V> actualListenerSupport)
59          {
60              localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), true);
61          }
62  
63          @Override
64          protected void initValueless(final CacheEntryListenerSupport<K, V> actualListenerSupport)
65          {
66              localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), false);
67          }
68      };
69  
70      public HazelcastHybridCache(String name, CacheFactory localCacheFactory, IMap<K, Long> versionMap, MapConfig config,
71              final CacheLoader<K, V> cacheLoader, CacheSettings settings)
72      {
73          super(name, settings);
74  
75          this.config = config;
76          this.selfLoading = cacheLoader != null;
77  
78          CacheLoader<K, Versioned<V>> versionedCacheLoader = selfLoading ? new CacheLoader<K, Versioned<V>>()
79          {
80              @Nonnull
81              @Override
82              public Versioned<V> load(@Nonnull K key)
83              {
84                  return loadAndVersion(key, new CacheLoaderSupplier<K, V>(key, cacheLoader));
85              }
86          } : null;
87  
88          this.localCache = localCacheFactory.getCache(name, versionedCacheLoader, settings);
89          this.versionMap = versionMap;
90      }
91  
92      @Override
93      public void clear()
94      {
95          if (flushable)
96          {
97              removeAll();
98          }
99      }
100 
101     @Override
102     public boolean containsKey(@Nonnull final K k)
103     {
104         return localCache.containsKey(k);
105     }
106 
107     @Override
108     public V get(@Nonnull K key)
109     {
110         return getInternal(key).getValue();
111     }
112 
113     @Nonnull
114     @Override
115     public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
116     {
117         return getInternal(key, new Supplier<Versioned<V>>()
118         {
119             @Override
120             public Versioned<V> get()
121             {
122                 return loadAndVersion(key, valueSupplier);
123             }
124         }).getValue();
125     }
126 
127     @Nonnull
128     @Override
129     public Collection<K> getKeys()
130     {
131         return localCache.getKeys();
132     }
133 
134     @Nonnull
135     @Override
136     public String getName()
137     {
138         return localCache.getName();
139     }
140 
141     @Override
142     public void put(@Nonnull K key, @Nonnull V value)
143     {
144         Long version = incrementVersion(key);
145         localCache.put(key, new Versioned<V>(value, version));
146     }
147 
148     /**
149      * {@inheritDoc}
150      * <p/>
151      * This implementation has a weak spot: When there is an existing value, this method must return it. However, if
152      * that value is stored in a remote node, it cannot be returned. <tt>null</tt> is returned instead, which signals
153      * success. In order to not break any contracts, this implementation will invalidate the value on all nodes in this
154      * situation.
155      */
156     @Override
157     public V putIfAbsent(@Nonnull K key, @Nonnull V value)
158     {
159         Long nextVersion = getNextVersion(key);
160         Versioned<V> versioned = new Versioned<V>(value, nextVersion);
161         Versioned<V> oldValue = localCache.putIfAbsent(key, versioned);
162         if (oldValue == null)
163         {
164             // entry was missing in the local cache. Increment the version to invalidate the entry across the cluster.
165             // if no concurrent change to the entry was made, this should bring the version to nextVersion which
166             // matches the version in the local cache. If a concurrent change was made, the version in the versionMap
167             // will not match nextVersion and the local cache entry will be invalidated on the next get.
168             incrementVersion(key);
169 
170             return null;
171         }
172         return oldValue.getValue();
173     }
174 
175     @Override
176     public void remove(@Nonnull K key)
177     {
178         // increment the version to trigger a cluster-wide invalidation
179         versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getUpdatingInstance());
180         localCache.remove(key);
181     }
182 
183     @Override
184     public boolean remove(@Nonnull K key, @Nonnull V value)
185     {
186         Versioned<V> currentValue = getInternal(key);
187         if (equal(value, currentValue.getValue()))
188         {
189             if (localCache.remove(key, currentValue))
190             {
191                 // increment the version to trigger a cluster-wide invalidation
192                 versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getUpdatingInstance());
193                 return true;
194             }
195         }
196         return false;
197     }
198 
199     @Override
200     public void removeAll()
201     {
202         if (flushable)
203         {
204             // increment all entry versions to trigger a cluster-wide invalidation
205             versionMap.executeOnEntries(IncrementVersionEntryProcessor.getUpdatingInstance());
206             localCache.removeAll();
207         }
208     }
209 
210     @Override
211     public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
212     {
213         Versioned<V> currentValue = getInternal(key);
214         if (equal(oldValue, currentValue.getValue()))
215         {
216             Long nextVersion = getNextVersion(key);
217             if (localCache.replace(key, currentValue, new Versioned<V>(newValue, nextVersion)))
218             {
219                 // entry was replaced in the local cache. Increment the version to invalidate the entry across the cluster.
220                 // if no concurrent change to the entry was made, this should bring the version to nextVersion which
221                 // matches the version in the local cache. If a concurrent change was made, the version in the versionMap
222                 // will not match nextVersion and the local cache entry will be invalidated on the next get.
223                 incrementVersion(key);
224                 return true;
225             }
226         }
227 
228         return false;
229     }
230 
231     @Override
232     public boolean updateExpireAfterAccess(long expireAfter, @Nonnull TimeUnit timeUnit)
233     {
234         if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
235         {
236             return false;
237         }
238         config.setMaxIdleSeconds(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * (int) timeUnit.toSeconds(expireAfter));
239         return true;
240     }
241 
242     @Override
243     public boolean updateExpireAfterWrite(long expireAfter, @Nonnull TimeUnit timeUnit)
244     {
245         if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
246         {
247             return false;
248         }
249         config.setMaxIdleSeconds(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * (int) timeUnit.toSeconds(expireAfter));
250         return true;
251     }
252 
253     @Override
254     public boolean updateMaxEntries(int newValue)
255     {
256         if (!super.updateMaxEntries(newValue))
257         {
258             return false;
259         }
260         config.getMaxSizeConfig().setSize(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * newValue);
261         return true;
262     }
263 
264     @Override
265     public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
266     {
267         listenerSupport.add(listener, includeValues);
268     }
269 
270     @Override
271     public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
272     {
273         listenerSupport.remove(listener);
274     }
275 
276     @Override
277     protected ManagedCache getManagedCache()
278     {
279         return (ManagedCache) localCache;
280     }
281 
282     private Versioned<V> loadAndVersion(final K key, Supplier<? extends V> supplier)
283     {
284         try
285         {
286             V value = supplier.get();
287             //noinspection ConstantConditions
288             if (value == null)
289             {
290                 throw new CacheException("The generated value for cache '" + getName() + "' was null for key '" +
291                         key + "'. Null values are not supported.");
292             }
293             // retrieve the current version if it exists and is compatible with the generated value (identical hash),
294             // otherwise generate or increment the tracked version
295             long version = HazelcastHybridCache.this.getOrRetrieveVersion(key);
296 
297             log.debug("Generated value '{}' for key '{}' in cache with name '{}'", value, key, localCache.getName());
298             return new Versioned<V>(value, version);
299         }
300         catch (RuntimeException e)
301         {
302             Throwables.propagateIfInstanceOf(e, CacheException.class);
303             throw new CacheException("Error generating a value for key '" + key + "' in cache '" + localCache.getName() + "'", e);
304         }
305     }
306 
307     @Nonnull
308     private Versioned<V> getInternal(K key)
309     {
310         Versioned<V> versioned = localCache.get(key);
311         if (versioned != null)
312         {
313             Long version = versionMap.get(key);
314             if (version != null && version == versioned.getVersion())
315             {
316                 // versions match, cache is up to date
317                 return versioned;
318             }
319 
320             // Value in localCache is outdated, clear it.
321             localCache.remove(key);
322 
323             if (selfLoading)
324             {
325                 // a new value will be recalculated and the version will be properly initialized
326                 //noinspection ConstantConditions
327                 return localCache.get(key);
328             }
329         }
330         return Versioned.empty();
331     }
332 
333     @Nonnull
334     private Versioned<V> getInternal(K key, Supplier<Versioned<V>> valueSupplier)
335     {
336         Versioned<V> versioned = localCache.get(key, valueSupplier);
337         Long version = versionMap.get(key);
338         if (version != null && version == versioned.getVersion())
339         {
340             // versions match, cache is up to date
341             return versioned;
342         }
343 
344         // Value in localCache is outdated, clear it.
345         localCache.remove(key);
346 
347         // a new value will be recalculated and the version will be properly initialized
348         return localCache.get(key, valueSupplier);
349     }
350 
351     private Long getNextVersion(K key)
352     {
353         return (Long) versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getInstance());
354     }
355 
356     private Long getOrRetrieveVersion(K key)
357     {
358         return (Long) versionMap.executeOnKey(key, GetOrInitVersionEntryProcessor.getInstance());
359     }
360 
361     private Long incrementVersion(K key)
362     {
363         return (Long) versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getUpdatingInstance());
364     }
365 
366     private static class DelegatingCacheEntryListener<K, V> implements CacheEntryListener<K, Versioned<V>>
367     {
368         private final CacheEntryListenerSupport<K, V> listenerSupport;
369 
370         private DelegatingCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
371         {
372             this.listenerSupport = listenerSupport;
373         }
374 
375         @Override
376         public void onAdd(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
377         {
378             listenerSupport.notifyAdd(event.getKey(), get(event.getValue()));
379         }
380 
381         @Override
382         public void onEvict(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
383         {
384             listenerSupport.notifyEvict(event.getKey(), get(event.getOldValue()));
385         }
386 
387         @Override
388         public void onRemove(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
389         {
390             listenerSupport.notifyRemove(event.getKey(), get(event.getOldValue()));
391         }
392 
393         @Override
394         public void onUpdate(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
395         {
396             listenerSupport.notifyUpdate(event.getKey(), get(event.getValue()), get(event.getOldValue()));
397         }
398 
399         private V get(Versioned<V> versioned)
400         {
401             return versioned != null ? versioned.getValue() : null;
402         }
403     }
404 }