View Javadoc
1   package com.atlassian.cache.hazelcast;
2   
3   import javax.annotation.Nonnull;
4   
5   import com.atlassian.cache.CacheException;
6   import com.atlassian.cache.CacheFactory;
7   import com.atlassian.cache.CacheSettings;
8   import com.atlassian.cache.CachedReference;
9   import com.atlassian.cache.CachedReferenceEvent;
10  import com.atlassian.cache.CachedReferenceListener;
11  import com.atlassian.cache.ManagedCache;
12  import com.atlassian.cache.Supplier;
13  import com.atlassian.cache.impl.CachedReferenceListenerSupport;
14  import com.atlassian.cache.impl.ReferenceKey;
15  import com.atlassian.cache.impl.ValueCachedReferenceListenerSupport;
16  
17  import com.hazelcast.core.EntryAdapter;
18  import com.hazelcast.core.EntryEvent;
19  import com.hazelcast.core.IMap;
20  
21  /**
22   * Implementation of {@link ManagedCache} and {@link com.atlassian.cache.CachedReference} that can be used when the
23   * cached value does not implement {@code Serializable} but reference invalidation must work cluster-wide.
24   *
25   * @since 2.4.0
26   */
27  public class HazelcastHybridCachedReference<V> extends ManagedHybridCacheSupport implements CachedReference<V>
28  {
29      private final CachedReference<Versioned<V>> localReference;
30      private final IMap<ReferenceKey, Long> versionMap;
31  
32      private final CachedReferenceListenerSupport<V> listenerSupport = new ValueCachedReferenceListenerSupport<V>()
33      {
34          @Override
35          protected void init(CachedReferenceListenerSupport<V> actualListenerSupport)
36          {
37              versionMap.addEntryListener(new HazelcastHybridReferenceEntryListener(), false);
38          }
39  
40          @Override
41          protected void initValue(final CachedReferenceListenerSupport<V> actualListenerSupport)
42          {
43              localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), true);
44          }
45  
46          @Override
47          protected void initValueless(final CachedReferenceListenerSupport<V> actualListenerSupport)
48          {
49              localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), false);
50          }
51      };
52  
53      public HazelcastHybridCachedReference(String name, CacheFactory localFactory, final IMap<ReferenceKey, Long> versionMap,
54              final Supplier<V> supplier, HazelcastCacheManager cacheManager)
55      {
56          super(name, cacheManager);
57  
58          Supplier<Versioned<V>> localSupplier = new Supplier<Versioned<V>>()
59          {
60              @Override
61              public Versioned<V> get()
62              {
63                  long version = getVersion();
64                  V value = supplier.get();
65                  if (value == null)
66                  {
67                      throw new CacheException("The Supplier for cached reference '" + getName() + "'returned null. Null values are not supported.");
68                  }
69                  return new Versioned<V>(value, version);
70              }
71          };
72          this.versionMap = versionMap;
73          this.localReference = localFactory.getCachedReference(name, localSupplier, getCacheSettings());
74      }
75  
76      @Nonnull
77      @Override
78      public V get()
79      {
80          Versioned<V> value = localReference.get();
81          Long version = versionMap.get(ReferenceKey.KEY);
82          if (version == null || value.getVersion() != version)
83          {
84              // version mismatch
85              localReference.reset();
86              value = localReference.get(); // will trigger a new call to supplier to re-generate the value
87          }
88          return value.getValue();
89      }
90  
91      @Override
92      public boolean isFlushable()
93      {
94          return getCacheSettings().getFlushable(true);
95      }
96  
97      @Override
98      public boolean isReplicateAsynchronously()
99      {
100         return false;
101     }
102 
103     @Override
104     public void reset()
105     {
106         versionMap.executeOnKey(ReferenceKey.KEY, IncrementVersionEntryProcessor.getInstance());
107         localReference.reset();
108     }
109 
110     @Override
111     protected ManagedCache getLocalCache()
112     {
113         return (ManagedCache) localReference;
114     }
115 
116     private CacheSettings getCacheSettings()
117     {
118         return cacheManager.getCacheSettings(getHazelcastMapName());
119     }
120 
121     private String getHazelcastMapName()
122     {
123         return versionMap.getName();
124     }
125 
126     @Override
127     public void clear()
128     {
129         if (isFlushable())
130         {
131             reset();
132         }
133     }
134 
135     @Override
136     public boolean updateMaxEntries(int newValue)
137     {
138         return false;
139     }
140 
141     @Override
142     public void addListener(@Nonnull CachedReferenceListener<V> listener, boolean includeValues)
143     {
144         listenerSupport.add(listener, includeValues);
145     }
146 
147     @Override
148     public void removeListener(@Nonnull CachedReferenceListener<V> listener)
149     {
150         listenerSupport.remove(listener);
151     }
152 
153     private long getVersion()
154     {
155         // try a standard get to give the near-cache a chance
156         Long version = versionMap.get(ReferenceKey.KEY);
157         if (version == null)
158         {
159             version = (Long) versionMap.executeOnKey(ReferenceKey.KEY, GetOrInitVersionEntryProcessor.getInstance());
160         }
161         return version;
162     }
163 
164     private class HazelcastHybridReferenceEntryListener extends EntryAdapter<ReferenceKey, Long>
165     {
166         @Override
167         public void entryRemoved(EntryEvent<ReferenceKey, Long> event)
168         {
169             // This should not happen because on all operations we just bump up the shared version.
170             localReference.reset();
171         }
172 
173         @Override
174         public void entryUpdated(EntryEvent<ReferenceKey, Long> event)
175         {
176             // The only mechanism we are employing here is version bump. But we don't support set/put
177             // so a version bump always means reset.
178             localReference.reset();
179         }
180 
181         @Override
182         public void entryEvicted(EntryEvent<ReferenceKey, Long> event)
183         {
184             // Eviction of the shared value should bring eviction on the local value. However we don't
185             // have access to this. Probably the best we can do is to trigger reset - probably will result in
186             // two events - evict and reset.
187             localReference.reset();
188         }
189     }
190 
191     private static class DelegatingCachedReferenceListener<V> implements CachedReferenceListener<Versioned<V>>
192     {
193         private final CachedReferenceListenerSupport<V> listenerSupport;
194 
195         private DelegatingCachedReferenceListener(final CachedReferenceListenerSupport<V> listenerSupport)
196         {
197             this.listenerSupport = listenerSupport;
198         }
199 
200         @Override
201         public void onEvict(@Nonnull CachedReferenceEvent<Versioned<V>> event)
202         {
203             listenerSupport.notifyEvict(get(event.getValue()));
204         }
205 
206         @Override
207         public void onSet(@Nonnull CachedReferenceEvent<Versioned<V>> event)
208         {
209             listenerSupport.notifySet(get(event.getValue()));
210         }
211 
212         @Override
213         public void onReset(@Nonnull CachedReferenceEvent<Versioned<V>> event)
214         {
215             listenerSupport.notifyReset(get(event.getValue()));
216         }
217 
218         private V get(Versioned<V> versioned)
219         {
220             return versioned != null ? versioned.getValue() : null;
221         }
222     }
223 }