View Javadoc

1   package com.atlassian.cache.hazelcast;
2   
3   import javax.annotation.Nonnull;
4   
5   import com.atlassian.cache.CacheException;
6   import com.atlassian.cache.CacheFactory;
7   import com.atlassian.cache.CacheSettings;
8   import com.atlassian.cache.CachedReference;
9   import com.atlassian.cache.CachedReferenceEvent;
10  import com.atlassian.cache.CachedReferenceListener;
11  import com.atlassian.cache.ManagedCache;
12  import com.atlassian.cache.Supplier;
13  import com.atlassian.cache.impl.CachedReferenceListenerSupport;
14  import com.atlassian.cache.impl.ReferenceKey;
15  import com.atlassian.cache.impl.ValueCachedReferenceListenerSupport;
16  
17  import com.hazelcast.core.EntryEvent;
18  import com.hazelcast.core.EntryListener;
19  import com.hazelcast.core.IMap;
20  
21  /**
22   * Implementation of {@link ManagedCache} and {@link com.atlassian.cache.CachedReference} that can be used when the
23   * cached value does not implement {@code Serializable} but reference invalidation must work cluster-wide.
24   *
25   * @since 2.4.0
26   */
27  public class HazelcastHybridCachedReference<V> extends ManagedHybridCacheSupport implements CachedReference<V>
28  {
29      private final CachedReference<Versioned<V>> localReference;
30      private final IMap<ReferenceKey, Long> versionMap;
31  
32      private final CachedReferenceListenerSupport<V> listenerSupport = new ValueCachedReferenceListenerSupport<V>()
33      {
34          @Override
35          protected void init(CachedReferenceListenerSupport<V> actualListenerSupport)
36          {
37              versionMap.addEntryListener(new HazelcastHybridReferenceEntryListener(), false);
38          }
39  
40          @Override
41          protected void initValue(final CachedReferenceListenerSupport<V> actualListenerSupport)
42          {
43              localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), true);
44          }
45  
46          @Override
47          protected void initValueless(final CachedReferenceListenerSupport<V> actualListenerSupport)
48          {
49              localReference.addListener(new DelegatingCachedReferenceListener<V>(actualListenerSupport), false);
50          }
51      };
52  
53      public HazelcastHybridCachedReference(String name, CacheFactory localFactory, final IMap<ReferenceKey, Long> versionMap,
54              final Supplier<V> supplier, CacheSettings settings)
55      {
56          super(name, settings);
57  
58          Supplier<Versioned<V>> localSupplier = new Supplier<Versioned<V>>()
59          {
60              @Override
61              public Versioned<V> get()
62              {
63                  V value = supplier.get();
64                  if (value == null)
65                  {
66                      throw new CacheException("The Supplier for cached reference '" + getName() + "'returned null. Null values are not supported.");
67                  }
68  
69                  Long version = (Long) versionMap.executeOnKey(ReferenceKey.KEY, GetOrInitVersionEntryProcessor.getInstance());
70                  return new Versioned<V>(value, version);
71              }
72          };
73          this.localReference = localFactory.getCachedReference(name, localSupplier, settings);
74          this.versionMap = versionMap;
75      }
76  
77      @Nonnull
78      @Override
79      public V get()
80      {
81          Versioned<V> value = localReference.get();
82          Long version = (Long) versionMap.executeOnKey(ReferenceKey.KEY, GetOrInitVersionEntryProcessor.getInstance());
83          if (value.getVersion() != version)
84          {
85              // version mismatch
86              localReference.reset();
87              value = localReference.get(); // will trigger a new call to supplier to re-generate the value
88          }
89          return value.getValue();
90      }
91  
92      @Override
93      public void reset()
94      {
95          versionMap.executeOnKey(ReferenceKey.KEY, IncrementVersionEntryProcessor.getUpdatingInstance());
96          localReference.reset();
97      }
98  
99      @Override
100     protected ManagedCache getManagedCache()
101     {
102         return (ManagedCache) localReference;
103     }
104 
105     @Override
106     public void clear()
107     {
108         if (isFlushable())
109         {
110             reset();
111         }
112     }
113 
114     @Override
115     public boolean updateMaxEntries(int newValue)
116     {
117         return false;
118     }
119 
120     @Override
121     public void addListener(@Nonnull CachedReferenceListener<V> listener, boolean includeValues)
122     {
123         listenerSupport.add(listener, includeValues);
124     }
125 
126     @Override
127     public void removeListener(@Nonnull CachedReferenceListener<V> listener)
128     {
129         listenerSupport.remove(listener);
130     }
131 
132     private class HazelcastHybridReferenceEntryListener implements EntryListener<ReferenceKey, Long>
133     {
134         @Override
135         public void entryAdded(EntryEvent<ReferenceKey, Long> event)
136         {
137             // Do nothing - somewhere in the cluster this value was retrieved, but nothing happened locally
138         }
139 
140         @Override
141         public void entryRemoved(EntryEvent<ReferenceKey, Long> event)
142         {
143             // This should not happen because on all operations we just bump up the shared version.
144             localReference.reset();
145         }
146 
147         @Override
148         public void entryUpdated(EntryEvent<ReferenceKey, Long> event)
149         {
150             // The only mechanism we are employing here is version bump. But we don't support set/put
151             // so a version bump always means reset.
152             localReference.reset();
153         }
154 
155         @Override
156         public void entryEvicted(EntryEvent<ReferenceKey, Long> event)
157         {
158             // Eviction of the shared value should bring eviction on the local value. However we don't
159             // have access to this. Probably the best we can do is to trigger reset - probably will result in
160             // two events - evict and reset.
161             localReference.reset();
162         }
163     }
164 
165     private static class DelegatingCachedReferenceListener<V> implements CachedReferenceListener<Versioned<V>>
166     {
167         private final CachedReferenceListenerSupport<V> listenerSupport;
168 
169         private DelegatingCachedReferenceListener(final CachedReferenceListenerSupport<V> listenerSupport)
170         {
171             this.listenerSupport = listenerSupport;
172         }
173 
174         @Override
175         public void onEvict(@Nonnull CachedReferenceEvent<Versioned<V>> event)
176         {
177             listenerSupport.notifyEvict(get(event.getValue()));
178         }
179 
180         @Override
181         public void onSet(@Nonnull CachedReferenceEvent<Versioned<V>> event)
182         {
183             listenerSupport.notifySet(get(event.getValue()));
184         }
185 
186         @Override
187         public void onReset(@Nonnull CachedReferenceEvent<Versioned<V>> event)
188         {
189             listenerSupport.notifyReset(get(event.getValue()));
190         }
191 
192         private V get(Versioned<V> versioned)
193         {
194             return versioned != null ? versioned.getValue() : null;
195         }
196     }
197 }