View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import java.util.HashMap;
4   import java.util.Map;
5   import java.util.Optional;
6   import java.util.Set;
7   import java.util.concurrent.CompletableFuture;
8   import java.util.concurrent.ExecutionException;
9   import java.util.function.Function;
10  import java.util.function.Supplier;
11  import java.util.stream.Collectors;
12  import java.util.stream.StreamSupport;
13  import javax.annotation.Nonnull;
14  
15  import com.atlassian.vcache.ExternalCache;
16  import com.atlassian.vcache.ExternalCacheSettings;
17  import com.atlassian.vcache.Marshaller;
18  import com.atlassian.vcache.MarshallerException;
19  import com.atlassian.vcache.internal.RequestContext;
20  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
21  import com.atlassian.vcache.internal.core.VCacheUtils;
22  
23  import net.spy.memcached.MemcachedClientIF;
24  import net.spy.memcached.OperationTimeoutException;
25  import org.slf4j.Logger;
26  
27  import static com.atlassian.vcache.internal.NameValidator.requireValidCacheName;
28  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.isEmpty;
29  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.perform;
30  import static com.atlassian.vcache.internal.memcached.MemcachedUtils.unmarshall;
31  import static java.util.Objects.requireNonNull;
32  
33  abstract class AbstractStableReadExternalCache<V> implements ExternalCache<V>
34  {
35      protected final Supplier<MemcachedClientIF> clientSupplier;
36      protected final Supplier<RequestContext> contextSupplier;
37      protected final ExternalCacheKeyGenerator keyGenerator;
38      protected final String name;
39      protected final Marshaller<V> valueMarshaller;
40      protected final ExternalCacheSettings settings;
41      protected final int defaultTtl;
42  
43      AbstractStableReadExternalCache(
44              Supplier<MemcachedClientIF> clientSupplier,
45              Supplier<RequestContext> contextSupplier,
46              ExternalCacheKeyGenerator keyGenerator,
47              String name,
48              Marshaller<V> valueMarshaller,
49              ExternalCacheSettings settings)
50      {
51          this.clientSupplier = requireNonNull(clientSupplier);
52          this.contextSupplier = requireNonNull(contextSupplier);
53          this.keyGenerator = requireNonNull(keyGenerator);
54          this.name = requireValidCacheName(name);
55          this.valueMarshaller = requireNonNull(valueMarshaller);
56          this.settings = requireNonNull(settings);
57          this.defaultTtl = VCacheUtils.roundUpToSeconds(settings.getDefaultTtl().get());
58      }
59  
60      /**
61       * Returns the cache context for the current request.
62       * @return the cache context for the current request.
63       * @throws OperationTimeoutException
64       */
65      @Nonnull
66      abstract StableReadRequestContext<V> ensureCacheContext() throws OperationTimeoutException;
67  
68      /**
69       * Returns the logging instance for the implementation class.
70       * @return the logging instance for the implementation class.
71       */
72      @Nonnull
73      abstract Logger getLogger();
74  
75      /**
76       * Handles the creation of an entry, if required.
77       *
78       * @param internalKey the internal key for the entry.
79       * @param supplier called to create the value, if required
80       * @return the value associated with the key.
81       */
82      @Nonnull
83      abstract V handleCreation(String internalKey, Supplier<V> supplier)
84              throws MarshallerException, ExecutionException, InterruptedException;
85  
86      /**
87       * Handles the creation of a number of entries
88       * @param factory called to create the required values.
89       * @param externalKeys the external keys for the entries.
90       *
91       * @return the values associated with the supplied keys (mapped on internal key)
92       */
93      @Nonnull
94      abstract Map<String, V> handleCreation(
95          Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
96              throws ExecutionException, InterruptedException;
97  
98      /**
99       * Checks if a value is recorded for a specified internal key.
100      */
101     @Nonnull
102     abstract Optional<Optional<V>> checkValueRecorded(String internalKey);
103 
104     /**
105      * Checks for values recorded for specified internal keys.
106      */
107     @Nonnull
108     abstract Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys);
109 
110     @Nonnull
111     @Override
112     public final String getName()
113     {
114         return name;
115     }
116 
117     @Nonnull
118     @Override
119     public final CompletableFuture<Optional<V>> get(String internalKey)
120     {
121         return perform(() -> {
122             final StableReadRequestContext<V> cacheContext = ensureCacheContext();
123 
124             final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
125 
126             return prior.orElseGet(() -> {
127                 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
128 
129                 final Optional<V> result = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
130                 cacheContext.recordValue(internalKey, result);
131                 return result;
132             });
133         });
134     }
135 
136     @Nonnull
137     @Override
138     public final CompletableFuture<V> get(String internalKey, Supplier<V> supplier)
139     {
140         return perform(() -> {
141             final StableReadRequestContext<V> cacheContext = ensureCacheContext();
142 
143             final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
144             if (prior.isPresent() && prior.get().isPresent())
145             {
146                 return prior.get().get();
147             }
148 
149             if (prior.isPresent())
150             {
151                 if (prior.get().isPresent())
152                 {
153                     return prior.get().get();
154                 }
155                 else
156                 {
157                     getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
158                     return handleCreation(internalKey, supplier);
159                 }
160             }
161 
162             // Either way, there was no prior value recorded, or it was blank. So need to create the value
163             // and write it back.
164             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
165             final Optional<V> result = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
166             if (result.isPresent())
167             {
168                 // A valid value exists in the external cache
169                 cacheContext.recordValue(internalKey, result);
170                 return result.get();
171             }
172 
173             getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
174             return handleCreation(internalKey, supplier);
175         });
176     }
177 
178     @Nonnull
179     @Override
180     public final CompletableFuture<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys)
181     {
182         return perform(() -> {
183             if (isEmpty(internalKeys))
184             {
185                 return new HashMap<>();
186             }
187 
188             // Get the recorded values first
189             final StableReadRequestContext<V> cacheContext = ensureCacheContext();
190             final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
191 
192             // Calculate the externalKeys for the entries that are missing
193             final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
194                     .filter(k -> !grandResult.containsKey(k))
195                     .map(cacheContext::externalEntryKeyFor)
196                     .collect(Collectors.toSet());
197 
198             if (missingExternalKeys.isEmpty())
199             {
200                 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
201                 return grandResult;
202             }
203             getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
204 
205             // Get the missing values.
206             // Returns map of keys that contain values, so need to handle the missing ones
207             final Map<String, Object> haveValues = clientSupplier.get().getBulk(missingExternalKeys);
208 
209             return missingExternalKeys.stream().collect(
210                     () -> grandResult,
211                     (m, k) -> {
212                         final Optional<V> result = unmarshall(haveValues.get(k), valueMarshaller);
213                         cacheContext.recordValue(cacheContext.internalEntryKeyFor(k), result);
214                         m.put(cacheContext.internalEntryKeyFor(k), result);
215                     },
216                     Map::putAll
217             );
218         });
219     }
220 
221     @Nonnull
222     @Override
223     public final CompletableFuture<Map<String, V>> getBulk(
224             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys)
225     {
226         return perform(() -> {
227             if (isEmpty(internalKeys))
228             {
229                 return new HashMap<>();
230             }
231 
232             // Get the recorded values first and collect the ones that have values.
233             final StableReadRequestContext<V> cacheContext = ensureCacheContext();
234             final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
235                     .filter(e -> e.getValue().isPresent())
236                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
237 
238             // Calculate the candidate externalKeys for the entries that are missing
239             final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
240                     .filter(k -> !grandResult.containsKey(k))
241                     .map(cacheContext::externalEntryKeyFor)
242                     .collect(Collectors.toSet());
243 
244             // Bail out if we have all the entries requested
245             if (candidateMissingExternalKeys.isEmpty())
246             {
247                 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
248                 return grandResult;
249             }
250             getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
251                     name, candidateMissingExternalKeys.size());
252 
253             final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
254             cacheContext.recordValues(missingValues);
255             grandResult.putAll(missingValues);
256 
257             return grandResult;
258         });
259     }
260 }