View Javadoc

1   package com.atlassian.vcache.internal.legacy;
2   
3   import com.atlassian.cache.Cache;
4   import com.atlassian.marshalling.api.MarshallingPair;
5   import com.atlassian.vcache.CasIdentifier;
6   import com.atlassian.vcache.DirectExternalCache;
7   import com.atlassian.vcache.ExternalCacheException;
8   import com.atlassian.vcache.IdentifiedValue;
9   import com.atlassian.vcache.PutPolicy;
10  import com.atlassian.vcache.internal.RequestContext;
11  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
12  import com.atlassian.vcache.internal.core.cas.IdentifiedData;
13  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
14  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
15  import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  import java.util.Objects;
22  import java.util.Optional;
23  import java.util.Set;
24  import java.util.concurrent.CompletionStage;
25  import java.util.function.Function;
26  import java.util.function.Supplier;
27  import java.util.stream.Collectors;
28  import java.util.stream.StreamSupport;
29  
30  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
31  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
32  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
33  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
34  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
35  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
36  import static java.util.Objects.requireNonNull;
37  
38  /**
39   * Atlassian Cache backed implementation.
40   *
41   * @param <V> the value type
42   * @since 1.0.0
43   */
44  class LegacyDirectExternalCache<V>
45          extends AbstractExternalCache<V>
46          implements DirectExternalCache<V> {
47      private static final Logger log = LoggerFactory.getLogger(LegacyDirectExternalCache.class);
48  
49      private final Cache<String, IdentifiedData> delegate;
50      private final Supplier<RequestContext> contextSupplier;
51      private final ExternalCacheKeyGenerator keyGenerator;
52      private final Optional<MarshallingPair<V>> valueMarshalling;
53      private final LegacyServiceSettings serviceSettings;
54  
55      LegacyDirectExternalCache(
56              Cache<String, IdentifiedData> delegate,
57              Supplier<RequestContext> contextSupplier,
58              ExternalCacheKeyGenerator keyGenerator,
59              Optional<MarshallingPair<V>> valueMarshalling,
60              LegacyServiceSettings serviceSettings) {
61          super(delegate.getName());
62          this.delegate = requireNonNull(delegate);
63          this.contextSupplier = requireNonNull(contextSupplier);
64          this.keyGenerator = requireNonNull(keyGenerator);
65          this.valueMarshalling = requireNonNull(valueMarshalling);
66          this.serviceSettings = requireNonNull(serviceSettings);
67      }
68  
69      @Override
70      public CompletionStage<Optional<V>> get(String internalKey) {
71          return perform(() -> {
72              final String externalKey = buildExternalKey(internalKey);
73              final IdentifiedData identifiedData = delegate.get(externalKey);
74              return unmarshall(identifiedData, valueMarshalling);
75          });
76      }
77  
78      @Override
79      public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
80          return perform(() -> {
81              final String externalKey = buildExternalKey(internalKey);
82              final IdentifiedData identifiedData =
83                      delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
84              return unmarshall(identifiedData, valueMarshalling).get();
85          });
86      }
87  
88      @Override
89      public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
90          return perform(() -> {
91              verifyCasOpsSupported();
92              final String externalKey = buildExternalKey(internalKey);
93              return unmarshallIdentified(delegate.get(externalKey), valueMarshalling);
94          });
95      }
96  
97      @Override
98      public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
99          return perform(() -> {
100             verifyCasOpsSupported();
101             final String externalKey = buildExternalKey(internalKey);
102             return unmarshallIdentified(
103                     delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
104                     valueMarshalling).get();
105         });
106     }
107 
108     @Override
109     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
110         return perform(() -> {
111             if (isEmpty(internalKeys)) {
112                 return new HashMap<>();
113             }
114 
115             // De-duplicate the keys, calculate the externalKeys and retrieve
116             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
117             return StreamSupport.stream(internalKeys.spliterator(), false)
118                     .distinct()
119                     .collect(Collectors.toMap(
120                             Objects::requireNonNull,
121                             k -> unmarshall(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
122         });
123     }
124 
125     @Override
126     public CompletionStage<Map<String, V>> getBulk(
127             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
128         return perform(() -> {
129             if (isEmpty(internalKeys)) {
130                 return new HashMap<>();
131             }
132 
133             // De-duplicate the keys and then obtain the existing values
134             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
135             final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
136 
137             // Add known values to the grand result
138             final Map<String, V> grandResult = existingValues.entrySet().stream()
139                     .filter(e -> e.getValue().isPresent())
140                     .collect(Collectors.toMap(
141                             Map.Entry::getKey,
142                             e -> e.getValue().get()));
143 
144             // Bail out if we have all the values
145             if (grandResult.size() == existingValues.size()) {
146                 return grandResult;
147             }
148 
149             // Sadly we now need to call the factory to create the missing values and then merge into the grand result.
150             final Set<String> missingInternalKeys = existingValues.entrySet().stream()
151                     .filter(e -> !e.getValue().isPresent())
152                     .map(Map.Entry::getKey)
153                     .collect(Collectors.toSet());
154 
155             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
156 
157             missingValues.entrySet().forEach(e -> {
158                 if (serviceSettings.isAvoidCasOps()) {
159                     delegate.put(cacheContext.externalEntryKeyFor(e.getKey()), marshall(e.getValue(), valueMarshalling));
160                     grandResult.put(e.getKey(), e.getValue());
161                 } else {
162                     final Optional<V> existing = unmarshall(
163                             delegate.putIfAbsent(
164                                     cacheContext.externalEntryKeyFor(e.getKey()),
165                                     marshall(e.getValue(), valueMarshalling)),
166                             valueMarshalling);
167                     grandResult.put(e.getKey(), existing.orElse(e.getValue()));
168                 }
169             });
170 
171             return grandResult;
172         });
173     }
174 
175     @Override
176     public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
177         return perform(() -> {
178             verifyCasOpsSupported();
179 
180             if (isEmpty(internalKeys)) {
181                 return new HashMap<>();
182             }
183 
184             // De-duplicate the keys, calculate the externalKeys and retrieve
185             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
186             return StreamSupport.stream(internalKeys.spliterator(), false)
187                     .distinct()
188                     .collect(Collectors.toMap(
189                             Objects::requireNonNull,
190                             k -> unmarshallIdentified(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
191         });
192     }
193 
194 
195     @Override
196     public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
197         return perform(() -> {
198             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
199             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
200             final IdentifiedData identifiedData = marshall(value, valueMarshalling);
201 
202             return LegacyUtils.directPut(externalKey, identifiedData, policy, delegate, serviceSettings.isAvoidCasOps());
203         });
204     }
205 
206     @Override
207     public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
208         return perform(() -> {
209             verifyCasOpsSupported();
210             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
211             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
212             final IdentifiedData existingData = safeCast(casId);
213             return delegate.remove(externalKey, existingData);
214         });
215     }
216 
217     @Override
218     public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
219         return perform(() -> {
220             verifyCasOpsSupported();
221             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
222             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
223             final IdentifiedData existingData = safeCast(casId);
224             final IdentifiedData newData = marshall(newValue, valueMarshalling);
225             return delegate.replace(externalKey, existingData, newData);
226         });
227     }
228 
229     @Override
230     public CompletionStage<Void> remove(Iterable<String> internalKeys) {
231         return perform(() -> {
232             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
233 
234             StreamSupport.stream(internalKeys.spliterator(), false)
235                     .distinct()
236                     .map(cacheContext::externalEntryKeyFor)
237                     .forEach(delegate::remove);
238             return null;
239         });
240     }
241 
242     @Override
243     public CompletionStage<Void> removeAll() {
244         return perform(() -> {
245             delegate.removeAll();
246             return null;
247         });
248     }
249 
250     protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
251         final RequestContext requestContext = contextSupplier.get();
252 
253         return requestContext.computeIfAbsent(this, () -> {
254             log.trace("Cache {}: Setting up a new context", delegate.getName());
255             return new UnversionedExternalCacheRequestContext<>(
256                     keyGenerator, delegate.getName(), requestContext::partitionIdentifier);
257         });
258     }
259 
260     @Override
261     protected ExternalCacheException mapException(Exception ex) {
262         return LegacyUtils.mapException(ex);
263     }
264 
265     @Override
266     protected Logger getLogger() {
267         return log;
268     }
269 
270     private String buildExternalKey(String internalKey) {
271         final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
272         return cacheContext.externalEntryKeyFor(internalKey);
273     }
274 
275     private void verifyCasOpsSupported() {
276         if (serviceSettings.isAvoidCasOps()) {
277             throw new UnsupportedOperationException("CAS operations not supported in this configuration");
278         }
279     }
280 }