View Javadoc
1   package com.atlassian.cache.hazelcast;
2   
3   import javax.annotation.Nonnull;
4   
5   import com.atlassian.cache.CachedReference;
6   import com.atlassian.cache.CachedReferenceEvent;
7   import com.atlassian.cache.CachedReferenceListener;
8   import com.atlassian.cache.CacheFactory;
9   import com.atlassian.cache.CacheSettings;
10  import com.atlassian.cache.ManagedCache;
11  import com.atlassian.cache.Supplier;
12  import com.atlassian.cache.impl.CachedReferenceListenerSupport;
13  import com.atlassian.cache.impl.ReferenceKey;
14  import com.atlassian.cache.impl.ValueCachedReferenceListenerSupport;
15  
16  import com.hazelcast.core.Cluster;
17  import com.hazelcast.core.ITopic;
18  import com.hazelcast.core.MembershipAdapter;
19  import com.hazelcast.core.MembershipEvent;
20  import com.hazelcast.core.Message;
21  import com.hazelcast.core.MessageListener;
22  
23  import java.lang.ref.WeakReference;
24  
25  /**
26   * Implementation of {@link ManagedCache} and {@link com.atlassian.cache.CachedReference} that can be used when the
27   * cached value does not implement {@code Serializable} but reference invalidation must work cluster-wide. It works by
28   * caching the value locally and broadcasting invalidations cluster-wide using a Hazelcast {@link ITopic}.
29   *
30   * @since 2.12.0
31   */
32  public class HazelcastAsyncHybridCachedReference<V> extends ManagedHybridCacheSupport implements CachedReference<V>
33  {
34      private final AsyncInvalidationListener listener;
35      private final CachedReferenceListenerSupport<V> listenerSupport = new ValueCachedReferenceListenerSupport<V>()
36      {
37          @Override
38          protected void initValue(final CachedReferenceListenerSupport<V> actualListenerSupport)
39          {
40              localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), true);
41          }
42  
43          @Override
44          protected void initValueless(final CachedReferenceListenerSupport<V> actualListenerSupport)
45          {
46              localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), false);
47          }
48      };
49      private final CachedReference<V> localReference;
50  
51      public HazelcastAsyncHybridCachedReference(String name, CacheFactory localFactory, final ITopic<ReferenceKey> topic,
52              final Supplier<V> supplier, HazelcastCacheManager cacheManager, CacheSettings settings)
53      {
54          super(name, cacheManager);
55          this.localReference = localFactory.getCachedReference(name, supplier, settings);
56          listener = new AsyncInvalidationListener(cacheManager.getHazelcastInstance().getCluster(), localReference,
57                  topic);
58      }
59  
60      @Nonnull
61      @Override
62      public V get()
63      {
64          return localReference.get();
65      }
66  
67      @Override
68      public boolean isFlushable()
69      {
70          return true;
71      }
72  
73      @Override
74      public boolean isReplicateAsynchronously()
75      {
76          return true;
77      }
78  
79      @Override
80      public void reset()
81      {
82          localReference.reset();
83          invalidateRemotely();
84      }
85  
86      @Override
87      protected ManagedCache getLocalCache()
88      {
89          return (ManagedCache) localReference;
90      }
91  
92      @Override
93      public void clear()
94      {
95          reset();
96      }
97  
98      @Override
99      public boolean updateMaxEntries(int newValue)
100     {
101         return false;
102     }
103 
104     @Override
105     public void addListener(@Nonnull CachedReferenceListener<V> listener, boolean includeValues)
106     {
107         listenerSupport.add(listener, includeValues);
108     }
109 
110     @Override
111     public void removeListener(@Nonnull CachedReferenceListener<V> listener)
112     {
113         listenerSupport.remove(listener);
114     }
115 
116     private void invalidateRemotely()
117     {
118         listener.publish(null);
119     }
120 
121     private static class DelegatingCachedReferenceListener<V> implements CachedReferenceListener<V>
122     {
123         private final CachedReferenceListenerSupport<V> listenerSupport;
124 
125         private DelegatingCachedReferenceListener(final CachedReferenceListenerSupport<V> listenerSupport)
126         {
127             this.listenerSupport = listenerSupport;
128         }
129 
130         @Override
131         public void onEvict(@Nonnull CachedReferenceEvent<V> event)
132         {
133             listenerSupport.notifyEvict(event.getValue());
134         }
135 
136         @Override
137         public void onSet(@Nonnull CachedReferenceEvent<V> event)
138         {
139             listenerSupport.notifySet(event.getValue());
140         }
141 
142         @Override
143         public void onReset(@Nonnull CachedReferenceEvent<V> event)
144         {
145             listenerSupport.notifyReset(event.getValue());
146         }
147     }
148 
149     private static class AsyncInvalidationListener extends MembershipAdapter implements MessageListener<ReferenceKey> {
150 
151         private final Cluster cluster;
152         private final WeakReference<CachedReference<?>> localReferenceRef;
153         private final String membershipListenerId;
154         private final ITopic<ReferenceKey> topic;
155         private final String topicListenerId;
156 
157         AsyncInvalidationListener(Cluster cluster, CachedReference<?> localReference, ITopic<ReferenceKey> topic)
158         {
159             this.cluster = cluster;
160             this.localReferenceRef = new WeakReference<>(localReference);
161             this.topic = topic;
162             this.topicListenerId = topic.addMessageListener(this);
163             this.membershipListenerId = cluster.addMembershipListener(this);
164         }
165 
166         @Override
167         public void memberAdded(MembershipEvent membershipEvent)
168         {
169             CachedReference<?> localReference = localReferenceRef.get();
170             if (localReference == null)
171             {
172                 // cache has been GC'd,
173                 destroy();
174                 return;
175             }
176             // Members that have left the cluster and re-joined may be an inconsistent state due to the
177             // potential for "split brain".  So err on the side of safety and flush the local cache.
178             localReference.reset();
179         }
180 
181         @Override
182         public void onMessage(Message<ReferenceKey> message) {
183             CachedReference<?> localReference = localReferenceRef.get();
184             if (localReference == null)
185             {
186                 // cache has been gc-d
187                 destroy();
188                 return;
189             }
190             if (!message.getPublishingMember().localMember())
191             {
192                 localReference.reset();
193             }
194         }
195 
196         void destroy()
197         {
198             cluster.removeMembershipListener(membershipListenerId);
199             topic.removeMessageListener(topicListenerId);
200         }
201 
202         void publish(ReferenceKey message)
203         {
204             topic.publish(message);
205         }
206     }
207 }