View Javadoc

1   package com.atlassian.vcache.internal.guava;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.DirectExternalCache;
6   import com.atlassian.vcache.ExternalCacheException;
7   import com.atlassian.vcache.IdentifiedValue;
8   import com.atlassian.vcache.PutPolicy;
9   import com.atlassian.vcache.internal.RequestContext;
10  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
11  import com.atlassian.vcache.internal.core.cas.IdentifiedData;
12  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
13  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
14  import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
15  import com.google.common.cache.Cache;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  import java.util.Objects;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.concurrent.CompletionStage;
25  import java.util.function.Function;
26  import java.util.function.Supplier;
27  import java.util.stream.Collectors;
28  import java.util.stream.StreamSupport;
29  
30  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
31  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
32  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
33  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
34  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
35  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
36  import static java.util.Objects.requireNonNull;
37  
38  /**
39   * Guava based implementation of {@link DirectExternalCache}.
40   *
41   * @param <V> the value type.
42   * @since 1.0.0
43   */
44  public class GuavaDirectExternalCache<V>
45          extends AbstractExternalCache<V>
46          implements DirectExternalCache<V> {
47      private static final Logger log = LoggerFactory.getLogger(GuavaDirectExternalCache.class);
48  
49      private final Cache<String, IdentifiedData> delegate;
50      private final Supplier<RequestContext> contextSupplier;
51      private final ExternalCacheKeyGenerator keyGenerator;
52      private final Optional<MarshallingPair<V>> valueMarshalling;
53  
54      public GuavaDirectExternalCache(
55              String name,
56              Cache<String, IdentifiedData> delegate,
57              Supplier<RequestContext> contextSupplier,
58              ExternalCacheKeyGenerator keyGenerator,
59              Optional<MarshallingPair<V>> valueMarshalling) {
60          super(name);
61          this.delegate = requireNonNull(delegate);
62          this.contextSupplier = requireNonNull(contextSupplier);
63          this.keyGenerator = requireNonNull(keyGenerator);
64          this.valueMarshalling = requireNonNull(valueMarshalling);
65      }
66  
67      @Override
68      public CompletionStage<Optional<V>> get(String internalKey) {
69          return perform(() -> {
70              final String externalKey = buildExternalKey(internalKey);
71              final IdentifiedData identifiedData = delegate.getIfPresent(externalKey);
72              return unmarshall(identifiedData, valueMarshalling);
73          });
74      }
75  
76      @Override
77      public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
78          return perform(() -> {
79              final String externalKey = buildExternalKey(internalKey);
80              final IdentifiedData identifiedData =
81                      delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
82              return unmarshall(identifiedData, valueMarshalling).get();
83          });
84      }
85  
86      @Override
87      public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
88          return perform(() -> {
89              final String externalKey = buildExternalKey(internalKey);
90              return unmarshallIdentified(delegate.getIfPresent(externalKey), valueMarshalling);
91          });
92      }
93  
94      @Override
95      public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
96          return perform(() -> {
97              final String externalKey = buildExternalKey(internalKey);
98              return unmarshallIdentified(
99                      delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
100                     valueMarshalling).get();
101         });
102     }
103 
104     @Override
105     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
106         return perform(() -> {
107             if (isEmpty(internalKeys)) {
108                 return new HashMap<>();
109             }
110 
111             // De-duplicate the keys, calculate the externalKeys and retrieve
112             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
113             return StreamSupport.stream(internalKeys.spliterator(), false)
114                     .distinct()
115                     .collect(Collectors.toMap(
116                             Objects::requireNonNull,
117                             k -> unmarshall(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
118         });
119     }
120 
121     @Override
122     public CompletionStage<Map<String, V>> getBulk(
123             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
124         return perform(() -> {
125             if (isEmpty(internalKeys)) {
126                 return new HashMap<>();
127             }
128 
129             // De-duplicate the keys and then obtain the existing values
130             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
131             final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
132 
133             // Add known values to the grand result
134             final Map<String, V> grandResult = existingValues.entrySet().stream()
135                     .filter(e -> e.getValue().isPresent())
136                     .collect(Collectors.toMap(
137                             Map.Entry::getKey,
138                             e -> e.getValue().get()));
139 
140             // Bail out if we have all the values
141             if (grandResult.size() == existingValues.size()) {
142                 return grandResult;
143             }
144 
145             // Sadly we now need to call the factory to create the missing values and then merge into the grand result.
146             final Set<String> missingInternalKeys = existingValues.entrySet().stream()
147                     .filter(e -> !e.getValue().isPresent())
148                     .map(Map.Entry::getKey)
149                     .collect(Collectors.toSet());
150 
151             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
152 
153             missingValues.entrySet().forEach(e -> {
154                 final Optional<V> existing = unmarshall(
155                         delegate.asMap().putIfAbsent(
156                                 cacheContext.externalEntryKeyFor(e.getKey()),
157                                 marshall(e.getValue(), valueMarshalling)),
158                         valueMarshalling);
159                 grandResult.put(e.getKey(), existing.orElse(e.getValue()));
160             });
161 
162             return grandResult;
163         });
164     }
165 
166     @Override
167     public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
168         return perform(() -> {
169             if (isEmpty(internalKeys)) {
170                 return new HashMap<>();
171             }
172 
173             // De-duplicate the keys, calculate the externalKeys and retrieve
174             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
175             return StreamSupport.stream(internalKeys.spliterator(), false)
176                     .distinct()
177                     .collect(Collectors.toMap(
178                             Objects::requireNonNull,
179                             k -> unmarshallIdentified(delegate.getIfPresent(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
180         });
181     }
182 
183 
184     @Override
185     public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
186         return perform(() -> {
187             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
188             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
189             final IdentifiedData identifiedData = marshall(value, valueMarshalling);
190 
191             return GuavaUtils.directPut(externalKey, identifiedData, policy, delegate);
192         });
193     }
194 
195     @Override
196     public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
197         return perform(() -> {
198             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
199             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
200             final IdentifiedData existingData = safeCast(casId);
201             return delegate.asMap().remove(externalKey, existingData);
202         });
203     }
204 
205     @Override
206     public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
207         return perform(() -> {
208             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
209             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
210             final IdentifiedData existingData = safeCast(casId);
211             final IdentifiedData newData = marshall(newValue, valueMarshalling);
212             return delegate.asMap().replace(externalKey, existingData, newData);
213         });
214     }
215 
216     @Override
217     public CompletionStage<Void> remove(Iterable<String> internalKeys) {
218         return perform(() -> {
219             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
220 
221             StreamSupport.stream(internalKeys.spliterator(), false)
222                     .distinct()
223                     .map(cacheContext::externalEntryKeyFor)
224                     .forEach(k -> delegate.asMap().remove(k));
225             return null;
226         });
227     }
228 
229     @Override
230     public CompletionStage<Void> removeAll() {
231         return perform(() -> {
232             delegate.asMap().clear();
233             return null;
234         });
235     }
236 
237     protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
238         final RequestContext requestContext = contextSupplier.get();
239 
240         return requestContext.computeIfAbsent(this, () -> {
241             log.trace("Cache {}: Setting up a new context", getName());
242             return new UnversionedExternalCacheRequestContext<>(
243                     keyGenerator, getName(), requestContext::partitionIdentifier);
244         });
245     }
246 
247     @Override
248     protected ExternalCacheException mapException(Exception ex) {
249         return GuavaUtils.mapException(ex);
250     }
251 
252     @Override
253     protected Logger getLogger() {
254         return log;
255     }
256 
257     private String buildExternalKey(String internalKey) {
258         final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
259         return cacheContext.externalEntryKeyFor(internalKey);
260     }
261 }