View Javadoc
1   package com.atlassian.cache.hazelcast;
2   
3   import java.io.Serializable;
4   import java.util.Map;
5   
6   import javax.annotation.Nonnull;
7   import javax.annotation.Nullable;
8   import javax.annotation.PostConstruct;
9   import javax.annotation.PreDestroy;
10  
11  import com.atlassian.cache.Cache;
12  import com.atlassian.cache.CacheFactory;
13  import com.atlassian.cache.CacheLoader;
14  import com.atlassian.cache.CacheSettings;
15  import com.atlassian.cache.CacheSettingsBuilder;
16  import com.atlassian.cache.CacheSettingsDefaultsProvider;
17  import com.atlassian.cache.CachedReference;
18  import com.atlassian.cache.ManagedCache;
19  import com.atlassian.cache.Supplier;
20  import com.atlassian.cache.impl.AbstractCacheManager;
21  import com.atlassian.cache.impl.ReferenceKey;
22  import com.atlassian.cache.impl.StrongSupplier;
23  import com.atlassian.cache.impl.WeakSupplier;
24  import com.atlassian.hazelcast.serialization.OsgiSafe;
25  
26  import com.hazelcast.config.Config;
27  import com.hazelcast.config.ConfigurationException;
28  import com.hazelcast.config.MapConfig;
29  import com.hazelcast.core.HazelcastInstance;
30  import com.hazelcast.core.IMap;
31  import com.hazelcast.core.ITopic;
32  import com.hazelcast.core.MembershipAdapter;
33  import com.hazelcast.core.MembershipEvent;
34  import com.hazelcast.map.impl.MapContainer;
35  import com.hazelcast.map.impl.MapService;
36  import com.hazelcast.map.impl.MapServiceContext;
37  import com.hazelcast.map.impl.proxy.MapProxyImpl;
38  import com.hazelcast.map.listener.EntryAddedListener;
39  import com.hazelcast.map.listener.EntryUpdatedListener;
40  
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import static com.google.common.base.Preconditions.checkNotNull;
45  
46  /**
47   * Hazelcast implementation of the {@link com.atlassian.cache.CacheManager} contract
48   */
49  public class HazelcastCacheManager extends AbstractCacheManager
50  {
51      private static final Logger log = LoggerFactory.getLogger(HazelcastCacheManager.class);
52  
53      protected static final String PREFIX = "atlassian-cache.";
54      protected static final String PREFIX_CACHE = PREFIX + "Cache.";
55      protected static final String PREFIX_CACHE_REFERENCE = PREFIX + "CacheReference.";
56      protected static final String SETTINGS_MAP_NAME = PREFIX + "settings";
57  
58      private final HazelcastInstance hazelcast;
59      private final CacheFactory localCacheFactory;
60      private final IMap<String, CacheSettings> mapSettings;
61      private final String mapSettingsUpdatedListenerId;
62      private final String mapSettingsAddedListenerId;
63      private final String membershipListenerId;
64  
65      private MapServiceContext mapServiceContext;
66  
67      public HazelcastCacheManager(HazelcastInstance hazelcast, CacheFactory localCacheFactory,
68              CacheSettingsDefaultsProvider cacheSettingsDefaultsProvider)
69      {
70          super(cacheSettingsDefaultsProvider);
71  
72          this.hazelcast = hazelcast;
73          this.localCacheFactory = localCacheFactory;
74          this.mapSettings = hazelcast.getMap(SETTINGS_MAP_NAME);
75  
76          // add listeners to be notified of remote changes to the map config so the local config can be updated too
77          this.mapSettingsUpdatedListenerId = mapSettings.addEntryListener(
78                  (EntryUpdatedListener<String, CacheSettings>) event -> reconfigureMap(event.getKey(), event.getValue()),
79                  true);
80          this.mapSettingsAddedListenerId = mapSettings.addEntryListener(
81                  (EntryAddedListener<String, CacheSettings>) event -> configureMap(event.getKey(), event.getValue()),
82                  true);
83  
84          // add a listener for join events to ensure that the local cache configs are updated
85          this.membershipListenerId = hazelcast.getCluster().addMembershipListener(new MembershipAdapter()
86          {
87              @Override
88              public void memberAdded(MembershipEvent membershipEvent)
89              {
90                  maybeUpdateMapContainers();
91              }
92          });
93      }
94  
95      public HazelcastInstance getHazelcastInstance()
96      {
97          return hazelcast;
98      }
99  
100     /**
101      * Initializes the manager. This method must be called after the bean has been created.
102      */
103     @PostConstruct
104     public void init()
105     {
106         maybeUpdateMapContainers();
107     }
108 
109     @Override
110     protected <K, V> ManagedCache createComputingCache(@Nonnull final String name, @Nonnull final CacheSettings settings, final CacheLoader<K, V> loader)
111     {
112         checkSettingsAreCompatible(name, settings);
113 
114         // when a loader is provided, always create a new ManagedCache to ensure the correct loader is being used.
115         // if the cache already existed, the backing values will be reused
116         return cacheCreationLocks.apply(name).withLock(new java.util.function.Supplier<ManagedCache>()
117         {
118             @Override
119             public ManagedCache get()
120             {
121                 if (!caches.containsKey(name) || loader != null)
122                 {
123                     caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCache(name, loader, settings)));
124                 }
125                 return caches.get(name).get();
126             }
127         });
128     }
129 
130     @Override
131     protected ManagedCache createSimpleCache(@Nonnull final String name, @Nonnull final CacheSettings settings)
132     {
133         checkSettingsAreCompatible(name, settings);
134 
135         ManagedCache existing = getManagedCache(name);
136         if (existing != null)
137         {
138             return existing;
139         }
140 
141         return cacheCreationLocks.apply(name).withLock(new java.util.function.Supplier<ManagedCache>()
142         {
143             @Override
144             public ManagedCache get()
145             {
146                 if (!caches.containsKey(name))
147                 {
148                     caches.put(name, new StrongSupplier<ManagedCache>((ManagedCache) doCreateCache(name, null, settings)));
149                 }
150                 return caches.get(name).get();
151             }
152         });
153     }
154 
155     /**
156      * De-registers listeners. This method must be called when the bean is no longer required.
157      */
158     @PreDestroy
159     public void destroy()
160     {
161         mapSettings.removeEntryListener(mapSettingsAddedListenerId);
162         mapSettings.removeEntryListener(mapSettingsUpdatedListenerId);
163         hazelcast.getCluster().removeMembershipListener(membershipListenerId);
164     }
165 
166     @Nonnull
167     @Override
168     public <V> CachedReference<V> getCachedReference(@Nonnull final String name, @Nonnull final Supplier<V> supplier,
169             @Nonnull final CacheSettings settings)
170     {
171         checkSettingsAreCompatible(name, settings);
172 
173         return cacheCreationLocks.apply(name).withLock(new java.util.function.Supplier<CachedReference<V>>()
174         {
175             @Override
176             public CachedReference<V> get()
177             {
178                 caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCachedReference(name, supplier, settings)));
179                 //noinspection unchecked
180                 return (CachedReference<V>) caches.get(name).get();
181             }
182         });
183     }
184 
185     protected void checkSettingsAreCompatible(String name, CacheSettings settings)
186     {
187     }
188 
189     protected MapConfig configureMap(String mapName, CacheSettings settings)
190     {
191         Config config = hazelcast.getConfig();
192         MapConfig mapConfig = config.findMapConfig(mapName);
193         if (!mapConfig.getName().equals(mapName))
194         {
195             mapConfig = convertAndStoreMapConfig(mapName, settings, config, mapConfig);
196 
197             // store the config in the settings map to trigger other nodes to also configure the IMap
198             mapSettings.putIfAbsent(mapName, asSerializable(settings));
199         }
200         else
201         {
202             log.debug("Using existing cache configuration for cache {}", mapName);
203             mapSettings.computeIfAbsent(mapName, (key)-> asSerializable(settings));
204         }
205         updateMapContainer(mapName, mapConfig);
206 
207         return mapConfig.getAsReadOnly();
208     }
209 
210     protected MapConfig reconfigureMap(String mapName, CacheSettings newSettings)
211     {
212         Config config = hazelcast.getConfig();
213 
214         MapConfig baseConfig = config.getMapConfig(mapName);
215 
216         return convertAndStoreMapConfig(mapName, newSettings, config, baseConfig);
217     }
218 
219     /**
220      * Update the cache settings of an existing cache. Called by {@link com.atlassian.cache.ManagedCache} methods
221      *
222      * @param mapName The name of the map to update
223      * @param newSettings CacheSettings object representing the new settings
224      * @return true if the update succeeded, false otherwise
225      */
226     public boolean updateCacheSettings(@Nonnull String mapName, @Nonnull CacheSettings newSettings)
227     {
228         try {
229             //Do nothing if the map doesn't exist
230             MapConfig mapConfig = hazelcast.getConfig().findMapConfig(mapName);
231             if (!mapConfig.getName().equals(mapName)) {
232                 return false;
233             }
234 
235             MapConfig newConfig = reconfigureMap(mapName, newSettings);
236 
237             boolean result = updateMapContainer(mapName, newConfig);
238             if (result) {
239                 // store the config in the settings map to trigger other nodes to also configure the IMap
240                 mapSettings.put(mapName, asSerializable(newSettings));
241             }
242 
243             return result;
244         }catch (ConfigurationException exception) {
245             if(log.isDebugEnabled()) {
246                 log.debug("Updating configuration of {} failed: ", mapName, exception);
247             } else {
248                 log.warn("Updating cache settings on {} is not possible", mapName);
249             }
250             return false;
251         }
252     }
253 
254     private MapConfig convertAndStoreMapConfig(String mapName, CacheSettings newSettings, Config config, MapConfig baseConfig)
255     {
256         MapConfig newConfig = new MapConfig(baseConfig);
257         newConfig.setName(mapName);
258         newConfig.setStatisticsEnabled(true);
259         new HazelcastMapConfigConfigurator().configureMapConfig(newSettings, newConfig, hazelcast.getPartitionService().getPartitions().size());
260         MapConfig oldConfig = config.findMapConfig(mapName);
261         if(oldConfig ==null || !oldConfig.getName().equals(mapName)) {
262             config.addMapConfig(newConfig);
263         }
264         return newConfig;
265     }
266 
267     protected <K, V> Cache<K, V> createAsyncHybridCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
268     {
269         final String topicName = PREFIX_CACHE + name;
270         final ITopic<K> topic = hazelcast.getTopic(topicName);
271 
272         return new HazelcastAsyncHybridCache<K, V>(name, localCacheFactory, topic, loader, this, settings);
273     }
274 
275     protected <V> CachedReference<V> createAsyncHybridCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
276     {
277         final String topicName = PREFIX_CACHE_REFERENCE + name;
278         final ITopic<ReferenceKey> topic = hazelcast.getTopic(topicName);
279 
280         return new HazelcastAsyncHybridCachedReference<V>(name, localCacheFactory, topic, supplier, this, settings);
281     }
282 
283     protected <K, V> Cache<K, V> createDistributedCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
284     {
285         final String mapName = PREFIX_CACHE + name;
286         configureMap(mapName, settings);
287         final IMap<K, OsgiSafe<V>> map = hazelcast.getMap(mapName);
288         final CacheVersion cacheVersion = new CacheVersion(hazelcast, mapName);
289 
290         return new HazelcastCache<K, V>(name, map, loader, cacheVersion, this);
291     }
292 
293     protected <V> CachedReference<V> createDistributedCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
294     {
295         // override the settings to ensure the reference is flushable and the max is set to 1000. A low value for
296         // maxEntries would trigger continuous cache invalidations because of the way map eviction works in Hazelcast.
297         final CacheSettings overriddenSettings = checkNotNull(settings, "settings").override(
298                 new CacheSettingsBuilder().flushable().maxEntries(1000).build());
299 
300         final String mapName = PREFIX_CACHE_REFERENCE + name;
301         configureMap(mapName, overriddenSettings);
302         final IMap<String, OsgiSafe<V>> map = hazelcast.getMap(mapName);
303 
304         return new HazelcastCachedReference<V>(name, map, supplier, this);
305     }
306 
307     protected <K, V> Cache<K, V> createHybridCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
308     {
309         final String mapName = PREFIX_CACHE + name;
310         configureMap(mapName, settings);
311         final IMap<K, Long> map = hazelcast.getMap(mapName);
312 
313         return new HazelcastHybridCache<K, V>(name, localCacheFactory, map, loader, this);
314     }
315 
316     protected <V> CachedReference<V> createHybridCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
317     {
318         final String mapName = PREFIX_CACHE_REFERENCE + name;
319         configureMap(mapName, settings);
320         final IMap<ReferenceKey, Long> map = hazelcast.getMap(mapName);
321 
322         return new HazelcastHybridCachedReference<V>(name, localCacheFactory, map, supplier, this);
323     }
324 
325     private CacheSettings asSerializable(CacheSettings settings)
326     {
327         if (settings instanceof Serializable)
328         {
329             return settings;
330         }
331 
332         // DefaultCacheSettings implements Serializable, make a copy
333         return new CacheSettingsBuilder(settings).build();
334     }
335 
336     private <K, V> Cache<K, V> doCreateCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
337     {
338         if (settings.getLocal(false))
339         {
340             return localCacheFactory.getCache(name, loader, settings);
341         }
342 
343         if (settings.getReplicateViaCopy(true))
344         {
345             return createDistributedCache(name, loader, settings);
346         }
347 
348         if (settings.getReplicateAsynchronously(true))
349         {
350             return createAsyncHybridCache(name, loader, settings);
351         }
352         else
353         {
354             return createHybridCache(name, loader, settings);
355         }
356     }
357 
358     private <V> CachedReference<V> doCreateCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
359     {
360         if (settings.getLocal(false))
361         {
362             return localCacheFactory.getCachedReference(name, supplier, settings);
363         }
364 
365         // remote cached reference
366         if (settings.getReplicateViaCopy(true))
367         {
368             return createDistributedCachedReference(name, supplier, settings);
369         }
370 
371         if (settings.getReplicateAsynchronously(true))
372         {
373             return createAsyncHybridCachedReference(name, supplier, settings);
374         }
375         else
376         {
377             return createHybridCachedReference(name, supplier, settings);
378         }
379     }
380 
381     private MapContainer getMapContainer(@Nonnull String name)
382     {
383         if (mapServiceContext == null)
384         {
385             MapProxyImpl<String, CacheSettings> proxy = hazelcast.getDistributedObject(MapService.SERVICE_NAME, SETTINGS_MAP_NAME);
386             mapServiceContext = proxy.getService().getMapServiceContext();
387         }
388 
389         return mapServiceContext.getMapContainer(name);
390     }
391 
392     private boolean updateMapContainer(String mapName, MapConfig config)
393     {
394         final MapContainer container = getMapContainer(mapName);
395         if (container == null)
396         {
397             return false;
398         }
399 
400         container.setMapConfig(config);
401         container.initEvictor();
402 
403         return true;
404     }
405 
406     private void maybeUpdateMapContainers()
407     {
408         for (Map.Entry<String, CacheSettings> entry : mapSettings.entrySet())
409         {
410             configureMap(entry.getKey(), entry.getValue());
411         }
412     }
413 
414     /**
415      * Get the current MapConfig for the given IMap
416      *
417      * @param mapName the name of the map
418      * @return The current mapConfig for that map if it exists, null otherwise
419      */
420     @Nullable
421     MapConfig getMapConfig(@Nonnull String mapName)
422     {
423         MapContainer mapContainer = getMapContainer(mapName);
424         MapConfig mapConfig = mapContainer.getMapConfig();
425         if (!mapConfig.getName().equals(mapName))
426         {
427             return null;
428         }
429         else
430         {
431             return mapConfig;
432         }
433     }
434 
435     /**
436      * Gets the original CacheSettings object used to configure the given map, before it was transformed into a
437      * Hazelcast MapConfig
438      *
439      * @param mapName the name of the map
440      * @return The original CacheSettings object, or null if the map doesn't exist
441      */
442     @Nullable
443     CacheSettings getCacheSettings(@Nonnull String mapName)
444     {
445         return mapSettings.get(mapName);
446     }
447 }