1 package com.atlassian.cache.hazelcast;
2
3 import java.lang.ref.WeakReference;
4 import java.util.Collection;
5
6 import javax.annotation.Nonnull;
7
8 import com.atlassian.cache.Cache;
9 import com.atlassian.cache.CacheEntryEvent;
10 import com.atlassian.cache.CacheEntryListener;
11 import com.atlassian.cache.CacheFactory;
12 import com.atlassian.cache.CacheLoader;
13 import com.atlassian.cache.CacheSettings;
14 import com.atlassian.cache.ManagedCache;
15 import com.atlassian.cache.Supplier;
16 import com.atlassian.cache.impl.CacheEntryListenerSupport;
17 import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
18
19 import com.hazelcast.core.Cluster;
20 import com.hazelcast.core.ITopic;
21 import com.hazelcast.core.MembershipAdapter;
22 import com.hazelcast.core.MembershipEvent;
23 import com.hazelcast.core.Message;
24 import com.hazelcast.core.MessageListener;
25
26
27
28
29
30
31
32
33
34
35 public class HazelcastAsyncHybridCache<K, V> extends ManagedHybridCacheSupport implements Cache<K, V>
36 {
37 private final AsyncInvalidationListener<K> listener;
38 private final Cache<K, V> localCache;
39 private final CacheEntryListenerSupport<K, V> listenerSupport = new ValueCacheEntryListenerSupport<K, V>()
40 {
41 @Override
42 protected void initValue(final CacheEntryListenerSupport<K, V> actualListenerSupport)
43 {
44 localCache.addListener(new DelegatingCacheEntryListener<>(actualListenerSupport), true);
45 }
46
47 @Override
48 protected void initValueless(final CacheEntryListenerSupport<K, V> actualListenerSupport)
49 {
50 localCache.addListener(new DelegatingCacheEntryListener<>(actualListenerSupport), false);
51 }
52 };
53
54 public HazelcastAsyncHybridCache(String name, CacheFactory localCacheFactory, ITopic<K> invalidationTopic,
55 final CacheLoader<K, V> cacheLoader, HazelcastCacheManager cacheManager, CacheSettings settings)
56 {
57 super(name, cacheManager);
58 localCache = localCacheFactory.getCache(name, cacheLoader, settings);
59 listener = new AsyncInvalidationListener<K>(cacheManager.getHazelcastInstance().getCluster(), localCache,
60 invalidationTopic);
61 }
62
63 @Override
64 public void clear()
65 {
66 removeAll();
67 }
68
69 @Override
70 public boolean containsKey(@Nonnull final K key)
71 {
72 return localCache.containsKey(key);
73 }
74
75 @Override
76 public V get(@Nonnull K key)
77 {
78 return localCache.get(key);
79 }
80
81 @Nonnull
82 @Override
83 public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
84 {
85 return localCache.get(key, valueSupplier);
86 }
87
88 @Nonnull
89 @Override
90 public Collection<K> getKeys()
91 {
92 return localCache.getKeys();
93 }
94
95 @Nonnull
96 @Override
97 public String getName()
98 {
99 return localCache.getName();
100 }
101
102 @Override
103 public boolean isFlushable()
104 {
105 return true;
106 }
107
108 @Override
109 public boolean isReplicateAsynchronously()
110 {
111 return true;
112 }
113
114 @Override
115 public void put(@Nonnull K key, @Nonnull V value)
116 {
117 invalidateRemotely(key);
118 localCache.put(key, value);
119 }
120
121 @Override
122 public V putIfAbsent(@Nonnull K key, @Nonnull V value)
123 {
124 V oldValue = localCache.putIfAbsent(key, value);
125 if (oldValue == null)
126 {
127 invalidateRemotely(key);
128 }
129 return oldValue;
130 }
131
132 @Override
133 public void remove(@Nonnull K key)
134 {
135 invalidateRemotely(key);
136 localCache.remove(key);
137 }
138
139 @Override
140 public boolean remove(@Nonnull K key, @Nonnull V value)
141 {
142 if (localCache.remove(key, value))
143 {
144 invalidateRemotely(key);
145 return true;
146 }
147 return false;
148 }
149
150 @Override
151 public void removeAll()
152 {
153 invalidateRemotely();
154 localCache.removeAll();
155 }
156
157 @Override
158 public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
159 {
160 if (localCache.replace(key, oldValue, newValue))
161 {
162
163 invalidateRemotely(key);
164 return true;
165 }
166
167 return false;
168 }
169
170 @Override
171 public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
172 {
173 listenerSupport.add(listener, includeValues);
174 }
175
176 @Override
177 public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
178 {
179 listenerSupport.remove(listener);
180 }
181
182 @Override
183 protected ManagedCache getLocalCache()
184 {
185 return (ManagedCache) localCache;
186 }
187
188 private void invalidateRemotely()
189 {
190 listener.publish(null);
191 }
192
193 private void invalidateRemotely(@Nonnull K key)
194 {
195 listener.publish(key);
196 }
197
198 private static class DelegatingCacheEntryListener<K, V> implements CacheEntryListener<K, V>
199 {
200 private final CacheEntryListenerSupport<K, V> listenerSupport;
201
202 private DelegatingCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
203 {
204 this.listenerSupport = listenerSupport;
205 }
206
207 @Override
208 public void onAdd(@Nonnull CacheEntryEvent<K, V> event)
209 {
210 listenerSupport.notifyAdd(event.getKey(), event.getValue());
211 }
212
213 @Override
214 public void onEvict(@Nonnull CacheEntryEvent<K, V> event)
215 {
216 listenerSupport.notifyEvict(event.getKey(), event.getOldValue());
217 }
218
219 @Override
220 public void onRemove(@Nonnull CacheEntryEvent<K, V> event)
221 {
222 listenerSupport.notifyRemove(event.getKey(), event.getOldValue());
223 }
224
225 @Override
226 public void onUpdate(@Nonnull CacheEntryEvent<K, V> event)
227 {
228 listenerSupport.notifyUpdate(event.getKey(), event.getValue(), event.getOldValue());
229 }
230 }
231
232 private static class AsyncInvalidationListener<K> extends MembershipAdapter implements MessageListener<K> {
233
234 private final Cluster cluster;
235 private final WeakReference<Cache<K, ?>> localCacheRef;
236 private final String membershipListenerId;
237 private final ITopic<K> topic;
238 private final String topicListenerId;
239
240 AsyncInvalidationListener(Cluster cluster, Cache<K, ?> localCache, ITopic<K> topic)
241 {
242 this.cluster = cluster;
243 this.localCacheRef = new WeakReference<>(localCache);
244 this.topic = topic;
245 this.topicListenerId = topic.addMessageListener(this);
246 this.membershipListenerId = cluster.addMembershipListener(this);
247 }
248
249 @Override
250 public void memberAdded(MembershipEvent membershipEvent)
251 {
252 Cache<K, ?> localCache = localCacheRef.get();
253 if (localCache == null)
254 {
255
256 destroy();
257 return;
258 }
259
260
261 localCache.removeAll();
262 }
263
264 @Override
265 public void onMessage(Message<K> message) {
266 Cache<K, ?> localCache = localCacheRef.get();
267 if (localCache == null)
268 {
269
270 destroy();
271 return;
272 }
273 if (!message.getPublishingMember().localMember())
274 {
275 K key = message.getMessageObject();
276 if (key == null)
277 {
278 localCache.removeAll();
279 }
280 else
281 {
282 localCache.remove(key);
283 }
284 }
285 }
286
287 void destroy()
288 {
289 cluster.removeMembershipListener(membershipListenerId);
290 topic.removeMessageListener(topicListenerId);
291 }
292
293 void publish(K message)
294 {
295 topic.publish(message);
296 }
297 }
298 }