View Javadoc
1   package com.atlassian.cache.hazelcast;
2   
3   import java.lang.ref.WeakReference;
4   import java.util.Collection;
5   
6   import javax.annotation.Nonnull;
7   
8   import com.atlassian.cache.Cache;
9   import com.atlassian.cache.CacheEntryEvent;
10  import com.atlassian.cache.CacheEntryListener;
11  import com.atlassian.cache.CacheFactory;
12  import com.atlassian.cache.CacheLoader;
13  import com.atlassian.cache.CacheSettings;
14  import com.atlassian.cache.ManagedCache;
15  import com.atlassian.cache.Supplier;
16  import com.atlassian.cache.impl.CacheEntryListenerSupport;
17  import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
18  
19  import com.hazelcast.core.Cluster;
20  import com.hazelcast.core.ITopic;
21  import com.hazelcast.core.MembershipAdapter;
22  import com.hazelcast.core.MembershipEvent;
23  import com.hazelcast.core.Message;
24  import com.hazelcast.core.MessageListener;
25  
26  /**
27   * Implementation of {@link ManagedCache} and {@link Cache} that does not require the values to be {@code Serializable}.
28   * It works by caching values locally and broadcasting invalidations cluster-wide using a Hazelcast {@link ITopic}.
29   * <p>
30   * Cache entries are invalidated when the cache is modified through one of the methods {@link #clear}, {@link #remove},
31   * {@link #replace}, or {@link #put} (when it overwrites an existing value).
32   *
33   * @since 2.12.0
34   */
35  public class HazelcastAsyncHybridCache<K, V> extends ManagedHybridCacheSupport implements Cache<K, V>
36  {
37      private final AsyncInvalidationListener<K> listener;
38      private final Cache<K, V> localCache;
39      private final CacheEntryListenerSupport<K, V> listenerSupport = new ValueCacheEntryListenerSupport<K, V>()
40      {
41          @Override
42          protected void initValue(final CacheEntryListenerSupport<K, V> actualListenerSupport)
43          {
44              localCache.addListener(new DelegatingCacheEntryListener<>(actualListenerSupport), true);
45          }
46  
47          @Override
48          protected void initValueless(final CacheEntryListenerSupport<K, V> actualListenerSupport)
49          {
50              localCache.addListener(new DelegatingCacheEntryListener<>(actualListenerSupport), false);
51          }
52      };
53  
54      public HazelcastAsyncHybridCache(String name, CacheFactory localCacheFactory, ITopic<K> invalidationTopic,
55              final CacheLoader<K, V> cacheLoader, HazelcastCacheManager cacheManager, CacheSettings settings)
56      {
57          super(name, cacheManager);
58          localCache = localCacheFactory.getCache(name, cacheLoader, settings);
59          listener = new AsyncInvalidationListener<K>(cacheManager.getHazelcastInstance().getCluster(), localCache,
60                  invalidationTopic);
61      }
62  
63      @Override
64      public void clear()
65      {
66          removeAll();
67      }
68  
69      @Override
70      public boolean containsKey(@Nonnull final K key)
71      {
72          return localCache.containsKey(key);
73      }
74  
75      @Override
76      public V get(@Nonnull K key)
77      {
78          return localCache.get(key);
79      }
80  
81      @Nonnull
82      @Override
83      public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
84      {
85          return localCache.get(key, valueSupplier);
86      }
87  
88      @Nonnull
89      @Override
90      public Collection<K> getKeys()
91      {
92          return localCache.getKeys();
93      }
94  
95      @Nonnull
96      @Override
97      public String getName()
98      {
99          return localCache.getName();
100     }
101 
102     @Override
103     public boolean isFlushable()
104     {
105         return true;
106     }
107 
108     @Override
109     public boolean isReplicateAsynchronously()
110     {
111         return true;
112     }
113 
114     @Override
115     public void put(@Nonnull K key, @Nonnull V value)
116     {
117         invalidateRemotely(key);
118         localCache.put(key, value);
119     }
120 
121     @Override
122     public V putIfAbsent(@Nonnull K key, @Nonnull V value)
123     {
124         V oldValue = localCache.putIfAbsent(key, value);
125         if (oldValue == null)
126         {
127             invalidateRemotely(key);
128         }
129         return oldValue;
130     }
131 
132     @Override
133     public void remove(@Nonnull K key)
134     {
135         invalidateRemotely(key);
136         localCache.remove(key);
137     }
138 
139     @Override
140     public boolean remove(@Nonnull K key, @Nonnull V value)
141     {
142         if (localCache.remove(key, value))
143         {
144             invalidateRemotely(key);
145             return true;
146         }
147         return false;
148     }
149 
150     @Override
151     public void removeAll()
152     {
153         invalidateRemotely();
154         localCache.removeAll();
155     }
156 
157     @Override
158     public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
159     {
160         if (localCache.replace(key, oldValue, newValue))
161         {
162             // entry was replaced in the local cache. Invalidate the entry across the cluster.
163             invalidateRemotely(key);
164             return true;
165         }
166 
167         return false;
168     }
169 
170     @Override
171     public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
172     {
173         listenerSupport.add(listener, includeValues);
174     }
175 
176     @Override
177     public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
178     {
179         listenerSupport.remove(listener);
180     }
181 
182     @Override
183     protected ManagedCache getLocalCache()
184     {
185         return (ManagedCache) localCache;
186     }
187 
188     private void invalidateRemotely()
189     {
190         listener.publish(null);
191     }
192 
193     private void invalidateRemotely(@Nonnull K key)
194     {
195         listener.publish(key);
196     }
197 
198     private static class DelegatingCacheEntryListener<K, V> implements CacheEntryListener<K, V>
199     {
200         private final CacheEntryListenerSupport<K, V> listenerSupport;
201 
202         private DelegatingCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
203         {
204             this.listenerSupport = listenerSupport;
205         }
206 
207         @Override
208         public void onAdd(@Nonnull CacheEntryEvent<K, V> event)
209         {
210             listenerSupport.notifyAdd(event.getKey(), event.getValue());
211         }
212 
213         @Override
214         public void onEvict(@Nonnull CacheEntryEvent<K, V> event)
215         {
216             listenerSupport.notifyEvict(event.getKey(), event.getOldValue());
217         }
218 
219         @Override
220         public void onRemove(@Nonnull CacheEntryEvent<K, V> event)
221         {
222             listenerSupport.notifyRemove(event.getKey(), event.getOldValue());
223         }
224 
225         @Override
226         public void onUpdate(@Nonnull CacheEntryEvent<K, V> event)
227         {
228             listenerSupport.notifyUpdate(event.getKey(), event.getValue(), event.getOldValue());
229         }
230     }
231 
232     private static class AsyncInvalidationListener<K> extends MembershipAdapter implements MessageListener<K> {
233 
234         private final Cluster cluster;
235         private final WeakReference<Cache<K, ?>> localCacheRef;
236         private final String membershipListenerId;
237         private final ITopic<K> topic;
238         private final String topicListenerId;
239 
240         AsyncInvalidationListener(Cluster cluster, Cache<K, ?> localCache, ITopic<K> topic)
241         {
242             this.cluster = cluster;
243             this.localCacheRef = new WeakReference<>(localCache);
244             this.topic = topic;
245             this.topicListenerId = topic.addMessageListener(this);
246             this.membershipListenerId = cluster.addMembershipListener(this);
247         }
248 
249         @Override
250         public void memberAdded(MembershipEvent membershipEvent)
251         {
252             Cache<K, ?> localCache = localCacheRef.get();
253             if (localCache == null)
254             {
255                 // cache has been GC'd,
256                 destroy();
257                 return;
258             }
259             // Members that have left the cluster and re-joined may be an inconsistent state due to the
260             // potential for "split brain".  So err on the side of safety and flush the local cache.
261             localCache.removeAll();
262         }
263 
264         @Override
265         public void onMessage(Message<K> message) {
266             Cache<K, ?> localCache = localCacheRef.get();
267             if (localCache == null)
268             {
269                 // cache has been gc-d
270                 destroy();
271                 return;
272             }
273             if (!message.getPublishingMember().localMember())
274             {
275                 K key = message.getMessageObject();
276                 if (key == null)
277                 {
278                     localCache.removeAll();
279                 }
280                 else
281                 {
282                     localCache.remove(key);
283                 }
284             }
285         }
286 
287         void destroy()
288         {
289             cluster.removeMembershipListener(membershipListenerId);
290             topic.removeMessageListener(topicListenerId);
291         }
292 
293         void publish(K message)
294         {
295             topic.publish(message);
296         }
297     }
298 }