1 package com.atlassian.cache.hazelcast;
2
3 import java.util.Collection;
4 import java.util.concurrent.TimeUnit;
5
6 import javax.annotation.Nonnull;
7
8 import com.atlassian.cache.Cache;
9 import com.atlassian.cache.CacheEntryEvent;
10 import com.atlassian.cache.CacheEntryListener;
11 import com.atlassian.cache.CacheException;
12 import com.atlassian.cache.CacheFactory;
13 import com.atlassian.cache.CacheLoader;
14 import com.atlassian.cache.CacheSettings;
15 import com.atlassian.cache.ManagedCache;
16 import com.atlassian.cache.Supplier;
17 import com.atlassian.cache.impl.CacheEntryListenerSupport;
18 import com.atlassian.cache.impl.CacheLoaderSupplier;
19 import com.atlassian.cache.impl.ValueCacheEntryListenerSupport;
20
21 import com.google.common.base.Throwables;
22 import com.hazelcast.config.MapConfig;
23 import com.hazelcast.core.IMap;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import static com.google.common.base.Objects.equal;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public class HazelcastHybridCache<K, V> extends ManagedHybridCacheSupport implements Cache<K, V>
46 {
47
48 private static final Logger log = LoggerFactory.getLogger(HazelcastHybridCache.class);
49
50 private final Cache<K, Versioned<V>> localCache;
51 private final MapConfig config;
52 private final boolean selfLoading;
53 private final IMap<K, Long> versionMap;
54
55 private final CacheEntryListenerSupport<K, V> listenerSupport = new ValueCacheEntryListenerSupport<K, V>()
56 {
57 @Override
58 protected void initValue(final CacheEntryListenerSupport<K, V> actualListenerSupport)
59 {
60 localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), true);
61 }
62
63 @Override
64 protected void initValueless(final CacheEntryListenerSupport<K, V> actualListenerSupport)
65 {
66 localCache.addListener(new DelegatingCacheEntryListener<K, V>(actualListenerSupport), false);
67 }
68 };
69
70 public HazelcastHybridCache(String name, CacheFactory localCacheFactory, IMap<K, Long> versionMap, MapConfig config,
71 final CacheLoader<K, V> cacheLoader, CacheSettings settings)
72 {
73 super(name, settings);
74
75 this.config = config;
76 this.selfLoading = cacheLoader != null;
77
78 CacheLoader<K, Versioned<V>> versionedCacheLoader = selfLoading ? new CacheLoader<K, Versioned<V>>()
79 {
80 @Nonnull
81 @Override
82 public Versioned<V> load(@Nonnull K key)
83 {
84 return loadAndVersion(key, new CacheLoaderSupplier<K, V>(key, cacheLoader));
85 }
86 } : null;
87
88 this.localCache = localCacheFactory.getCache(name, versionedCacheLoader, settings);
89 this.versionMap = versionMap;
90 }
91
92 @Override
93 public void clear()
94 {
95 if (flushable)
96 {
97 removeAll();
98 }
99 }
100
101 @Override
102 public boolean containsKey(@Nonnull final K k)
103 {
104 return localCache.containsKey(k);
105 }
106
107 @Override
108 public V get(@Nonnull K key)
109 {
110 return getInternal(key).getValue();
111 }
112
113 @Nonnull
114 @Override
115 public V get(@Nonnull final K key, @Nonnull final Supplier<? extends V> valueSupplier)
116 {
117 return getInternal(key, new Supplier<Versioned<V>>()
118 {
119 @Override
120 public Versioned<V> get()
121 {
122 return loadAndVersion(key, valueSupplier);
123 }
124 }).getValue();
125 }
126
127 @Nonnull
128 @Override
129 public Collection<K> getKeys()
130 {
131 return localCache.getKeys();
132 }
133
134 @Nonnull
135 @Override
136 public String getName()
137 {
138 return localCache.getName();
139 }
140
141 @Override
142 public void put(@Nonnull K key, @Nonnull V value)
143 {
144 Long version = incrementVersion(key);
145 localCache.put(key, new Versioned<V>(value, version));
146 }
147
148
149
150
151
152
153
154
155
156 @Override
157 public V putIfAbsent(@Nonnull K key, @Nonnull V value)
158 {
159 Long nextVersion = getNextVersion(key);
160 Versioned<V> versioned = new Versioned<V>(value, nextVersion);
161 Versioned<V> oldValue = localCache.putIfAbsent(key, versioned);
162 if (oldValue == null)
163 {
164
165
166
167
168 incrementVersion(key);
169
170 return null;
171 }
172 return oldValue.getValue();
173 }
174
175 @Override
176 public void remove(@Nonnull K key)
177 {
178
179 versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getUpdatingInstance());
180 localCache.remove(key);
181 }
182
183 @Override
184 public boolean remove(@Nonnull K key, @Nonnull V value)
185 {
186 Versioned<V> currentValue = getInternal(key);
187 if (equal(value, currentValue.getValue()))
188 {
189 if (localCache.remove(key, currentValue))
190 {
191
192 versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getUpdatingInstance());
193 return true;
194 }
195 }
196 return false;
197 }
198
199 @Override
200 public void removeAll()
201 {
202 if (flushable)
203 {
204
205 versionMap.executeOnEntries(IncrementVersionEntryProcessor.getUpdatingInstance());
206 localCache.removeAll();
207 }
208 }
209
210 @Override
211 public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
212 {
213 Versioned<V> currentValue = getInternal(key);
214 if (equal(oldValue, currentValue.getValue()))
215 {
216 Long nextVersion = getNextVersion(key);
217 if (localCache.replace(key, currentValue, new Versioned<V>(newValue, nextVersion)))
218 {
219
220
221
222
223 incrementVersion(key);
224 return true;
225 }
226 }
227
228 return false;
229 }
230
231 @Override
232 public boolean updateExpireAfterAccess(long expireAfter, @Nonnull TimeUnit timeUnit)
233 {
234 if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
235 {
236 return false;
237 }
238 config.setMaxIdleSeconds(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * (int) timeUnit.toSeconds(expireAfter));
239 return true;
240 }
241
242 @Override
243 public boolean updateExpireAfterWrite(long expireAfter, @Nonnull TimeUnit timeUnit)
244 {
245 if (!super.updateExpireAfterAccess(expireAfter, timeUnit))
246 {
247 return false;
248 }
249 config.setMaxIdleSeconds(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * (int) timeUnit.toSeconds(expireAfter));
250 return true;
251 }
252
253 @Override
254 public boolean updateMaxEntries(int newValue)
255 {
256 if (!super.updateMaxEntries(newValue))
257 {
258 return false;
259 }
260 config.getMaxSizeConfig().setSize(HazelcastMapConfigConfigurator.HYBRID_MULTIPLIER * newValue);
261 return true;
262 }
263
264 @Override
265 public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
266 {
267 listenerSupport.add(listener, includeValues);
268 }
269
270 @Override
271 public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
272 {
273 listenerSupport.remove(listener);
274 }
275
276 @Override
277 protected ManagedCache getManagedCache()
278 {
279 return (ManagedCache) localCache;
280 }
281
282 private Versioned<V> loadAndVersion(final K key, Supplier<? extends V> supplier)
283 {
284 try
285 {
286 V value = supplier.get();
287
288 if (value == null)
289 {
290 throw new CacheException("The generated value for cache '" + getName() + "' was null for key '" +
291 key + "'. Null values are not supported.");
292 }
293
294
295 long version = HazelcastHybridCache.this.getOrRetrieveVersion(key);
296
297 log.debug("Generated value '{}' for key '{}' in cache with name '{}'", value, key, localCache.getName());
298 return new Versioned<V>(value, version);
299 }
300 catch (RuntimeException e)
301 {
302 Throwables.propagateIfInstanceOf(e, CacheException.class);
303 throw new CacheException("Error generating a value for key '" + key + "' in cache '" + localCache.getName() + "'", e);
304 }
305 }
306
307 @Nonnull
308 private Versioned<V> getInternal(K key)
309 {
310 Versioned<V> versioned = localCache.get(key);
311 if (versioned != null)
312 {
313 Long version = versionMap.get(key);
314 if (version != null && version == versioned.getVersion())
315 {
316
317 return versioned;
318 }
319
320
321 localCache.remove(key);
322
323 if (selfLoading)
324 {
325
326
327 return localCache.get(key);
328 }
329 }
330 return Versioned.empty();
331 }
332
333 @Nonnull
334 private Versioned<V> getInternal(K key, Supplier<Versioned<V>> valueSupplier)
335 {
336 Versioned<V> versioned = localCache.get(key, valueSupplier);
337 Long version = versionMap.get(key);
338 if (version != null && version == versioned.getVersion())
339 {
340
341 return versioned;
342 }
343
344
345 localCache.remove(key);
346
347
348 return localCache.get(key, valueSupplier);
349 }
350
351 private Long getNextVersion(K key)
352 {
353 return (Long) versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getInstance());
354 }
355
356 private Long getOrRetrieveVersion(K key)
357 {
358 return (Long) versionMap.executeOnKey(key, GetOrInitVersionEntryProcessor.getInstance());
359 }
360
361 private Long incrementVersion(K key)
362 {
363 return (Long) versionMap.executeOnKey(key, IncrementVersionEntryProcessor.getUpdatingInstance());
364 }
365
366 private static class DelegatingCacheEntryListener<K, V> implements CacheEntryListener<K, Versioned<V>>
367 {
368 private final CacheEntryListenerSupport<K, V> listenerSupport;
369
370 private DelegatingCacheEntryListener(final CacheEntryListenerSupport<K, V> listenerSupport)
371 {
372 this.listenerSupport = listenerSupport;
373 }
374
375 @Override
376 public void onAdd(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
377 {
378 listenerSupport.notifyAdd(event.getKey(), get(event.getValue()));
379 }
380
381 @Override
382 public void onEvict(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
383 {
384 listenerSupport.notifyEvict(event.getKey(), get(event.getOldValue()));
385 }
386
387 @Override
388 public void onRemove(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
389 {
390 listenerSupport.notifyRemove(event.getKey(), get(event.getOldValue()));
391 }
392
393 @Override
394 public void onUpdate(@Nonnull CacheEntryEvent<K, Versioned<V>> event)
395 {
396 listenerSupport.notifyUpdate(event.getKey(), get(event.getValue()), get(event.getOldValue()));
397 }
398
399 private V get(Versioned<V> versioned)
400 {
401 return versioned != null ? versioned.getValue() : null;
402 }
403 }
404 }