1 package com.atlassian.cache.hazelcast;
2
3 import java.util.Collection;
4 import java.util.concurrent.TimeUnit;
5
6 import javax.annotation.Nonnull;
7
8 import com.atlassian.cache.Cache;
9 import com.atlassian.cache.CacheEntryEvent;
10 import com.atlassian.cache.CacheEntryListener;
11 import com.atlassian.cache.CacheException;
12 import com.atlassian.cache.CacheFactory;
13 import com.atlassian.cache.CacheLoader;
14 import com.atlassian.cache.CacheSettings;
15 import com.atlassian.cache.CacheSettingsBuilder;
16 import com.atlassian.cache.ManagedCache;
17 import com.atlassian.cache.Supplier;
18 import com.atlassian.cache.impl.CacheEntryListenerSupport;
19 import com.atlassian.cache.impl.CacheLoaderSupplier;
20 import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
21
22 import com.google.common.base.Throwables;
23 import com.hazelcast.core.IMap;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import static com.google.common.base.Objects.equal;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public class HazelcastHybridCache<K, V> extends ManagedHybridCacheSupport implements Cache<K, V>
46 {
47
48 private static final Logger log = LoggerFactory.getLogger(HazelcastHybridCache.class);
49
50 private final Cache<K, Versioned<V>> localCache;
51 private final boolean selfLoading;
52 private final IMap<K, Long> versionMap;
53
54 private final CacheEntryListenerSupport<K, V> listenerSupport = new ValueCacheEntryListenerSupport<K, V>()
55 {
56 @Override
57 protected void initValue(final CacheEntryListenerSupport<K, V> actualListenerSupport)
58 {
59 localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), true);
60 }
61
62 @Override
63 protected void initValueless(final CacheEntryListenerSupport<K, V> actualListenerSupport)
64 {
65 localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), false);
66 }
67 };
68
69 public HazelcastHybridCache(String name, CacheFactory localCacheFactory, IMap<K, Long> versionMap,
70 final CacheLoader<K, V> cacheLoader, HazelcastCacheManager cacheManager)
71 {
72 super(name, cacheManager);
73 this.selfLoading = cacheLoader != null;
74
75 CacheLoader<K, Versioned<V>> versionedCacheLoader = selfLoading ? new CacheLoader<K, Versioned<V>>()
76 {
77 @Nonnull
78 @Override
79 public Versioned<V> load(@Nonnull K key)
80 {
81 return loadAndVersion(key, new CacheLoaderSupplier<K, V>(key, cacheLoader));
82 }
83 } : null;
84
85 this.versionMap = versionMap;
86 this.localCache = localCacheFactory.getCache(name, versionedCacheLoader, getCacheSettings());
87 }
88
89 @Override
90 public void clear()
91 {
92 removeAll();
93 }
94
95 @Override
96 public boolean containsKey(@Nonnull final K k)
97 {
98 return localCache.containsKey(k);
99 }
100
101 @Override
102 public V get(@Nonnull K key)
103 {
104 return getInternal(key).getValue();
105 }
106
107 @Nonnull
108 @Override
109 public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
110 {
111 return getInternal(key, new Supplier<Versioned<V>>()
112 {
113 @Override
114 public Versioned<V> get()
115 {
116 return loadAndVersion(key, valueSupplier);
117 }
118 }).getValue();
119 }
120
121 @Nonnull
122 @Override
123 public Collection<K> getKeys()
124 {
125 return localCache.getKeys();
126 }
127
128 @Nonnull
129 @Override
130 public String getName()
131 {
132 return localCache.getName();
133 }
134
135 @Override
136 public boolean isReplicateAsynchronously()
137 {
138 return false;
139 }
140
141 @Override
142 public void put(@Nonnull K key, @Nonnull V value)
143 {
144 Long version = incrementVersion(key);
145 localCache.put(key, new Versioned<V>(value, version));
146 }
147
148
149
150
151
152
153
154
155
156 @Override
157 public V putIfAbsent(@Nonnull K key, @Nonnull V value)
158 {
159 Long nextVersion = getNextVersion(key);
160 Versioned<V> versioned = new Versioned<V>(value, nextVersion);
161 Versioned<V> oldValue = localCache.putIfAbsent(key, versioned);
162 if (oldValue == null)
163 {
164
165
166
167
168 incrementVersion(key);
169
170 return null;
171 }
172 return oldValue.getValue();
173 }
174
175 @Override
176 public void remove(@Nonnull K key)
177 {
178
179 incrementVersion(key);
180 localCache.remove(key);
181 }
182
183 @Override
184 public boolean remove(@Nonnull K key, @Nonnull V value)
185 {
186 Versioned<V> currentValue = null;
187 try
188 {
189 currentValue = getInternal(key);
190 }
191 catch (CacheException e)
192 {
193
194
195 log.debug("Swallowing exception thrown during call to remove, when looking up cache key: " + key, e);
196 }
197 if (currentValue != null && equal(value, currentValue.getValue()))
198 {
199 if (localCache.remove(key, currentValue))
200 {
201
202 incrementVersion(key);
203 return true;
204 }
205 }
206 return false;
207 }
208
209 @Override
210 public void removeAll()
211 {
212
213 versionMap.executeOnEntries(IncrementVersionEntryProcessor.getInstance());
214 localCache.removeAll();
215 }
216
217 @Override
218 public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
219 {
220 Versioned<V> currentValue = getInternal(key);
221 if (equal(oldValue, currentValue.getValue()))
222 {
223 Long nextVersion = getNextVersion(key);
224 if (localCache.replace(key, currentValue, new Versioned<V>(newValue, nextVersion)))
225 {
226
227
228
229
230 incrementVersion(key);
231 return true;
232 }
233 }
234
235 return false;
236 }
237
238 @Override
239 public boolean updateExpireAfterAccess(long expireAfter, @Nonnull TimeUnit timeUnit)
240 {
241 if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
242 {
243 return false;
244 }
245 CacheSettings settings = new CacheSettingsBuilder(getCacheSettings())
246 .expireAfterAccess(expireAfter * HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER, timeUnit)
247 .build();
248 cacheManager.updateCacheSettings(getHazelcastMapName(), settings);
249 return true;
250 }
251
252 @Override
253 public boolean updateExpireAfterWrite(long expireAfter, @Nonnull TimeUnit timeUnit)
254 {
255 if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
256 {
257 return false;
258 }
259 CacheSettings settings = new CacheSettingsBuilder(getCacheSettings())
260 .expireAfterAccess(expireAfter * HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER, timeUnit)
261 .build();
262 cacheManager.updateCacheSettings(getHazelcastMapName(), settings);
263 return true;
264 }
265
266 @Override
267 public boolean updateMaxEntries(int newValue)
268 {
269 if (!super.updateMaxEntries(newValue))
270 {
271 return false;
272 }
273 CacheSettings settings = new CacheSettingsBuilder(getCacheSettings())
274 .maxEntries(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * newValue)
275 .build();
276 cacheManager.updateCacheSettings(getHazelcastMapName(), settings);
277 return true;
278 }
279
280 @Override
281 public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
282 {
283 listenerSupport.add(listener, includeValues);
284 }
285
286 @Override
287 public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
288 {
289 listenerSupport.remove(listener);
290 }
291
292 @Override
293 protected ManagedCache getLocalCache()
294 {
295 return (ManagedCache) localCache;
296 }
297
298 private CacheSettings getCacheSettings()
299 {
300 return cacheManager.getCacheSettings(getHazelcastMapName());
301 }
302
303 private String getHazelcastMapName()
304 {
305 return versionMap.getName();
306 }
307
308 @Override
309 public boolean isFlushable()
310 {
311 return getCacheSettings().getFlushable(true);
312 }
313
314 private Versioned<V> loadAndVersion(final K key, Supplier<? extends V> supplier)
315 {
316 try
317 {
318
319
320 long version = getVersion(key);
321
322 V value = supplier.get();
323
324 if (value == null)
325 {
326 throw new CacheException("The generated value for cache '" + getName() + "' was null for key '" +
327 key + "'. Null values are not supported.");
328 }
329
330 log.debug("Generated value '{}' for key '{}' in cache with name '{}'", value, key, localCache.getName());
331 return new Versioned<V>(value, version);
332 }
333 catch (RuntimeException e)
334 {
335 Throwables.propagateIfInstanceOf(e, CacheException.class);
336 throw new CacheException("Error generating a value for key '" + key + "' in cache '" + localCache.getName() + "'", e);
337 }
338 }
339
340 @Nonnull
341 private Versioned<V> getInternal(K key)
342 {
343 Versioned<V> versioned = localCache.get(key);
344 if (versioned != null)
345 {
346 Long version = versionMap.get(key);
347 if (version != null && version == versioned.getVersion())
348 {
349
350 return versioned;
351 }
352
353
354 localCache.remove(key);
355
356 if (selfLoading)
357 {
358
359
360 return localCache.get(key);
361 }
362 }
363 return Versioned.empty();
364 }
365
366 @Nonnull
367 private Versioned<V> getInternal(K key, Supplier<Versioned<V>> valueSupplier)
368 {
369 Versioned<V> versioned = localCache.get(key, valueSupplier);
370 Long version = versionMap.get(key);
371 if (version != null && version == versioned.getVersion())
372 {
373
374 return versioned;
375 }
376
377
378 localCache.remove(key);
379
380
381 return localCache.get(key, valueSupplier);
382 }
383
384 private Long getNextVersion(K key)
385 {
386 Long version = versionMap.get(key);
387 return version == null ? 1L : version + 1L;
388 }
389
390 private Long getVersion(K key)
391 {
392
393 Long version = versionMap.get(key);
394 if (version == null)
395 {
396 version = (Long) versionMap.executeOnKey(key, GetOrInitVersionEntryProcessor.getInstance());
397 }
398 return version;
399 }
400
401 private Long incrementVersion(K key)
402 {
403 return (Long) versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getInstance());
404 }
405
406 private static class DelegatingCacheEntryListener<K, V> implements CacheEntryListener<K, Versioned<V>>
407 {
408 private final CacheEntryListenerSupport<K, V> listenerSupport;
409
410 private DelegatingCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
411 {
412 this.listenerSupport = listenerSupport;
413 }
414
415 @Override
416 public void onAdd(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
417 {
418 listenerSupport.notifyAdd(event.getKey(), get(event.getValue()));
419 }
420
421 @Override
422 public void onEvict(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
423 {
424 listenerSupport.notifyEvict(event.getKey(), get(event.getOldValue()));
425 }
426
427 @Override
428 public void onRemove(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
429 {
430 listenerSupport.notifyRemove(event.getKey(), get(event.getOldValue()));
431 }
432
433 @Override
434 public void onUpdate(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
435 {
436 listenerSupport.notifyUpdate(event.getKey(), get(event.getValue()), get(event.getOldValue()));
437 }
438
439 private V get(Versioned<V> versioned)
440 {
441 return versioned != null ? versioned.getValue() : null;
442 }
443 }
444 }