View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import java.util.HashMap;
4   import java.util.Map;
5   import java.util.Optional;
6   import java.util.Set;
7   import java.util.concurrent.CompletableFuture;
8   import java.util.concurrent.ExecutionException;
9   import java.util.function.Function;
10  import java.util.function.Supplier;
11  import java.util.stream.Collectors;
12  import java.util.stream.StreamSupport;
13  import javax.annotation.Nonnull;
14  
15  import com.atlassian.vcache.MarshallerException;
16  
17  import static com.atlassian.vcache.internal.core.VCacheUtils.isEmpty;
18  
19  /**
20   * Provides operations common to both {@link com.atlassian.vcache.StableReadExternalCache} and
21   * {@link com.atlassian.vcache.TransactionalExternalCache} instances.
22   *
23   * @param <V> the value type
24   */
25  public abstract class AbstractNonDirectExternalCache<V>
26          extends AbstractExternalCache<V>
27  {
28  
29      protected AbstractNonDirectExternalCache(String name)
30      {
31          super(name);
32      }
33  
34      /**
35       * Handles the creation of an entry, if required.
36       *
37       * @param internalKey the internal key for the entry.
38       * @param supplier called to create the value, if required
39       * @return the value associated with the key.
40       */
41      @Nonnull
42      protected abstract V handleCreation(String internalKey, Supplier<V> supplier)
43              throws MarshallerException, ExecutionException, InterruptedException;
44  
45      /**
46       * Handles the creation of a number of entries
47       * @param factory called to create the required values.
48       * @param externalKeys the external keys for the entries.
49       *
50       * @return the values associated with the supplied keys (mapped on internal key)
51       */
52      @Nonnull
53      protected abstract Map<String, V> handleCreation(
54              Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
55              throws ExecutionException, InterruptedException;
56  
57      /**
58       * Checks if a value is recorded for a specified internal key.
59       */
60      @Nonnull
61      protected abstract Optional<Optional<V>> checkValueRecorded(String internalKey);
62  
63      /**
64       * Checks for values recorded for specified internal keys.
65       */
66      @Nonnull
67      protected abstract Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys);
68  
69      /**
70       * Performs a direct get operation against the external cache using the supplied external key.
71       */
72      @Nonnull
73      protected abstract Optional<V> directGet(String externalKey);
74  
75      /**
76       * Performs a direct bulk get operation against the external cache using the supplied external keys.
77       */
78      @Nonnull
79      protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
80  
81      @Nonnull
82      @Override
83      public final CompletableFuture<Optional<V>> get(String internalKey)
84      {
85          return perform(() -> {
86              final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
87  
88              final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
89  
90              return prior.orElseGet(() -> {
91                  final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
92  
93                  final Optional<V> result = directGet(externalKey);
94                  cacheContext.recordValue(internalKey, result);
95                  return result;
96              });
97          });
98      }
99  
100     @Nonnull
101     @Override
102     public final CompletableFuture<V> get(String internalKey, Supplier<V> supplier)
103     {
104         return perform(() -> {
105             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
106 
107             final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
108             if (prior.isPresent())
109             {
110                 if (prior.get().isPresent())
111                 {
112                     return prior.get().get();
113                 }
114                 else
115                 {
116                     getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
117                     return handleCreation(internalKey, supplier);
118                 }
119             }
120 
121             // Either way, there was no prior value recorded, or it was blank. So need to create the value
122             // and write it back.
123             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
124             final Optional<V> result = directGet(externalKey);
125             if (result.isPresent())
126             {
127                 // A valid value exists in the external cache
128                 cacheContext.recordValue(internalKey, result);
129                 return result.get();
130             }
131 
132             getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
133             return handleCreation(internalKey, supplier);
134         });
135     }
136 
137     @Nonnull
138     @Override
139     public final CompletableFuture<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys)
140     {
141         return perform(() -> {
142             if (isEmpty(internalKeys))
143             {
144                 return new HashMap<>();
145             }
146 
147             // Get the recorded values first
148             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
149             final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
150 
151             // Calculate the externalKeys for the entries that are missing
152             final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
153                     .filter(k -> !grandResult.containsKey(k))
154                     .map(cacheContext::externalEntryKeyFor)
155                     .collect(Collectors.toSet());
156 
157             if (missingExternalKeys.isEmpty())
158             {
159                 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
160                 return grandResult;
161             }
162             getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
163 
164             // Get the missing values.
165             final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
166 
167             return candidateValues.entrySet().stream().collect(
168                     () -> grandResult,
169                     (m, e) -> {
170                         final Optional<V> result = e.getValue();
171                         cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
172                         m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
173                     },
174                     Map::putAll
175             );
176         });
177     }
178 
179     @Nonnull
180     @Override
181     public final CompletableFuture<Map<String, V>> getBulk(
182             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys)
183     {
184         return perform(() -> {
185             if (isEmpty(internalKeys))
186             {
187                 return new HashMap<>();
188             }
189 
190             // Get the recorded values first and collect the ones that have values.
191             final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
192             final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
193                     .filter(e -> e.getValue().isPresent())
194                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
195 
196             // Calculate the candidate externalKeys for the entries that are missing
197             final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
198                     .filter(k -> !grandResult.containsKey(k))
199                     .map(cacheContext::externalEntryKeyFor)
200                     .collect(Collectors.toSet());
201 
202             // Bail out if we have all the entries requested
203             if (candidateMissingExternalKeys.isEmpty())
204             {
205                 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
206                 return grandResult;
207             }
208             getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
209                     name, candidateMissingExternalKeys.size());
210 
211             final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
212             cacheContext.recordValues(missingValues);
213             grandResult.putAll(missingValues);
214 
215             return grandResult;
216         });
217     }
218 }