1 package com.atlassian.cache.hazelcast;
2
3 import javax.annotation.Nonnull;
4
5 import com.atlassian.cache.CachedReference;
6 import com.atlassian.cache.CachedReferenceEvent;
7 import com.atlassian.cache.CachedReferenceListener;
8 import com.atlassian.cache.CacheFactory;
9 import com.atlassian.cache.CacheSettings;
10 import com.atlassian.cache.ManagedCache;
11 import com.atlassian.cache.Supplier;
12 import com.atlassian.cache.impl.CachedReferenceListenerSupport;
13 import com.atlassian.cache.impl.ReferenceKey;
14 import com.atlassian.cache.impl.ValueCachedReferenceListenerSupport;
15
16 import com.hazelcast.core.Cluster;
17 import com.hazelcast.core.ITopic;
18 import com.hazelcast.core.MembershipAdapter;
19 import com.hazelcast.core.MembershipEvent;
20 import com.hazelcast.core.Message;
21 import com.hazelcast.core.MessageListener;
22
23 import java.lang.ref.WeakReference;
24
25
26
27
28
29
30
31
32 public class HazelcastAsyncHybridCachedReference<V> extends ManagedHybridCacheSupport implements CachedReference<V>
33 {
34 private final AsyncInvalidationListener listener;
35 private final CachedReferenceListenerSupport<V> listenerSupport = new ValueCachedReferenceListenerSupport<V>()
36 {
37 @Override
38 protected void initValue(final CachedReferenceListenerSupport<V> actualListenerSupport)
39 {
40 localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), true);
41 }
42
43 @Override
44 protected void initValueless(final CachedReferenceListenerSupport<V> actualListenerSupport)
45 {
46 localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), false);
47 }
48 };
49 private final CachedReference<V> localReference;
50
51 public HazelcastAsyncHybridCachedReference(String name, CacheFactory localFactory, final ITopic<ReferenceKey> topic,
52 final Supplier<V> supplier, HazelcastCacheManager cacheManager, CacheSettings settings)
53 {
54 super(name, cacheManager);
55 this.localReference = localFactory.getCachedReference(name, supplier, settings);
56 listener = new AsyncInvalidationListener(cacheManager.getHazelcastInstance().getCluster(), localReference,
57 topic);
58 }
59
60 @Nonnull
61 @Override
62 public V get()
63 {
64 return localReference.get();
65 }
66
67 @Override
68 public boolean isFlushable()
69 {
70 return true;
71 }
72
73 @Override
74 public boolean isReplicateAsynchronously()
75 {
76 return true;
77 }
78
79 @Override
80 public void reset()
81 {
82 localReference.reset();
83 invalidateRemotely();
84 }
85
86 @Override
87 protected ManagedCache getLocalCache()
88 {
89 return (ManagedCache) localReference;
90 }
91
92 @Override
93 public void clear()
94 {
95 reset();
96 }
97
98 @Override
99 public boolean updateMaxEntries(int newValue)
100 {
101 return false;
102 }
103
104 @Override
105 public void addListener(@Nonnull CachedReferenceListener<V> listener, boolean includeValues)
106 {
107 listenerSupport.add(listener, includeValues);
108 }
109
110 @Override
111 public void removeListener(@Nonnull CachedReferenceListener<V> listener)
112 {
113 listenerSupport.remove(listener);
114 }
115
116 private void invalidateRemotely()
117 {
118 listener.publish(null);
119 }
120
121 private static class DelegatingCachedReferenceListener<V> implements CachedReferenceListener<V>
122 {
123 private final CachedReferenceListenerSupport<V> listenerSupport;
124
125 private DelegatingCachedReferenceListener(final CachedReferenceListenerSupport<V> listenerSupport)
126 {
127 this.listenerSupport = listenerSupport;
128 }
129
130 @Override
131 public void onEvict(@Nonnull CachedReferenceEvent<V> event)
132 {
133 listenerSupport.notifyEvict(event.getValue());
134 }
135
136 @Override
137 public void onSet(@Nonnull CachedReferenceEvent<V> event)
138 {
139 listenerSupport.notifySet(event.getValue());
140 }
141
142 @Override
143 public void onReset(@Nonnull CachedReferenceEvent<V> event)
144 {
145 listenerSupport.notifyReset(event.getValue());
146 }
147 }
148
149 private static class AsyncInvalidationListener extends MembershipAdapter implements MessageListener<ReferenceKey> {
150
151 private final Cluster cluster;
152 private final WeakReference<CachedReference<?>> localReferenceRef;
153 private final String membershipListenerId;
154 private final ITopic<ReferenceKey> topic;
155 private final String topicListenerId;
156
157 AsyncInvalidationListener(Cluster cluster, CachedReference<?> localReference, ITopic<ReferenceKey> topic)
158 {
159 this.cluster = cluster;
160 this.localReferenceRef = new WeakReference<>(localReference);
161 this.topic = topic;
162 this.topicListenerId = topic.addMessageListener(this);
163 this.membershipListenerId = cluster.addMembershipListener(this);
164 }
165
166 @Override
167 public void memberAdded(MembershipEvent membershipEvent)
168 {
169 CachedReference<?> localReference = localReferenceRef.get();
170 if (localReference == null)
171 {
172
173 destroy();
174 return;
175 }
176
177
178 localReference.reset();
179 }
180
181 @Override
182 public void onMessage(Message<ReferenceKey> message) {
183 CachedReference<?> localReference = localReferenceRef.get();
184 if (localReference == null)
185 {
186
187 destroy();
188 return;
189 }
190 if (!message.getPublishingMember().localMember())
191 {
192 localReference.reset();
193 }
194 }
195
196 void destroy()
197 {
198 cluster.removeMembershipListener(membershipListenerId);
199 topic.removeMessageListener(topicListenerId);
200 }
201
202 void publish(ReferenceKey message)
203 {
204 topic.publish(message);
205 }
206 }
207 }