View Javadoc

1   package com.atlassian.cache.hazelcast;
2   
3   import java.io.Serializable;
4   import java.util.Map;
5   
6   import javax.annotation.Nonnull;
7   import javax.annotation.Nullable;
8   import javax.annotation.PostConstruct;
9   import javax.annotation.PreDestroy;
10  
11  import com.atlassian.cache.Cache;
12  import com.atlassian.cache.CacheFactory;
13  import com.atlassian.cache.CacheLoader;
14  import com.atlassian.cache.CacheSettings;
15  import com.atlassian.cache.CacheSettingsBuilder;
16  import com.atlassian.cache.CacheSettingsDefaultsProvider;
17  import com.atlassian.cache.CachedReference;
18  import com.atlassian.cache.ManagedCache;
19  import com.atlassian.cache.Supplier;
20  import com.atlassian.cache.impl.AbstractCacheManager;
21  import com.atlassian.cache.impl.ReferenceKey;
22  import com.atlassian.cache.impl.StrongSupplier;
23  import com.atlassian.cache.impl.WeakSupplier;
24  import com.atlassian.hazelcast.serialization.OsgiSafe;
25  
26  import com.hazelcast.config.Config;
27  import com.hazelcast.config.MapConfig;
28  import com.hazelcast.core.EntryAdapter;
29  import com.hazelcast.core.EntryEvent;
30  import com.hazelcast.core.HazelcastInstance;
31  import com.hazelcast.core.IMap;
32  import com.hazelcast.core.MembershipAdapter;
33  import com.hazelcast.core.MembershipEvent;
34  import com.hazelcast.map.impl.MapContainer;
35  import com.hazelcast.map.impl.MapService;
36  import com.hazelcast.map.impl.MapServiceContext;
37  import com.hazelcast.map.impl.proxy.MapProxyImpl;
38  
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import static com.google.common.base.Preconditions.checkNotNull;
43  
44  /**
45   * Hazelcast implementation of the {@link com.atlassian.cache.CacheManager} contract
46   */
47  public class HazelcastCacheManager extends AbstractCacheManager
48  {
49      private static final Logger log = LoggerFactory.getLogger(HazelcastCacheManager.class);
50  
51      protected static final String PREFIX = "atlassian-cache.";
52      protected static final String PREFIX_CACHE = PREFIX + "Cache.";
53      protected static final String PREFIX_CACHE_REFERENCE = PREFIX + "CacheReference.";
54      protected static final String SETTINGS_MAP_NAME = PREFIX + "settings";
55  
56      private final HazelcastInstance hazelcast;
57      private final CacheFactory localCacheFactory;
58      private final IMap<String, CacheSettings> mapSettings;
59      private final String mapSettingsListenerId;
60      private final String membershipListenerId;
61  
62      private MapServiceContext mapServiceContext;
63  
64      public HazelcastCacheManager(HazelcastInstance hazelcast, CacheFactory localCacheFactory,
65              CacheSettingsDefaultsProvider cacheSettingsDefaultsProvider)
66      {
67          super(cacheSettingsDefaultsProvider);
68  
69          this.hazelcast = hazelcast;
70          this.localCacheFactory = localCacheFactory;
71          this.mapSettings = hazelcast.getMap(SETTINGS_MAP_NAME);
72  
73          // add a listener to be notified of remote changes to the map config so the local config can be updated too
74          this.mapSettingsListenerId = mapSettings.addEntryListener(new EntryAdapter<String, CacheSettings>()
75          {
76              @Override
77              public void entryAdded(EntryEvent<String, CacheSettings> event)
78              {
79                  configureMap(event.getKey(), event.getValue());
80              }
81  
82              @Override
83              public void entryUpdated(EntryEvent<String, CacheSettings> event)
84              {
85                  reconfigureMap(event.getKey(), event.getValue());
86              }
87          }, true);
88  
89          // add a listener for join events to ensure that the local cache configs are updated
90          this.membershipListenerId = hazelcast.getCluster().addMembershipListener(new MembershipAdapter()
91          {
92              @Override
93              public void memberAdded(MembershipEvent membershipEvent)
94              {
95                  maybeUpdateMapContainers();
96              }
97          });
98      }
99  
100     /**
101      * Initializes the manager. This method must be called after the bean has been created.
102      */
103     @PostConstruct
104     public void init()
105     {
106         maybeUpdateMapContainers();
107     }
108 
109     @Override
110     protected <K, V> ManagedCache createComputingCache(@Nonnull final String name, @Nonnull final CacheSettings settings, final CacheLoader<K, V> loader)
111     {
112         checkSettingsAreCompatible(name, settings);
113 
114         // when a loader is provided, always create a new ManagedCache to ensure the correct loader is being used.
115         // if the cache already existed, the backing values will be reused
116         return cacheCreationLocks.get(name).withLock(new com.atlassian.util.concurrent.Supplier<ManagedCache>()
117         {
118             @Override
119             public ManagedCache get()
120             {
121                 if (!caches.containsKey(name) || loader != null)
122                 {
123                     caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCache(name, loader, settings)));
124                 }
125                 return caches.get(name).get();
126             }
127         });
128     }
129 
130     @Override
131     protected ManagedCache createSimpleCache(@Nonnull final String name, @Nonnull final CacheSettings settings)
132     {
133         checkSettingsAreCompatible(name, settings);
134 
135         ManagedCache existing = getManagedCache(name);
136         if (existing != null)
137         {
138             return existing;
139         }
140 
141         return cacheCreationLocks.get(name).withLock(new com.atlassian.util.concurrent.Supplier<ManagedCache>()
142         {
143             @Override
144             public ManagedCache get()
145             {
146                 if (!caches.containsKey(name))
147                 {
148                     caches.put(name, new StrongSupplier<ManagedCache>((ManagedCache) doCreateCache(name, null, settings)));
149                 }
150                 return caches.get(name).get();
151             }
152         });
153     }
154 
155     /**
156      * De-registers listeners. This method must be called when the bean is no longer required.
157      */
158     @PreDestroy
159     public void destroy()
160     {
161         mapSettings.removeEntryListener(mapSettingsListenerId);
162         hazelcast.getCluster().removeMembershipListener(membershipListenerId);
163     }
164 
165     @Nonnull
166     @Override
167     public <V> CachedReference<V> getCachedReference(@Nonnull final String name, @Nonnull final Supplier<V> supplier,
168             @Nonnull final CacheSettings settings)
169     {
170         checkSettingsAreCompatible(name, settings);
171 
172         return cacheCreationLocks.get(name).withLock(new com.atlassian.util.concurrent.Supplier<CachedReference<V>>()
173         {
174             @Override
175             public CachedReference<V> get()
176             {
177                 caches.put(name, new WeakSupplier<ManagedCache>((ManagedCache) doCreateCachedReference(name, supplier, settings)));
178                 //noinspection unchecked
179                 return (CachedReference<V>) caches.get(name).get();
180             }
181         });
182     }
183 
184     protected void checkSettingsAreCompatible(String name, CacheSettings settings)
185     {
186     }
187 
188     protected MapConfig configureMap(String mapName, CacheSettings settings)
189     {
190         Config config = hazelcast.getConfig();
191         MapConfig mapConfig = config.findMapConfig(mapName);
192         if (!mapConfig.getName().equals(mapName))
193         {
194             mapConfig = convertAndStoreMapConfig(mapName, settings, config, mapConfig);
195 
196             // store the config in the settings map to trigger other nodes to also configure the IMap
197             mapSettings.putIfAbsent(mapName, asSerializable(settings));
198         }
199         else
200         {
201             log.debug("Using existing cache configuration for cache {}", mapName);
202         }
203 
204         updateMapContainerIfAbsent(mapName, mapConfig);
205 
206         return mapConfig.getAsReadOnly();
207     }
208 
209     protected MapConfig reconfigureMap(String mapName, CacheSettings newSettings)
210     {
211         Config config = hazelcast.getConfig();
212 
213         MapConfig baseConfig = config.getMapConfig(mapName);
214 
215         return convertAndStoreMapConfig(mapName, newSettings, config, baseConfig);
216     }
217 
218     /**
219      * Update the cache settings of an existing cache. Called by {@link com.atlassian.cache.ManagedCache} methods
220      *
221      * @param mapName The name of the map to update
222      * @param newSettings CacheSettings object representing the new settings
223      * @return true if the update succeeded, false otherwise
224      */
225     public boolean updateCacheSettings(@Nonnull String mapName, @Nonnull CacheSettings newSettings)
226     {
227         //Do nothing if the map doesn't exist
228         MapConfig mapConfig = hazelcast.getConfig().findMapConfig(mapName);
229         if (!mapConfig.getName().equals(mapName))
230         {
231             return false;
232         }
233 
234         MapConfig newConfig = reconfigureMap(mapName, newSettings);
235 
236         boolean result = updateMapContainer(mapName, newConfig);
237         if (result)
238         {
239             // store the config in the settings map to trigger other nodes to also configure the IMap
240             mapSettings.put(mapName, asSerializable(newSettings));
241         }
242 
243         return result;
244     }
245 
246     private MapConfig convertAndStoreMapConfig(String mapName, CacheSettings newSettings, Config config, MapConfig baseConfig)
247     {
248         MapConfig newConfig = new MapConfig(baseConfig);
249         newConfig.setName(mapName);
250         newConfig.setStatisticsEnabled(true);
251         new HazelcastMapConfigConfigurator().configureMapConfig(newSettings, newConfig);
252         config.addMapConfig(newConfig);
253         return newConfig;
254     }
255 
256     protected <K, V> Cache<K, V> createDistributedCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
257     {
258         final String mapName = PREFIX_CACHE + name;
259         configureMap(mapName, settings);
260         final IMap<K, OsgiSafe<V>> map = hazelcast.getMap(mapName);
261         final CacheVersion cacheVersion = new CacheVersion(hazelcast, mapName);
262 
263         return new HazelcastCache<K, V>(name, map, loader, cacheVersion, this);
264     }
265 
266     protected <V> CachedReference<V> createDistributedCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
267     {
268         // override the settings to ensure the reference is flushable and the max is set to 1000. A low value for
269         // maxEntries would trigger continuous cache invalidations because of the way map eviction works in Hazelcast.
270         final CacheSettings overriddenSettings = checkNotNull(settings, "settings").override(
271                 new CacheSettingsBuilder().flushable().maxEntries(1000).build());
272 
273         final String mapName = PREFIX_CACHE_REFERENCE + name;
274         configureMap(mapName, overriddenSettings);
275         final IMap<String, OsgiSafe<V>> map = hazelcast.getMap(mapName);
276 
277         return new HazelcastCachedReference<V>(name, map, supplier, this);
278     }
279 
280     protected <K, V> Cache<K, V> createHybridCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
281     {
282         final String mapName = PREFIX_CACHE + name;
283         configureMap(mapName, settings);
284         final IMap<K, Long> map = hazelcast.getMap(mapName);
285 
286         return new HazelcastHybridCache<K, V>(name, localCacheFactory, map, loader, this);
287     }
288 
289     protected <V> CachedReference<V> createHybridCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
290     {
291         final String mapName = PREFIX_CACHE_REFERENCE + name;
292         configureMap(mapName, settings);
293         final IMap<ReferenceKey, Long> map = hazelcast.getMap(mapName);
294 
295         return new HazelcastHybridCachedReference<V>(name, localCacheFactory, map, supplier, this);
296     }
297 
298     private CacheSettings asSerializable(CacheSettings settings)
299     {
300         if (settings instanceof Serializable)
301         {
302             return settings;
303         }
304 
305         // DefaultCacheSettings implements Serializable, make a copy
306         return new CacheSettingsBuilder(settings).build();
307     }
308 
309     private <K, V> Cache<K, V> doCreateCache(String name, CacheLoader<K, V> loader, CacheSettings settings)
310     {
311         if (settings.getLocal(false))
312         {
313             return localCacheFactory.getCache(name, loader, settings);
314         }
315 
316         if (settings.getReplicateViaCopy(true))
317         {
318             return createDistributedCache(name, loader, settings);
319         }
320 
321         return createHybridCache(name, loader, settings);
322     }
323 
324     private <V> CachedReference<V> doCreateCachedReference(String name, Supplier<V> supplier, CacheSettings settings)
325     {
326         if (settings.getLocal(false))
327         {
328             return localCacheFactory.getCachedReference(name, supplier, settings);
329         }
330 
331         // remote cached reference
332         if (settings.getReplicateViaCopy(true))
333         {
334             return createDistributedCachedReference(name, supplier, settings);
335         }
336 
337         return createHybridCachedReference(name, supplier, settings);
338     }
339 
340     private MapContainer getMapContainer(@Nonnull String name)
341     {
342         if (mapServiceContext == null)
343         {
344             MapProxyImpl<String, CacheSettings> proxy = hazelcast.getDistributedObject(MapService.SERVICE_NAME, SETTINGS_MAP_NAME);
345             mapServiceContext = proxy.getService().getMapServiceContext();
346         }
347 
348         return mapServiceContext.getMapContainer(name);
349     }
350 
351     private boolean updateMapContainer(String mapName, MapConfig config)
352     {
353         final MapContainer container = getMapContainer(mapName);
354         if (container == null)
355         {
356             return false;
357         }
358 
359         container.setMapConfig(config);
360 
361         return true;
362     }
363 
364     private boolean updateMapContainerIfAbsent(String mapName, MapConfig config)
365     {
366         final MapContainer container = getMapContainer(mapName);
367         if (container == null || container.getMapConfig().getName().equals(config.getName()))
368         {
369             // container does not exist or is already configured
370             return false;
371         }
372 
373         container.setMapConfig(config);
374 
375         return true;
376     }
377 
378     private void maybeUpdateMapContainers()
379     {
380         for (Map.Entry<String, CacheSettings> entry : mapSettings.entrySet())
381         {
382             configureMap(entry.getKey(), entry.getValue());
383         }
384     }
385 
386     /**
387      * Get the current MapConfig for the given IMap
388      *
389      * @param mapName the name of the map
390      * @return The current mapConfig for that map if it exists, null otherwise
391      */
392     @Nullable
393     MapConfig getMapConfig(@Nonnull String mapName)
394     {
395         MapConfig mapConfig = hazelcast.getConfig().findMapConfig(mapName);
396         if (!mapConfig.getName().equals(mapName))
397         {
398             return null;
399         }
400         else
401         {
402             return mapConfig;
403         }
404     }
405 
406     /**
407      * Gets the original CacheSettings object used to configure the given map, before it was transformed into a
408      * Hazelcast MapConfig
409      *
410      * @param mapName the name of the map
411      * @return The original CacheSettings object, or null if the map doesn't exist
412      */
413     @Nullable
414     CacheSettings getCacheSettings(@Nonnull String mapName)
415     {
416         return mapSettings.get(mapName);
417     }
418 }