1 package com.atlassian.cache.hazelcast;
2
3 import java.io.Serializable;
4 import java.util.Map;
5
6 import javax.annotation.Nonnull;
7 import javax.annotation.Nullable;
8 import javax.annotation.PostConstruct;
9 import javax.annotation.PreDestroy;
10
11 import com.atlassian.cache.Cache;
12 import com.atlassian.cache.CacheFactory;
13 import com.atlassian.cache.CacheLoader;
14 import com.atlassian.cache.CacheSettings;
15 import com.atlassian.cache.CacheSettingsBuilder;
16 import com.atlassian.cache.CacheSettingsDefaultsProvider;
17 import com.atlassian.cache.CachedReference;
18 import com.atlassian.cache.ManagedCache;
19 import com.atlassian.cache.Supplier;
20 import com.atlassian.cache.impl.AbstractCacheManager;
21 import com.atlassian.cache.impl.ReferenceKey;
22 import com.atlassian.cache.impl.StrongSupplier;
23 import com.atlassian.cache.impl.WeakSupplier;
24 import com.atlassian.hazelcast.serialization.OsgiSafe;
25
26 import com.hazelcast.config.Config;
27 import com.hazelcast.config.MapConfig;
28 import com.hazelcast.core.EntryAdapter;
29 import com.hazelcast.core.EntryEvent;
30 import com.hazelcast.core.HazelcastInstance;
31 import com.hazelcast.core.IMap;
32 import com.hazelcast.core.MembershipAdapter;
33 import com.hazelcast.core.MembershipEvent;
34 import com.hazelcast.map.impl.MapContainer;
35 import com.hazelcast.map.impl.MapService;
36 import com.hazelcast.map.impl.MapServiceContext;
37 import com.hazelcast.map.impl.proxy.MapProxyImpl;
38
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import static com.google.common.base.Preconditions.checkNotNull;
43
44
45
46
47 public class HazelcastCacheManager extends AbstractCacheManager
48 {
49 private static final Logger log = LoggerFactory.getLogger(HazelcastCacheManager.class);
50
51 protected static final String PREFIX = "atlassian-cache.";
52 protected static final String PREFIX_CACHE = PREFIX + "Cache.";
53 protected static final String PREFIX_CACHE_REFERENCE = PREFIX + "CacheReference.";
54 protected static final String SETTINGS_MAP_NAME = PREFIX + "settings";
55
56 private final HazelcastInstance hazelcast;
57 private final CacheFactory localCacheFactory;
58 private final IMap<String, CacheSettings> mapSettings;
59 private final String mapSettingsListenerId;
60 private final String membershipListenerId;
61
62 private MapServiceContext mapServiceContext;
63
64 public HazelcastCacheManager(HazelcastInstance hazelcast, CacheFactory localCacheFactory,
65 CacheSettingsDefaultsProvider cacheSettingsDefaultsProvider)
66 {
67 super(cacheSettingsDefaultsProvider);
68
69 this.hazelcast = hazelcast;
70 this.localCacheFactory = localCacheFactory;
71 this.mapSettings = hazelcast.getMap(SETTINGS_MAP_NAME);
72
73
74 this.mapSettingsListenerId = mapSettings.addEntryListener(new EntryAdapter<String, CacheSettings>()
75 {
76 @Override
77 public void entryAdded(EntryEvent<String, CacheSettings> event)
78 {
79 configureMap(event.getKey(), event.getValue());
80 }
81
82 @Override
83 public void entryUpdated(EntryEvent<String, CacheSettings> event)
84 {
85 reconfigureMap(event.getKey(), event.getValue());
86 }
87 }, true);
88
89
90 this.membershipListenerId = hazelcast.getCluster().addMembershipListener(new MembershipAdapter()
91 {
92 @Override
93 public void memberAdded(MembershipEvent membershipEvent)
94 {
95 maybeUpdateMapContainers();
96 }
97 });
98 }
99
100
101
102
103 @PostConstruct
104 public void init()
105 {
106 maybeUpdateMapContainers();
107 }
108
109 @Override
110 protected <K, V> ManagedCache createComputingCache(@Nonnull final String name, @Nonnull final CacheSettings settings, final CacheLoader<K, V> loader)
111 {
112 checkSettingsAreCompatible(name, settings);
113
114
115
116 return cacheCreationLocks.get(name).withLock(new com.atlassian.util.concurrent.Supplier<ManagedCache>()
117 {
118 @Override
119 public ManagedCache get()
120 {
121 if (!caches.containsKey(name) || loader != null)
122 {
123 caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCache(name, loader, settings)));
124 }
125 return caches.get(name).get();
126 }
127 });
128 }
129
130 @Override
131 protected ManagedCache createSimpleCache(@Nonnull final String name, @Nonnull final CacheSettings settings)
132 {
133 checkSettingsAreCompatible(name, settings);
134
135 ManagedCache existing = getManagedCache(name);
136 if (existing != null)
137 {
138 return existing;
139 }
140
141 return cacheCreationLocks.get(name).withLock(new com.atlassian.util.concurrent.Supplier<ManagedCache>()
142 {
143 @Override
144 public ManagedCache get()
145 {
146 if (!caches.containsKey(name))
147 {
148 caches.put(name, new StrongSupplier<ManagedCache>((ManagedCache) doCreateCache(name, null, settings)));
149 }
150 return caches.get(name).get();
151 }
152 });
153 }
154
155
156
157
158 @PreDestroy
159 public void destroy()
160 {
161 mapSettings.removeEntryListener(mapSettingsListenerId);
162 hazelcast.getCluster().removeMembershipListener(membershipListenerId);
163 }
164
165 @Nonnull
166 @Override
167 public <V> CachedReference<V> getCachedReference(@Nonnull final String name, @Nonnull final Supplier<V> supplier,
168 @Nonnull final CacheSettings settings)
169 {
170 checkSettingsAreCompatible(name, settings);
171
172 return cacheCreationLocks.get(name).withLock(new com.atlassian.util.concurrent.Supplier<CachedReference<V>>()
173 {
174 @Override
175 public CachedReference<V> get()
176 {
177 caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCachedReference(name, supplier, settings)));
178
179 return (CachedReference<V>) caches.get(name).get();
180 }
181 });
182 }
183
184 protected void checkSettingsAreCompatible(String name, CacheSettings settings)
185 {
186 }
187
188 protected MapConfig configureMap(String mapName, CacheSettings settings)
189 {
190 Config config = hazelcast.getConfig();
191 MapConfig mapConfig = config.findMapConfig(mapName);
192 if (!mapConfig.getName().equals(mapName))
193 {
194 mapConfig = convertAndStoreMapConfig(mapName, settings, config, mapConfig);
195
196
197 mapSettings.putIfAbsent(mapName, asSerializable(settings));
198 }
199 else
200 {
201 log.debug("Using existing cache configuration for cache {}", mapName);
202 }
203
204 updateMapContainerIfAbsent(mapName, mapConfig);
205
206 return mapConfig.getAsReadOnly();
207 }
208
209 protected MapConfig reconfigureMap(String mapName, CacheSettings newSettings)
210 {
211 Config config = hazelcast.getConfig();
212
213 MapConfig baseConfig = config.getMapConfig(mapName);
214
215 return convertAndStoreMapConfig(mapName, newSettings, config, baseConfig);
216 }
217
218
219
220
221
222
223
224
225 public boolean updateCacheSettings(@Nonnull String mapName, @Nonnull CacheSettings newSettings)
226 {
227
228 MapConfig mapConfig = hazelcast.getConfig().findMapConfig(mapName);
229 if (!mapConfig.getName().equals(mapName))
230 {
231 return false;
232 }
233
234 MapConfig newConfig = reconfigureMap(mapName, newSettings);
235
236 boolean result = updateMapContainer(mapName, newConfig);
237 if (result)
238 {
239
240 mapSettings.put(mapName, asSerializable(newSettings));
241 }
242
243 return result;
244 }
245
246 private MapConfig convertAndStoreMapConfig(String mapName, CacheSettings newSettings, Config config, MapConfig baseConfig)
247 {
248 MapConfig newConfig = new MapConfig(baseConfig);
249 newConfig.setName(mapName);
250 newConfig.setStatisticsEnabled(true);
251 new HazelcastMapConfigConfigurator().configureMapConfig(newSettings, newConfig);
252 config.addMapConfig(newConfig);
253 return newConfig;
254 }
255
256 protected <K, V> Cache<K, V> createDistributedCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
257 {
258 final String mapName = PREFIX_CACHE + name;
259 configureMap(mapName, settings);
260 final IMap<K, OsgiSafe<V>> map = hazelcast.getMap(mapName);
261 final CacheVersion cacheVersion = new CacheVersion(hazelcast, mapName);
262
263 return new HazelcastCache<K, V>(name, map, loader, cacheVersion, this);
264 }
265
266 protected <V> CachedReference<V> createDistributedCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
267 {
268
269
270 final CacheSettings overriddenSettings = checkNotNull(settings, "settings").override(
271 new CacheSettingsBuilder().flushable().maxEntries(1000).build());
272
273 final String mapName = PREFIX_CACHE_REFERENCE + name;
274 configureMap(mapName, overriddenSettings);
275 final IMap<String, OsgiSafe<V>> map = hazelcast.getMap(mapName);
276
277 return new HazelcastCachedReference<V>(name, map, supplier, this);
278 }
279
280 protected <K, V> Cache<K, V> createHybridCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
281 {
282 final String mapName = PREFIX_CACHE + name;
283 configureMap(mapName, settings);
284 final IMap<K, Long> map = hazelcast.getMap(mapName);
285
286 return new HazelcastHybridCache<K, V>(name, localCacheFactory, map, loader, this);
287 }
288
289 protected <V> CachedReference<V> createHybridCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
290 {
291 final String mapName = PREFIX_CACHE_REFERENCE + name;
292 configureMap(mapName, settings);
293 final IMap<ReferenceKey, Long> map = hazelcast.getMap(mapName);
294
295 return new HazelcastHybridCachedReference<V>(name, localCacheFactory, map, supplier, this);
296 }
297
298 private CacheSettings asSerializable(CacheSettings settings)
299 {
300 if (settings instanceof Serializable)
301 {
302 return settings;
303 }
304
305
306 return new CacheSettingsBuilder(settings).build();
307 }
308
309 private <K, V> Cache<K, V> doCreateCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
310 {
311 if (settings.getLocal(false))
312 {
313 return localCacheFactory.getCache(name, loader, settings);
314 }
315
316 if (settings.getReplicateViaCopy(true))
317 {
318 return createDistributedCache(name, loader, settings);
319 }
320
321 return createHybridCache(name, loader, settings);
322 }
323
324 private <V> CachedReference<V> doCreateCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
325 {
326 if (settings.getLocal(false))
327 {
328 return localCacheFactory.getCachedReference(name, supplier, settings);
329 }
330
331
332 if (settings.getReplicateViaCopy(true))
333 {
334 return createDistributedCachedReference(name, supplier, settings);
335 }
336
337 return createHybridCachedReference(name, supplier, settings);
338 }
339
340 private MapContainer getMapContainer(@Nonnull String name)
341 {
342 if (mapServiceContext == null)
343 {
344 MapProxyImpl<String, CacheSettings> proxy = hazelcast.getDistributedObject(MapService.SERVICE_NAME, SETTINGS_MAP_NAME);
345 mapServiceContext = proxy.getService().getMapServiceContext();
346 }
347
348 return mapServiceContext.getMapContainer(name);
349 }
350
351 private boolean updateMapContainer(String mapName, MapConfig config)
352 {
353 final MapContainer container = getMapContainer(mapName);
354 if (container == null)
355 {
356 return false;
357 }
358
359 container.setMapConfig(config);
360
361 return true;
362 }
363
364 private boolean updateMapContainerIfAbsent(String mapName, MapConfig config)
365 {
366 final MapContainer container = getMapContainer(mapName);
367 if (container == null || container.getMapConfig().getName().equals(config.getName()))
368 {
369
370 return false;
371 }
372
373 container.setMapConfig(config);
374
375 return true;
376 }
377
378 private void maybeUpdateMapContainers()
379 {
380 for (Map.Entry<String, CacheSettings> entry : mapSettings.entrySet())
381 {
382 configureMap(entry.getKey(), entry.getValue());
383 }
384 }
385
386
387
388
389
390
391
392 @Nullable
393 MapConfig getMapConfig(@Nonnull String mapName)
394 {
395 MapConfig mapConfig = hazelcast.getConfig().findMapConfig(mapName);
396 if (!mapConfig.getName().equals(mapName))
397 {
398 return null;
399 }
400 else
401 {
402 return mapConfig;
403 }
404 }
405
406
407
408
409
410
411
412
413 @Nullable
414 CacheSettings getCacheSettings(@Nonnull String mapName)
415 {
416 return mapSettings.get(mapName);
417 }
418 }