1 package com.atlassian.cache.hazelcast;
2
3 import java.io.Serializable;
4 import java.util.Map;
5
6 import javax.annotation.Nonnull;
7 import javax.annotation.Nullable;
8 import javax.annotation.PostConstruct;
9 import javax.annotation.PreDestroy;
10
11 import com.atlassian.cache.Cache;
12 import com.atlassian.cache.CacheFactory;
13 import com.atlassian.cache.CacheLoader;
14 import com.atlassian.cache.CacheSettings;
15 import com.atlassian.cache.CacheSettingsBuilder;
16 import com.atlassian.cache.CacheSettingsDefaultsProvider;
17 import com.atlassian.cache.CachedReference;
18 import com.atlassian.cache.ManagedCache;
19 import com.atlassian.cache.Supplier;
20 import com.atlassian.cache.impl.AbstractCacheManager;
21 import com.atlassian.cache.impl.ReferenceKey;
22 import com.atlassian.cache.impl.StrongSupplier;
23 import com.atlassian.cache.impl.WeakSupplier;
24 import com.atlassian.hazelcast.serialization.OsgiSafe;
25
26 import com.hazelcast.config.Config;
27 import com.hazelcast.config.ConfigurationException;
28 import com.hazelcast.config.MapConfig;
29 import com.hazelcast.core.HazelcastInstance;
30 import com.hazelcast.core.IMap;
31 import com.hazelcast.core.ITopic;
32 import com.hazelcast.core.MembershipAdapter;
33 import com.hazelcast.core.MembershipEvent;
34 import com.hazelcast.map.impl.MapContainer;
35 import com.hazelcast.map.impl.MapService;
36 import com.hazelcast.map.impl.MapServiceContext;
37 import com.hazelcast.map.impl.proxy.MapProxyImpl;
38 import com.hazelcast.map.listener.EntryAddedListener;
39 import com.hazelcast.map.listener.EntryUpdatedListener;
40
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import static com.google.common.base.Preconditions.checkNotNull;
45
46
47
48
49 public class HazelcastCacheManager extends AbstractCacheManager
50 {
51 private static final Logger log = LoggerFactory.getLogger(HazelcastCacheManager.class);
52
53 protected static final String PREFIX = "atlassian-cache.";
54 protected static final String PREFIX_CACHE = PREFIX + "Cache.";
55 protected static final String PREFIX_CACHE_REFERENCE = PREFIX + "CacheReference.";
56 protected static final String SETTINGS_MAP_NAME = PREFIX + "settings";
57
58 private final HazelcastInstance hazelcast;
59 private final CacheFactory localCacheFactory;
60 private final IMap<String, CacheSettings> mapSettings;
61 private final String mapSettingsUpdatedListenerId;
62 private final String mapSettingsAddedListenerId;
63 private final String membershipListenerId;
64
65 private MapServiceContext mapServiceContext;
66
67 public HazelcastCacheManager(HazelcastInstance hazelcast, CacheFactory localCacheFactory,
68 CacheSettingsDefaultsProvider cacheSettingsDefaultsProvider)
69 {
70 super(cacheSettingsDefaultsProvider);
71
72 this.hazelcast = hazelcast;
73 this.localCacheFactory = localCacheFactory;
74 this.mapSettings = hazelcast.getMap(SETTINGS_MAP_NAME);
75
76
77 this.mapSettingsUpdatedListenerId = mapSettings.addEntryListener(
78 (EntryUpdatedListener<String, CacheSettings>) event -> reconfigureMap(event.getKey(), event.getValue()),
79 true);
80 this.mapSettingsAddedListenerId = mapSettings.addEntryListener(
81 (EntryAddedListener<String, CacheSettings>) event -> configureMap(event.getKey(), event.getValue()),
82 true);
83
84
85 this.membershipListenerId = hazelcast.getCluster().addMembershipListener(new MembershipAdapter()
86 {
87 @Override
88 public void memberAdded(MembershipEvent membershipEvent)
89 {
90 maybeUpdateMapContainers();
91 }
92 });
93 }
94
95 public HazelcastInstance getHazelcastInstance()
96 {
97 return hazelcast;
98 }
99
100
101
102
103 @PostConstruct
104 public void init()
105 {
106 maybeUpdateMapContainers();
107 }
108
109 @Override
110 protected <K, V> ManagedCache createComputingCache(@Nonnull final String name, @Nonnull final CacheSettings settings, final CacheLoader<K, V> loader)
111 {
112 checkSettingsAreCompatible(name, settings);
113
114
115
116 return cacheCreationLocks.apply(name).withLock(new java.util.function.Supplier<ManagedCache>()
117 {
118 @Override
119 public ManagedCache get()
120 {
121 if (!caches.containsKey(name) || loader != null)
122 {
123 caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCache(name, loader, settings)));
124 }
125 return caches.get(name).get();
126 }
127 });
128 }
129
130 @Override
131 protected ManagedCache createSimpleCache(@Nonnull final String name, @Nonnull final CacheSettings settings)
132 {
133 checkSettingsAreCompatible(name, settings);
134
135 ManagedCache existing = getManagedCache(name);
136 if (existing != null)
137 {
138 return existing;
139 }
140
141 return cacheCreationLocks.apply(name).withLock(new java.util.function.Supplier<ManagedCache>()
142 {
143 @Override
144 public ManagedCache get()
145 {
146 if (!caches.containsKey(name))
147 {
148 caches.put(name, new StrongSupplier<ManagedCache>((ManagedCache) doCreateCache(name, null, settings)));
149 }
150 return caches.get(name).get();
151 }
152 });
153 }
154
155
156
157
158 @PreDestroy
159 public void destroy()
160 {
161 mapSettings.removeEntryListener(mapSettingsAddedListenerId);
162 mapSettings.removeEntryListener(mapSettingsUpdatedListenerId);
163 hazelcast.getCluster().removeMembershipListener(membershipListenerId);
164 }
165
166 @Nonnull
167 @Override
168 public <V> CachedReference<V> getCachedReference(@Nonnull final String name, @Nonnull final Supplier<V> supplier,
169 @Nonnull final CacheSettings settings)
170 {
171 checkSettingsAreCompatible(name, settings);
172
173 return cacheCreationLocks.apply(name).withLock(new java.util.function.Supplier<CachedReference<V>>()
174 {
175 @Override
176 public CachedReference<V> get()
177 {
178 caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCachedReference(name, supplier, settings)));
179
180 return (CachedReference<V>) caches.get(name).get();
181 }
182 });
183 }
184
185 protected void checkSettingsAreCompatible(String name, CacheSettings settings)
186 {
187 }
188
189 protected MapConfig configureMap(String mapName, CacheSettings settings)
190 {
191 Config config = hazelcast.getConfig();
192 MapConfig mapConfig = config.findMapConfig(mapName);
193 if (!mapConfig.getName().equals(mapName))
194 {
195 mapConfig = convertAndStoreMapConfig(mapName, settings, config, mapConfig);
196
197
198 mapSettings.putIfAbsent(mapName, asSerializable(settings));
199 }
200 else
201 {
202 log.debug("Using existing cache configuration for cache {}", mapName);
203 mapSettings.computeIfAbsent(mapName, (key)-> asSerializable(settings));
204 }
205 updateMapContainer(mapName, mapConfig);
206
207 return mapConfig.getAsReadOnly();
208 }
209
210 protected MapConfig reconfigureMap(String mapName, CacheSettings newSettings)
211 {
212 Config config = hazelcast.getConfig();
213
214 MapConfig baseConfig = config.getMapConfig(mapName);
215
216 return convertAndStoreMapConfig(mapName, newSettings, config, baseConfig);
217 }
218
219
220
221
222
223
224
225
226 public boolean updateCacheSettings(@Nonnull String mapName, @Nonnull CacheSettings newSettings)
227 {
228 try {
229
230 MapConfig mapConfig = hazelcast.getConfig().findMapConfig(mapName);
231 if (!mapConfig.getName().equals(mapName)) {
232 return false;
233 }
234
235 MapConfig newConfig = reconfigureMap(mapName, newSettings);
236
237 boolean result = updateMapContainer(mapName, newConfig);
238 if (result) {
239
240 mapSettings.put(mapName, asSerializable(newSettings));
241 }
242
243 return result;
244 }catch (ConfigurationException exception) {
245 if(log.isDebugEnabled()) {
246 log.debug("Updating configuration of {} failed: ", mapName, exception);
247 } else {
248 log.warn("Updating cache settings on {} is not possible", mapName);
249 }
250 return false;
251 }
252 }
253
254 private MapConfig convertAndStoreMapConfig(String mapName, CacheSettings newSettings, Config config, MapConfig baseConfig)
255 {
256 MapConfig newConfig = new MapConfig(baseConfig);
257 newConfig.setName(mapName);
258 newConfig.setStatisticsEnabled(true);
259 new HazelcastMapConfigConfigurator().configureMapConfig(newSettings, newConfig, hazelcast.getPartitionService().getPartitions().size());
260 MapConfig oldConfig = config.findMapConfig(mapName);
261 if(oldConfig ==null || !oldConfig.getName().equals(mapName)) {
262 config.addMapConfig(newConfig);
263 }
264 return newConfig;
265 }
266
267 protected <K, V> Cache<K, V> createAsyncHybridCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
268 {
269 final String topicName = PREFIX_CACHE + name;
270 final ITopic<K> topic = hazelcast.getTopic(topicName);
271
272 return new HazelcastAsyncHybridCache<K, V>(name, localCacheFactory, topic, loader, this, settings);
273 }
274
275 protected <V> CachedReference<V> createAsyncHybridCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
276 {
277 final String topicName = PREFIX_CACHE_REFERENCE + name;
278 final ITopic<ReferenceKey> topic = hazelcast.getTopic(topicName);
279
280 return new HazelcastAsyncHybridCachedReference<V>(name, localCacheFactory, topic, supplier, this, settings);
281 }
282
283 protected <K, V> Cache<K, V> createDistributedCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
284 {
285 final String mapName = PREFIX_CACHE + name;
286 configureMap(mapName, settings);
287 final IMap<K, OsgiSafe<V>> map = hazelcast.getMap(mapName);
288 final CacheVersion cacheVersion = new CacheVersion(hazelcast, mapName);
289
290 return new HazelcastCache<K, V>(name, map, loader, cacheVersion, this);
291 }
292
293 protected <V> CachedReference<V> createDistributedCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
294 {
295
296
297 final CacheSettings overriddenSettings = checkNotNull(settings, "settings").override(
298 new CacheSettingsBuilder().flushable().maxEntries(1000).build());
299
300 final String mapName = PREFIX_CACHE_REFERENCE + name;
301 configureMap(mapName, overriddenSettings);
302 final IMap<String, OsgiSafe<V>> map = hazelcast.getMap(mapName);
303
304 return new HazelcastCachedReference<V>(name, map, supplier, this);
305 }
306
307 protected <K, V> Cache<K, V> createHybridCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
308 {
309 final String mapName = PREFIX_CACHE + name;
310 configureMap(mapName, settings);
311 final IMap<K, Long> map = hazelcast.getMap(mapName);
312
313 return new HazelcastHybridCache<K, V>(name, localCacheFactory, map, loader, this);
314 }
315
316 protected <V> CachedReference<V> createHybridCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
317 {
318 final String mapName = PREFIX_CACHE_REFERENCE + name;
319 configureMap(mapName, settings);
320 final IMap<ReferenceKey, Long> map = hazelcast.getMap(mapName);
321
322 return new HazelcastHybridCachedReference<V>(name, localCacheFactory, map, supplier, this);
323 }
324
325 private CacheSettings asSerializable(CacheSettings settings)
326 {
327 if (settings instanceof Serializable)
328 {
329 return settings;
330 }
331
332
333 return new CacheSettingsBuilder(settings).build();
334 }
335
336 private <K, V> Cache<K, V> doCreateCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
337 {
338 if (settings.getLocal(false))
339 {
340 return localCacheFactory.getCache(name, loader, settings);
341 }
342
343 if (settings.getReplicateViaCopy(true))
344 {
345 return createDistributedCache(name, loader, settings);
346 }
347
348 if (settings.getReplicateAsynchronously(true))
349 {
350 return createAsyncHybridCache(name, loader, settings);
351 }
352 else
353 {
354 return createHybridCache(name, loader, settings);
355 }
356 }
357
358 private <V> CachedReference<V> doCreateCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
359 {
360 if (settings.getLocal(false))
361 {
362 return localCacheFactory.getCachedReference(name, supplier, settings);
363 }
364
365
366 if (settings.getReplicateViaCopy(true))
367 {
368 return createDistributedCachedReference(name, supplier, settings);
369 }
370
371 if (settings.getReplicateAsynchronously(true))
372 {
373 return createAsyncHybridCachedReference(name, supplier, settings);
374 }
375 else
376 {
377 return createHybridCachedReference(name, supplier, settings);
378 }
379 }
380
381 private MapContainer getMapContainer(@Nonnull String name)
382 {
383 if (mapServiceContext == null)
384 {
385 MapProxyImpl<String, CacheSettings> proxy = hazelcast.getDistributedObject(MapService.SERVICE_NAME, SETTINGS_MAP_NAME);
386 mapServiceContext = proxy.getService().getMapServiceContext();
387 }
388
389 return mapServiceContext.getMapContainer(name);
390 }
391
392 private boolean updateMapContainer(String mapName, MapConfig config)
393 {
394 final MapContainer container = getMapContainer(mapName);
395 if (container == null)
396 {
397 return false;
398 }
399
400 container.setMapConfig(config);
401 container.initEvictor();
402
403 return true;
404 }
405
406 private void maybeUpdateMapContainers()
407 {
408 for (Map.Entry<String, CacheSettings> entry : mapSettings.entrySet())
409 {
410 configureMap(entry.getKey(), entry.getValue());
411 }
412 }
413
414
415
416
417
418
419
420 @Nullable
421 MapConfig getMapConfig(@Nonnull String mapName)
422 {
423 MapContainer mapContainer = getMapContainer(mapName);
424 MapConfig mapConfig = mapContainer.getMapConfig();
425 if (!mapConfig.getName().equals(mapName))
426 {
427 return null;
428 }
429 else
430 {
431 return mapConfig;
432 }
433 }
434
435
436
437
438
439
440
441
442 @Nullable
443 CacheSettings getCacheSettings(@Nonnull String mapName)
444 {
445 return mapSettings.get(mapName);
446 }
447 }