View Javadoc
1   package com.atlassian.cache.vcache;
2   
3   import java.util.Collection;
4   import java.util.Objects;
5   import java.util.Optional;
6   import javax.annotation.Nonnull;
7   import javax.annotation.Nullable;
8   
9   import com.atlassian.cache.Cache;
10  import com.atlassian.cache.CacheEntryListener;
11  import com.atlassian.cache.CacheException;
12  import com.atlassian.cache.CacheLoader;
13  import com.atlassian.cache.CacheSettings;
14  import com.atlassian.cache.Supplier;
15  import com.atlassian.vcache.JvmCache;
16  import com.atlassian.vcache.PutPolicy;
17  import com.atlassian.vcache.StableReadExternalCache;
18  
19  import com.google.common.base.Throwables;
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  import static com.atlassian.cache.vcache.Utils.asString;
24  import static com.atlassian.vcache.VCacheUtils.join;
25  import static java.util.Objects.requireNonNull;
26  
27  class HybridCache<K, V>
28          extends ManagedCacheSupport
29          implements Cache<K, V>
30  {
31      private static final Logger log = LoggerFactory.getLogger(HybridCache.class);
32  
33      private final JvmCache<K, Versioned<V>> localVersioned;
34      private final StableReadExternalCache<String> globalVersions;
35      private final CacheLoader<K, V> cacheLoader;
36  
37      HybridCache(String name,
38                  JvmCache<K, Versioned<V>> localVersioned,
39                  StableReadExternalCache<String> globalVersions,
40                  @Nullable CacheLoader<K, V> cacheLoader,
41                  CacheSettings settings)
42      {
43          super(name, settings);
44          this.localVersioned = requireNonNull(localVersioned);
45          this.globalVersions = requireNonNull(globalVersions);
46          this.cacheLoader = cacheLoader;
47      }
48  
49      @Override
50      public void clear()
51      {
52          globalVersions.removeAll();
53          localVersioned.removeAll();
54      }
55  
56      @Override
57      public boolean isLocal()
58      {
59          return false;
60      }
61  
62      @Override
63      public boolean containsKey(@Nonnull K key)
64      {
65          return localVersioned.get(key).isPresent();
66      }
67  
68      @Nonnull
69      @Override
70      public Collection<K> getKeys()
71      {
72          return localVersioned.getKeys();
73      }
74  
75      @Nullable
76      @Override
77      public V get(@Nonnull K key)
78      {
79          if (cacheLoader != null)
80          {
81              return get(key, () -> cacheLoader.load(key));
82          }
83  
84          final Optional<Versioned<V>> versionedOptional = localVersioned.get(key);
85  
86          if (versionedOptional.isPresent())
87          {
88              final Optional<String> globalVersion = join(globalVersions.get(asString(key)));
89  
90              if (globalVersion.isPresent()
91                      && globalVersion.get().equals(versionedOptional.get().getVersion()))
92              {
93                  // versions match, cache is up to date
94                  return versionedOptional.get().getValue();
95              }
96  
97              // Value in localVersioned is outdated, clear it.
98              //noinspection unchecked
99              localVersioned.remove(key);
100         }
101 
102         return null; // No match
103     }
104 
105     @Nonnull
106     @Override
107     public V get(@Nonnull K key, @Nonnull Supplier<? extends V> valueSupplier)
108     {
109         final Optional<Versioned<V>> versionedOptional = localVersioned.get(key);
110 
111         if (versionedOptional.isPresent())
112         {
113             final Optional<String> globalVersion = join(globalVersions.get(asString(key)));
114 
115             if (globalVersion.isPresent()
116                     && globalVersion.get().equals(versionedOptional.get().getVersion()))
117             {
118                 // versions match, cache is up to date
119                 return versionedOptional.get().getValue();
120             }
121 
122             // Value in localVersioned is outdated, clear it.
123             //noinspection unchecked
124             localVersioned.remove(key);
125         }
126 
127         return localVersioned.get(key, () -> loadAndVersion(key, valueSupplier)).getValue();
128     }
129 
130     @Override
131     public void put(@Nonnull K key, @Nonnull V value)
132     {
133         final String newVersion = newVersion(key);
134         localVersioned.put(key, new Versioned<>(newVersion, value));
135     }
136 
137     /**
138      * {@inheritDoc}
139      * <p>
140      * This implementation has a weak spot: When there is an existing value, this method must return it. However, if
141      * that value is stored in a remote node, it cannot be returned. <tt>null</tt> is returned instead, which signals
142      * success. In order to not break any contracts, this implementation will invalidate the value on all nodes in this
143      * situation.
144      */
145     @Nullable
146     @Override
147     public V putIfAbsent(@Nonnull K key, @Nonnull V value)
148     {
149         final String newVersion = Utils.uniqueId();
150         final Versioned<V> versioned = new Versioned<>(newVersion, value);
151         final Optional<Versioned<V>> oldValue = localVersioned.putIfAbsent(key, versioned);
152 
153         if (oldValue.isPresent())
154         {
155             // There is already an existing value, so just return it.
156             return oldValue.get().getValue();
157         }
158 
159         // Entry was missing, so need to do a PUT_ALWAYS on the version number, to invalidate the value on all
160         // other nodes (see Javadoc comment above)
161         if (!join(globalVersions.put(asString(key), newVersion, PutPolicy.PUT_ALWAYS)))
162         {
163             log.info("Unable to add a version for key '{}' in cache with name '{}'", key, localVersioned.getName());
164         }
165 
166         return null;
167     }
168 
169     @Override
170     public void remove(@Nonnull K key)
171     {
172         // add a new version to trigger a cluster-wide invalidation
173         newVersion(key);
174         //noinspection unchecked
175         localVersioned.remove(key);
176     }
177 
178     @Override
179     public boolean remove(@Nonnull K key, @Nonnull V value)
180     {
181         final Optional<Versioned<V>> localValue = localVersioned.get(key);
182 
183         if (!localValue.isPresent() || !Objects.equals(value, localValue.get().getValue()))
184         {
185             return false;
186         }
187 
188         remove(key);
189         return true;
190     }
191 
192     @Override
193     public void removeAll()
194     {
195         if (isFlushable())
196         {
197             globalVersions.removeAll();
198             localVersioned.removeAll();
199         }
200     }
201 
202     @Override
203     public boolean replace(@Nonnull K key, @Nonnull V oldValue, @Nonnull V newValue)
204     {
205         final Optional<Versioned<V>> localValue = localVersioned.get(key);
206 
207         if (!localValue.isPresent() || !Objects.equals(oldValue, localValue.get().getValue()))
208         {
209             return false;
210         }
211 
212         final String newVersion = newVersion(key);
213         if (localVersioned.replaceIf(key, localValue.get(), new Versioned<>(newVersion, newValue)))
214         {
215             join(globalVersions.put(asString(key), newVersion, PutPolicy.PUT_ALWAYS));
216             return true;
217         }
218 
219         return false;
220     }
221 
222     @Override
223     public void addListener(@Nonnull CacheEntryListener<K, V> listener, boolean includeValues)
224     {
225         throw new UnsupportedOperationException("Unsupported when using the VCache implementation");
226     }
227 
228     @Override
229     public void removeListener(@Nonnull CacheEntryListener<K, V> listener)
230     {
231         throw new UnsupportedOperationException("Unsupported when using the VCache implementation");
232     }
233 
234     @Nonnull
235     private String getVersion(K key)
236     {
237         // get the existing value, or create a new one
238         return join(globalVersions.get(asString(key), Utils::uniqueId));
239     }
240 
241     @Nonnull
242     private Versioned<V> loadAndVersion(final K key, Supplier<? extends V> supplier)
243     {
244         try
245         {
246             final String version = getVersion(key);
247 
248             final V value = supplier.get();
249             if (value == null)
250             {
251                 throw new CacheException("The generated value for cache '" + getName() + "' was null for key '" +
252                         key + "'. Null values are not supported.");
253             }
254 
255             log.debug("Generated value '{}' for key '{}' in cache with name '{}'", value, key, localVersioned.getName());
256             return new Versioned<>(version, value);
257         }
258         catch (RuntimeException e)
259         {
260             Throwables.propagateIfInstanceOf(e, CacheException.class);
261             throw new CacheException(
262                     "Error generating a value for key '" + key + "' in cache '" + localVersioned.getName() + "'", e);
263         }
264     }
265 
266     @Nonnull
267     private String newVersion(K key)
268     {
269         final String result = Utils.uniqueId();
270         join(globalVersions.put(asString(key), result, PutPolicy.PUT_ALWAYS));
271         return result;
272     }
273 }