View Javadoc

1   package com.atlassian.cache.hazelcast;
2   
3   import java.util.Collection;
4   import java.util.concurrent.TimeUnit;
5   
6   import javax.annotation.Nonnull;
7   
8   import com.atlassian.cache.Cache;
9   import com.atlassian.cache.CacheEntryEvent;
10  import com.atlassian.cache.CacheEntryListener;
11  import com.atlassian.cache.CacheException;
12  import com.atlassian.cache.CacheLoader;
13  import com.atlassian.cache.CacheSettings;
14  import com.atlassian.cache.ManagedCache;
15  import com.atlassian.cache.Supplier;
16  import com.atlassian.cache.impl.CacheEntryListenerSupport;
17  import com.atlassian.cache.impl.CacheLoaderSupplier;
18  import com.atlassian.cache.impl.DefaultCacheEntryEvent;
19  import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
20  import com.atlassian.hazelcast.serialization.OsgiSafe;
21  
22  import com.google.common.base.Objects;
23  import com.google.common.base.Throwables;
24  import com.hazelcast.config.MapConfig;
25  import com.hazelcast.core.EntryEvent;
26  import com.hazelcast.core.EntryListener;
27  import com.hazelcast.core.IMap;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import static com.atlassian.cache.hazelcast.OsgiSafeUtils.unwrap;
33  import static com.atlassian.cache.hazelcast.OsgiSafeUtils.wrap;
34  import static com.google.common.base.Preconditions.checkNotNull;
35  
36  /**
37   * Hazelcast implementation of the {@link Cache} and {@link ManagedCache} interfaces
38   *
39   * @since 2.4.0
40   */
41  public class HazelcastCache<K, V> extends ManagedCacheSupport implements Cache<K, V>
42  {
43      private static final Logger log = LoggerFactory.getLogger(HazelcastCache.class);
44  
45      private final CacheLoader<K, V> cacheLoader;
46      private final IMap<K, OsgiSafe<V>> map;
47      private final CacheEntryListenerSupport<K, OsgiSafe<V>> listenerSupport = new ValueCacheEntryListenerSupport<K, OsgiSafe<V>>()
48      {
49          @Override
50          protected void initValue(final CacheEntryListenerSupport<K, OsgiSafe<V>> actualListenerSupport)
51          {
52              map.addEntryListener(new HazelcastCacheEntryListener<K, OsgiSafe<V>>(actualListenerSupport), true);
53          }
54  
55          @Override
56          protected void initValueless(final CacheEntryListenerSupport<K, OsgiSafe<V>> actualListenerSupport)
57          {
58              map.addEntryListener(new HazelcastCacheEntryListener<K, OsgiSafe<V>>(actualListenerSupport), false);
59          }
60      };
61  
62      public HazelcastCache(String name, IMap<K, OsgiSafe<V>> map, MapConfig config, CacheLoader<K, V> cacheLoader,
63              CacheSettings settings)
64      {
65          super(name, config, settings);
66  
67          this.map = map;
68          this.cacheLoader = cacheLoader;
69      }
70  
71      @Override
72      public void clear()
73      {
74          if (isFlushable())
75          {
76              map.clear();
77          }
78          else
79          {
80              log.debug("Not clearing cache {} because it's configured as not flushable.", getName());
81          }
82      }
83  
84      @Override
85      public boolean containsKey(@Nonnull K k)
86      {
87          return map.containsKey(k);
88      }
89  
90      @SuppressWarnings ("unchecked")
91      @Override
92      public V get(@Nonnull final K key)
93      {
94          return getOrLoad(key, cacheLoader == null ? null : new CacheLoaderSupplier<K, V>(key, cacheLoader));
95      }
96  
97      @Nonnull
98      @Override
99      public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
100     {
101         return getOrLoad(key, valueSupplier);
102     }
103 
104     @Nonnull
105     @Override
106     public Collection<K> getKeys()
107     {
108         return map.keySet();
109     }
110 
111     @Override
112     public void put(@Nonnull K key, @Nonnull V value)
113     {
114         map.put(checkNotNull(key, "key"), wrap(checkNotNull(value, "value")));
115     }
116 
117     @Override
118     public V putIfAbsent(@Nonnull K key, @Nonnull V value)
119     {
120         return unwrap(map.putIfAbsent(checkNotNull(key, "key"), wrap(checkNotNull(value, "value"))));
121     }
122 
123     @Override
124     public void remove(@Nonnull K key)
125     {
126         map.remove(checkNotNull(key, "key"));
127     }
128 
129     @Override
130     public boolean remove(@Nonnull K key, @Nonnull V value)
131     {
132         return map.remove(checkNotNull(key, "key"), wrap(checkNotNull(value, "value")));
133     }
134 
135     @Override
136     public void removeAll()
137     {
138         map.clear();
139     }
140 
141     @Override
142     public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
143     {
144         return map.replace(checkNotNull(key, "key"),
145                 wrap(checkNotNull(oldValue, "oldValue")), wrap(checkNotNull(newValue, "newValue")));
146     }
147 
148     @Override
149     public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
150     {
151         listenerSupport.add(new OsgiSafeCacheEntryListener<K, V>(listener), includeValues);
152     }
153 
154     @Override
155     public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
156     {
157         listenerSupport.remove(new OsgiSafeCacheEntryListener<K, V>(listener));
158     }
159 
160     private static class OsgiSafeCacheEntryEvent<K, V> extends DefaultCacheEntryEvent<K, V>
161     {
162         public OsgiSafeCacheEntryEvent(CacheEntryEvent<K, OsgiSafe<V>> event)
163         {
164             super(event.getKey(), unwrap(event.getValue()), unwrap(event.getOldValue()));
165         }
166     }
167 
168     private static class OsgiSafeCacheEntryListener<K, V> implements CacheEntryListener<K, OsgiSafe<V>>
169     {
170         private final CacheEntryListener<K, V> delegate;
171 
172         private OsgiSafeCacheEntryListener(CacheEntryListener<K, V> listener)
173         {
174             this.delegate = checkNotNull(listener, "listener");
175         }
176 
177         @Override
178         public void onAdd(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
179         {
180             delegate.onAdd(new OsgiSafeCacheEntryEvent<K, V>(event));
181         }
182 
183         @Override
184         public void onEvict(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
185         {
186             delegate.onEvict(new OsgiSafeCacheEntryEvent<K, V>(event));
187         }
188 
189         @Override
190         public void onRemove(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
191         {
192             delegate.onRemove(new OsgiSafeCacheEntryEvent<K, V>(event));
193         }
194 
195         @Override
196         public void onUpdate(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
197         {
198             delegate.onUpdate(new OsgiSafeCacheEntryEvent<K, V>(event));
199         }
200 
201         @Override
202         public boolean equals(Object o)
203         {
204             if (this == o)
205             {
206                 return true;
207             }
208             if (o == null || getClass() != o.getClass())
209             {
210                 return false;
211             }
212 
213             OsgiSafeCacheEntryListener that = (OsgiSafeCacheEntryListener) o;
214             return delegate.equals(that.delegate);
215         }
216 
217         @Override
218         public int hashCode()
219         {
220             return delegate.hashCode();
221         }
222     }
223 
224 
225     private V getOrLoad(final K key, final Supplier<? extends V> valueSupplier)
226     {
227         try
228         {
229             OsgiSafe<V> value = map.get(checkNotNull(key, "key"));
230             if (value != null)
231             {
232                 return value.getValue();
233             }
234             else if (valueSupplier == null)
235             {
236                 return null;
237             }
238 
239             // note: we cannot use an EntryProcessor here because it would need to be passed the cacheLoader, which is
240             // not Serializable. Take out a lock on the entry to ensure that we don't store a stale value in the cache
241             // because the cache is invalidated between cache value calculation and adding it to the map.
242             map.lock(key, 5, TimeUnit.MINUTES);
243             try
244             {
245                 V newValue = valueSupplier.get();
246                 //noinspection ConstantConditions
247                 if (newValue == null)
248                 {
249                     throw new CacheException("The provided cacheLoader returned null. Null values are not supported.");
250                 }
251                 value = wrap(newValue);
252                 OsgiSafe<V> current = map.putIfAbsent(key, value);
253 
254                 return unwrap(Objects.firstNonNull(current, value));
255             }
256             finally
257             {
258                 map.unlock(key);
259             }
260         }
261         catch (RuntimeException e)
262         {
263             Throwables.propagateIfInstanceOf(e, CacheException.class);
264             throw new CacheException("Problem retrieving a value from cache " + getName(), e);
265         }
266     }
267 
268     private static class HazelcastCacheEntryListener<K, V> implements EntryListener<K, V>
269     {
270         private final CacheEntryListenerSupport<K, V> listenerSupport;
271 
272         private HazelcastCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
273         {
274             this.listenerSupport = checkNotNull(listenerSupport, "listenerSupport");
275         }
276 
277         @Override
278         public void entryAdded(EntryEvent<K, V> event)
279         {
280             listenerSupport.notifyAdd(event.getKey(), event.getValue());
281         }
282 
283         @Override
284         public void entryRemoved(EntryEvent<K, V> event)
285         {
286             listenerSupport.notifyRemove(event.getKey(), event.getOldValue());
287         }
288 
289         @Override
290         public void entryUpdated(EntryEvent<K, V> event)
291         {
292             listenerSupport.notifyUpdate(event.getKey(), event.getValue(), event.getOldValue());
293         }
294 
295         @Override
296         public void entryEvicted(EntryEvent<K, V> event)
297         {
298             listenerSupport.notifyEvict(event.getKey(), event.getOldValue());
299         }
300     }
301 }