View Javadoc

1   package com.atlassian.vcache.internal.guava;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.DirectExternalCache;
6   import com.atlassian.vcache.ExternalCacheException;
7   import com.atlassian.vcache.IdentifiedValue;
8   import com.atlassian.vcache.PutPolicy;
9   import com.atlassian.vcache.internal.RequestContext;
10  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
11  import com.atlassian.vcache.internal.core.cas.IdentifiedData;
12  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
13  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
14  import com.atlassian.vcache.internal.core.service.FactoryUtils;
15  import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
16  import com.google.common.cache.Cache;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  
20  import java.time.Duration;
21  import java.util.HashMap;
22  import java.util.Map;
23  import java.util.Objects;
24  import java.util.Optional;
25  import java.util.Set;
26  import java.util.concurrent.CompletionStage;
27  import java.util.function.Function;
28  import java.util.function.Supplier;
29  import java.util.stream.Collectors;
30  import java.util.stream.StreamSupport;
31  
32  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
33  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
34  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
35  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
36  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
37  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
38  import static java.util.Objects.requireNonNull;
39  
40  /**
41   * Guava based implementation of {@link DirectExternalCache}.
42   *
43   * @param <V> the value type.
44   * @since 1.0.0
45   */
46  public class GuavaDirectExternalCache<V>
47          extends AbstractExternalCache<V>
48          implements DirectExternalCache<V> {
49      private static final Logger log = LoggerFactory.getLogger(GuavaDirectExternalCache.class);
50  
51      private final Cache<String, IdentifiedData> delegate;
52      private final Supplier<RequestContext> contextSupplier;
53      private final ExternalCacheKeyGenerator keyGenerator;
54      private final Optional<MarshallingPair<V>> valueMarshalling;
55  
56      public GuavaDirectExternalCache(
57              String name,
58              Cache<String, IdentifiedData> delegate,
59              Supplier<RequestContext> contextSupplier,
60              ExternalCacheKeyGenerator keyGenerator,
61              Optional<MarshallingPair<V>> valueMarshalling,
62              Duration lockTimeout) {
63          super(name, lockTimeout, (n, ex) -> {});
64          this.delegate = requireNonNull(delegate);
65          this.contextSupplier = requireNonNull(contextSupplier);
66          this.keyGenerator = requireNonNull(keyGenerator);
67          this.valueMarshalling = requireNonNull(valueMarshalling);
68      }
69  
70      @Override
71      public CompletionStage<Optional<V>> get(String internalKey) {
72          return perform(() -> {
73              final String externalKey = buildExternalKey(internalKey);
74              final IdentifiedData identifiedData = delegate.getIfPresent(externalKey);
75              return unmarshall(identifiedData, valueMarshalling);
76          });
77      }
78  
79      @Override
80      public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
81          return perform(() -> {
82              final String externalKey = buildExternalKey(internalKey);
83              final IdentifiedData identifiedData =
84                      delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
85              return unmarshall(identifiedData, valueMarshalling).get();
86          });
87      }
88  
89      @Override
90      public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
91          return perform(() -> {
92              final String externalKey = buildExternalKey(internalKey);
93              return unmarshallIdentified(delegate.getIfPresent(externalKey), valueMarshalling);
94          });
95      }
96  
97      @Override
98      public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
99          return perform(() -> {
100             final String externalKey = buildExternalKey(internalKey);
101             return unmarshallIdentified(
102                     delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
103                     valueMarshalling).get();
104         });
105     }
106 
107     @Override
108     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
109         return perform(() -> {
110             if (isEmpty(internalKeys)) {
111                 return new HashMap<>();
112             }
113 
114             // De-duplicate the keys, calculate the externalKeys and retrieve
115             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
116             return StreamSupport.stream(internalKeys.spliterator(), false)
117                     .distinct()
118                     .collect(Collectors.toMap(
119                             Objects::requireNonNull,
120                             k -> unmarshall(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
121         });
122     }
123 
124     @Override
125     public CompletionStage<Map<String, V>> getBulk(
126             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
127         return perform(() -> {
128             if (isEmpty(internalKeys)) {
129                 return new HashMap<>();
130             }
131 
132             // De-duplicate the keys and then obtain the existing values
133             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
134             final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
135 
136             // Add known values to the grand result
137             final Map<String, V> grandResult = existingValues.entrySet().stream()
138                     .filter(e -> e.getValue().isPresent())
139                     .collect(Collectors.toMap(
140                             Map.Entry::getKey,
141                             e -> e.getValue().get()));
142 
143             // Bail out if we have all the values
144             if (grandResult.size() == existingValues.size()) {
145                 return grandResult;
146             }
147 
148             // Sadly we now need to call the factory to create the missing values and then merge into the grand result.
149             final Set<String> missingInternalKeys = existingValues.entrySet().stream()
150                     .filter(e -> !e.getValue().isPresent())
151                     .map(Map.Entry::getKey)
152                     .collect(Collectors.toSet());
153 
154             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
155             FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
156 
157 
158             missingValues.entrySet().forEach(e -> {
159                 final Optional<V> existing = unmarshall(
160                         delegate.asMap().putIfAbsent(
161                                 cacheContext.externalEntryKeyFor(e.getKey()),
162                                 marshall(e.getValue(), valueMarshalling)),
163                         valueMarshalling);
164                 grandResult.put(e.getKey(), existing.orElse(e.getValue()));
165             });
166 
167             return grandResult;
168         });
169     }
170 
171     @Override
172     public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
173         return perform(() -> {
174             if (isEmpty(internalKeys)) {
175                 return new HashMap<>();
176             }
177 
178             // De-duplicate the keys, calculate the externalKeys and retrieve
179             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
180             return StreamSupport.stream(internalKeys.spliterator(), false)
181                     .distinct()
182                     .collect(Collectors.toMap(
183                             Objects::requireNonNull,
184                             k -> unmarshallIdentified(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
185         });
186     }
187 
188 
189     @Override
190     public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
191         return perform(() -> {
192             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
193             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
194             final IdentifiedData identifiedData = marshall(value, valueMarshalling);
195 
196             return GuavaUtils.directPut(externalKey, identifiedData, policy, delegate);
197         });
198     }
199 
200     @Override
201     public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
202         return perform(() -> {
203             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
204             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
205             final IdentifiedData existingData = safeCast(casId);
206             return delegate.asMap().remove(externalKey, existingData);
207         });
208     }
209 
210     @Override
211     public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
212         return perform(() -> {
213             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
214             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
215             final IdentifiedData existingData = safeCast(casId);
216             final IdentifiedData newData = marshall(newValue, valueMarshalling);
217             return delegate.asMap().replace(externalKey, existingData, newData);
218         });
219     }
220 
221     @Override
222     public CompletionStage<Void> remove(Iterable<String> internalKeys) {
223         return perform(() -> {
224             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
225 
226             StreamSupport.stream(internalKeys.spliterator(), false)
227                     .distinct()
228                     .map(cacheContext::externalEntryKeyFor)
229                     .forEach(k -> delegate.asMap().remove(k));
230             return null;
231         });
232     }
233 
234     @Override
235     public CompletionStage<Void> removeAll() {
236         return perform(() -> {
237             delegate.asMap().clear();
238             return null;
239         });
240     }
241 
242     protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
243         final RequestContext requestContext = contextSupplier.get();
244 
245         return requestContext.computeIfAbsent(this, () -> {
246             log.trace("Cache {}: Setting up a new context", getName());
247             return new UnversionedExternalCacheRequestContext<>(
248                     keyGenerator, getName(), requestContext::partitionIdentifier, lockTimeout);
249         });
250     }
251 
252     @Override
253     protected ExternalCacheException mapException(Exception ex) {
254         return GuavaUtils.mapException(ex);
255     }
256 
257     @Override
258     protected Logger getLogger() {
259         return log;
260     }
261 
262     private String buildExternalKey(String internalKey) {
263         final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
264         return cacheContext.externalEntryKeyFor(internalKey);
265     }
266 }