1 package com.atlassian.cache.hazelcast;
2
3 import java.util.Collection;
4 import java.util.concurrent.TimeUnit;
5
6 import javax.annotation.Nonnull;
7
8 import com.atlassian.cache.Cache;
9 import com.atlassian.cache.CacheEntryEvent;
10 import com.atlassian.cache.CacheEntryListener;
11 import com.atlassian.cache.CacheException;
12 import com.atlassian.cache.CacheLoader;
13 import com.atlassian.cache.CacheSettings;
14 import com.atlassian.cache.ManagedCache;
15 import com.atlassian.cache.Supplier;
16 import com.atlassian.cache.impl.CacheEntryListenerSupport;
17 import com.atlassian.cache.impl.CacheLoaderSupplier;
18 import com.atlassian.cache.impl.DefaultCacheEntryEvent;
19 import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
20 import com.atlassian.hazelcast.serialization.OsgiSafe;
21
22 import com.google.common.base.Objects;
23 import com.google.common.base.Throwables;
24 import com.hazelcast.config.MapConfig;
25 import com.hazelcast.core.EntryEvent;
26 import com.hazelcast.core.EntryListener;
27 import com.hazelcast.core.IMap;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import static com.atlassian.cache.hazelcast.OsgiSafeUtils.unwrap;
33 import static com.atlassian.cache.hazelcast.OsgiSafeUtils.wrap;
34 import static com.google.common.base.Preconditions.checkNotNull;
35
36
37
38
39
40
41 public class HazelcastCache<K, V> extends ManagedCacheSupport implements Cache<K, V>
42 {
43 private static final Logger log = LoggerFactory.getLogger(HazelcastCache.class);
44
45 private final CacheLoader<K, V> cacheLoader;
46 private final IMap<K, OsgiSafe<V>> map;
47 private final CacheEntryListenerSupport<K, OsgiSafe<V>> listenerSupport = new ValueCacheEntryListenerSupport<K, OsgiSafe<V>>()
48 {
49 @Override
50 protected void initValue(final CacheEntryListenerSupport<K, OsgiSafe<V>> actualListenerSupport)
51 {
52 map.addEntryListener(new HazelcastCacheEntryListener<K, OsgiSafe<V>>(actualListenerSupport), true);
53 }
54
55 @Override
56 protected void initValueless(final CacheEntryListenerSupport<K, OsgiSafe<V>> actualListenerSupport)
57 {
58 map.addEntryListener(new HazelcastCacheEntryListener<K, OsgiSafe<V>>(actualListenerSupport), false);
59 }
60 };
61
62 public HazelcastCache(String name, IMap<K, OsgiSafe<V>> map, MapConfig config, CacheLoader<K, V> cacheLoader,
63 CacheSettings settings)
64 {
65 super(name, config, settings);
66
67 this.map = map;
68 this.cacheLoader = cacheLoader;
69 }
70
71 @Override
72 public void clear()
73 {
74 if (isFlushable())
75 {
76 map.clear();
77 }
78 else
79 {
80 log.debug("Not clearing cache {} because it's configured as not flushable.", getName());
81 }
82 }
83
84 @Override
85 public boolean containsKey(@Nonnull K k)
86 {
87 return map.containsKey(k);
88 }
89
90 @SuppressWarnings ("unchecked")
91 @Override
92 public V get(@Nonnull final K key)
93 {
94 return getOrLoad(key, cacheLoader == null ? null : new CacheLoaderSupplier<K, V>(key, cacheLoader));
95 }
96
97 @Nonnull
98 @Override
99 public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
100 {
101 return getOrLoad(key, valueSupplier);
102 }
103
104 @Nonnull
105 @Override
106 public Collection<K> getKeys()
107 {
108 return map.keySet();
109 }
110
111 @Override
112 public void put(@Nonnull K key, @Nonnull V value)
113 {
114 map.put(checkNotNull(key, "key"), wrap(checkNotNull(value, "value")));
115 }
116
117 @Override
118 public V putIfAbsent(@Nonnull K key, @Nonnull V value)
119 {
120 return unwrap(map.putIfAbsent(checkNotNull(key, "key"), wrap(checkNotNull(value, "value"))));
121 }
122
123 @Override
124 public void remove(@Nonnull K key)
125 {
126 map.remove(checkNotNull(key, "key"));
127 }
128
129 @Override
130 public boolean remove(@Nonnull K key, @Nonnull V value)
131 {
132 return map.remove(checkNotNull(key, "key"), wrap(checkNotNull(value, "value")));
133 }
134
135 @Override
136 public void removeAll()
137 {
138 map.clear();
139 }
140
141 @Override
142 public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
143 {
144 return map.replace(checkNotNull(key, "key"),
145 wrap(checkNotNull(oldValue, "oldValue")), wrap(checkNotNull(newValue, "newValue")));
146 }
147
148 @Override
149 public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
150 {
151 listenerSupport.add(new OsgiSafeCacheEntryListener<K, V>(listener), includeValues);
152 }
153
154 @Override
155 public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
156 {
157 listenerSupport.remove(new OsgiSafeCacheEntryListener<K, V>(listener));
158 }
159
160 private static class OsgiSafeCacheEntryEvent<K, V> extends DefaultCacheEntryEvent<K, V>
161 {
162 public OsgiSafeCacheEntryEvent(CacheEntryEvent<K, OsgiSafe<V>> event)
163 {
164 super(event.getKey(), unwrap(event.getValue()), unwrap(event.getOldValue()));
165 }
166 }
167
168 private static class OsgiSafeCacheEntryListener<K, V> implements CacheEntryListener<K, OsgiSafe<V>>
169 {
170 private final CacheEntryListener<K, V> delegate;
171
172 private OsgiSafeCacheEntryListener(CacheEntryListener<K, V> listener)
173 {
174 this.delegate = checkNotNull(listener, "listener");
175 }
176
177 @Override
178 public void onAdd(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
179 {
180 delegate.onAdd(new OsgiSafeCacheEntryEvent<K, V>(event));
181 }
182
183 @Override
184 public void onEvict(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
185 {
186 delegate.onEvict(new OsgiSafeCacheEntryEvent<K, V>(event));
187 }
188
189 @Override
190 public void onRemove(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
191 {
192 delegate.onRemove(new OsgiSafeCacheEntryEvent<K, V>(event));
193 }
194
195 @Override
196 public void onUpdate(@Nonnull CacheEntryEvent<K, OsgiSafe<V>> event)
197 {
198 delegate.onUpdate(new OsgiSafeCacheEntryEvent<K, V>(event));
199 }
200
201 @Override
202 public boolean equals(Object o)
203 {
204 if (this == o)
205 {
206 return true;
207 }
208 if (o == null || getClass() != o.getClass())
209 {
210 return false;
211 }
212
213 OsgiSafeCacheEntryListener that = (OsgiSafeCacheEntryListener) o;
214 return delegate.equals(that.delegate);
215 }
216
217 @Override
218 public int hashCode()
219 {
220 return delegate.hashCode();
221 }
222 }
223
224
225 private V getOrLoad(final K key, final Supplier<? extends V> valueSupplier)
226 {
227 try
228 {
229 OsgiSafe<V> value = map.get(checkNotNull(key, "key"));
230 if (value != null)
231 {
232 return value.getValue();
233 }
234 else if (valueSupplier == null)
235 {
236 return null;
237 }
238
239
240
241
242 map.lock(key, 5, TimeUnit.MINUTES);
243 try
244 {
245 V newValue = valueSupplier.get();
246
247 if (newValue == null)
248 {
249 throw new CacheException("The provided cacheLoader returned null. Null values are not supported.");
250 }
251 value = wrap(newValue);
252 OsgiSafe<V> current = map.putIfAbsent(key, value);
253
254 return unwrap(Objects.firstNonNull(current, value));
255 }
256 finally
257 {
258 map.unlock(key);
259 }
260 }
261 catch (RuntimeException e)
262 {
263 Throwables.propagateIfInstanceOf(e, CacheException.class);
264 throw new CacheException("Problem retrieving a value from cache " + getName(), e);
265 }
266 }
267
268 private static class HazelcastCacheEntryListener<K, V> implements EntryListener<K, V>
269 {
270 private final CacheEntryListenerSupport<K, V> listenerSupport;
271
272 private HazelcastCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
273 {
274 this.listenerSupport = checkNotNull(listenerSupport, "listenerSupport");
275 }
276
277 @Override
278 public void entryAdded(EntryEvent<K, V> event)
279 {
280 listenerSupport.notifyAdd(event.getKey(), event.getValue());
281 }
282
283 @Override
284 public void entryRemoved(EntryEvent<K, V> event)
285 {
286 listenerSupport.notifyRemove(event.getKey(), event.getOldValue());
287 }
288
289 @Override
290 public void entryUpdated(EntryEvent<K, V> event)
291 {
292 listenerSupport.notifyUpdate(event.getKey(), event.getValue(), event.getOldValue());
293 }
294
295 @Override
296 public void entryEvicted(EntryEvent<K, V> event)
297 {
298 listenerSupport.notifyEvict(event.getKey(), event.getOldValue());
299 }
300 }
301 }