View Javadoc

1   package com.atlassian.vcache.internal.legacy;
2   
3   import com.atlassian.cache.Cache;
4   import com.atlassian.marshalling.api.MarshallingPair;
5   import com.atlassian.vcache.CasIdentifier;
6   import com.atlassian.vcache.DirectExternalCache;
7   import com.atlassian.vcache.ExternalCacheException;
8   import com.atlassian.vcache.IdentifiedValue;
9   import com.atlassian.vcache.PutPolicy;
10  import com.atlassian.vcache.internal.RequestContext;
11  import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
12  import com.atlassian.vcache.internal.core.cas.IdentifiedData;
13  import com.atlassian.vcache.internal.core.service.AbstractExternalCache;
14  import com.atlassian.vcache.internal.core.service.AbstractExternalCacheRequestContext;
15  import com.atlassian.vcache.internal.core.service.FactoryUtils;
16  import com.atlassian.vcache.internal.core.service.UnversionedExternalCacheRequestContext;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  
20  import java.util.HashMap;
21  import java.util.Map;
22  import java.util.Objects;
23  import java.util.Optional;
24  import java.util.Set;
25  import java.util.concurrent.CompletionStage;
26  import java.util.function.Function;
27  import java.util.function.Supplier;
28  import java.util.stream.Collectors;
29  import java.util.stream.StreamSupport;
30  
31  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
32  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
33  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.marshall;
34  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.safeCast;
35  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshall;
36  import static com.atlassian.vcache.internal.core.cas.IdentifiedUtils.unmarshallIdentified;
37  import static java.util.Objects.requireNonNull;
38  
39  /**
40   * Atlassian Cache backed implementation.
41   *
42   * @param <V> the value type
43   * @since 1.0.0
44   */
45  class LegacyDirectExternalCache<V>
46          extends AbstractExternalCache<V>
47          implements DirectExternalCache<V> {
48      private static final Logger log = LoggerFactory.getLogger(LegacyDirectExternalCache.class);
49  
50      private final Cache<String, IdentifiedData> delegate;
51      private final Supplier<RequestContext> contextSupplier;
52      private final ExternalCacheKeyGenerator keyGenerator;
53      private final Optional<MarshallingPair<V>> valueMarshalling;
54      private final LegacyServiceSettings serviceSettings;
55  
56      LegacyDirectExternalCache(
57              Cache<String, IdentifiedData> delegate,
58              Supplier<RequestContext> contextSupplier,
59              ExternalCacheKeyGenerator keyGenerator,
60              Optional<MarshallingPair<V>> valueMarshalling,
61              LegacyServiceSettings serviceSettings) {
62          super(delegate.getName(), serviceSettings.getLockTimeout(), (n, ex) -> {});
63          this.delegate = requireNonNull(delegate);
64          this.contextSupplier = requireNonNull(contextSupplier);
65          this.keyGenerator = requireNonNull(keyGenerator);
66          this.valueMarshalling = requireNonNull(valueMarshalling);
67          this.serviceSettings = requireNonNull(serviceSettings);
68      }
69  
70      @Override
71      public CompletionStage<Optional<V>> get(String internalKey) {
72          return perform(() -> {
73              final String externalKey = buildExternalKey(internalKey);
74              final IdentifiedData identifiedData = delegate.get(externalKey);
75              return unmarshall(identifiedData, valueMarshalling);
76          });
77      }
78  
79      @Override
80      public CompletionStage<V> get(String internalKey, Supplier<V> supplier) {
81          return perform(() -> {
82              final String externalKey = buildExternalKey(internalKey);
83              final IdentifiedData identifiedData =
84                      delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling));
85              return unmarshall(identifiedData, valueMarshalling).get();
86          });
87      }
88  
89      @Override
90      public CompletionStage<Optional<IdentifiedValue<V>>> getIdentified(String internalKey) {
91          return perform(() -> {
92              verifyCasOpsSupported();
93              final String externalKey = buildExternalKey(internalKey);
94              return unmarshallIdentified(delegate.get(externalKey), valueMarshalling);
95          });
96      }
97  
98      @Override
99      public CompletionStage<IdentifiedValue<V>> getIdentified(String internalKey, Supplier<V> supplier) {
100         return perform(() -> {
101             verifyCasOpsSupported();
102             final String externalKey = buildExternalKey(internalKey);
103             return unmarshallIdentified(
104                     delegate.get(externalKey, () -> marshall(supplier.get(), valueMarshalling)),
105                     valueMarshalling).get();
106         });
107     }
108 
109     @Override
110     public CompletionStage<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys) {
111         return perform(() -> {
112             if (isEmpty(internalKeys)) {
113                 return new HashMap<>();
114             }
115 
116             // De-duplicate the keys, calculate the externalKeys and retrieve
117             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
118             return StreamSupport.stream(internalKeys.spliterator(), false)
119                     .distinct()
120                     .collect(Collectors.toMap(
121                             Objects::requireNonNull,
122                             k -> unmarshall(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
123         });
124     }
125 
126     @Override
127     public CompletionStage<Map<String, V>> getBulk(
128             Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys) {
129         return perform(() -> {
130             if (isEmpty(internalKeys)) {
131                 return new HashMap<>();
132             }
133 
134             // De-duplicate the keys and then obtain the existing values
135             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
136             final Map<String, Optional<V>> existingValues = unsafeJoin(getBulk(internalKeys));
137 
138             // Add known values to the grand result
139             final Map<String, V> grandResult = existingValues.entrySet().stream()
140                     .filter(e -> e.getValue().isPresent())
141                     .collect(Collectors.toMap(
142                             Map.Entry::getKey,
143                             e -> e.getValue().get()));
144 
145             // Bail out if we have all the values
146             if (grandResult.size() == existingValues.size()) {
147                 return grandResult;
148             }
149 
150             // Sadly we now need to call the factory to create the missing values and then merge into the grand result.
151             final Set<String> missingInternalKeys = existingValues.entrySet().stream()
152                     .filter(e -> !e.getValue().isPresent())
153                     .map(Map.Entry::getKey)
154                     .collect(Collectors.toSet());
155 
156             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
157             FactoryUtils.verifyFactoryResult(missingValues, missingInternalKeys);
158 
159             missingValues.entrySet().forEach(e -> {
160                 if (serviceSettings.isAvoidCasOps()) {
161                     delegate.put(cacheContext.externalEntryKeyFor(e.getKey()), marshall(e.getValue(), valueMarshalling));
162                     grandResult.put(e.getKey(), e.getValue());
163                 } else {
164                     final Optional<V> existing = unmarshall(
165                             delegate.putIfAbsent(
166                                     cacheContext.externalEntryKeyFor(e.getKey()),
167                                     marshall(e.getValue(), valueMarshalling)),
168                             valueMarshalling);
169                     grandResult.put(e.getKey(), existing.orElse(e.getValue()));
170                 }
171             });
172 
173             return grandResult;
174         });
175     }
176 
177     @Override
178     public CompletionStage<Map<String, Optional<IdentifiedValue<V>>>> getBulkIdentified(Iterable<String> internalKeys) {
179         return perform(() -> {
180             verifyCasOpsSupported();
181 
182             if (isEmpty(internalKeys)) {
183                 return new HashMap<>();
184             }
185 
186             // De-duplicate the keys, calculate the externalKeys and retrieve
187             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
188             return StreamSupport.stream(internalKeys.spliterator(), false)
189                     .distinct()
190                     .collect(Collectors.toMap(
191                             Objects::requireNonNull,
192                             k -> unmarshallIdentified(delegate.get(cacheContext.externalEntryKeyFor(k)), valueMarshalling)));
193         });
194     }
195 
196 
197     @Override
198     public CompletionStage<Boolean> put(String internalKey, V value, PutPolicy policy) {
199         return perform(() -> {
200             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
201             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
202             final IdentifiedData identifiedData = marshall(value, valueMarshalling);
203 
204             return LegacyUtils.directPut(externalKey, identifiedData, policy, delegate, serviceSettings.isAvoidCasOps());
205         });
206     }
207 
208     @Override
209     public CompletionStage<Boolean> removeIf(String internalKey, CasIdentifier casId) {
210         return perform(() -> {
211             verifyCasOpsSupported();
212             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
213             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
214             final IdentifiedData existingData = safeCast(casId);
215             return delegate.remove(externalKey, existingData);
216         });
217     }
218 
219     @Override
220     public CompletionStage<Boolean> replaceIf(String internalKey, CasIdentifier casId, V newValue) {
221         return perform(() -> {
222             verifyCasOpsSupported();
223             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
224             final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
225             final IdentifiedData existingData = safeCast(casId);
226             final IdentifiedData newData = marshall(newValue, valueMarshalling);
227             return delegate.replace(externalKey, existingData, newData);
228         });
229     }
230 
231     @Override
232     public CompletionStage<Void> remove(Iterable<String> internalKeys) {
233         return perform(() -> {
234             final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
235 
236             StreamSupport.stream(internalKeys.spliterator(), false)
237                     .distinct()
238                     .map(cacheContext::externalEntryKeyFor)
239                     .forEach(delegate::remove);
240             return null;
241         });
242     }
243 
244     @Override
245     public CompletionStage<Void> removeAll() {
246         return perform(() -> {
247             delegate.removeAll();
248             return null;
249         });
250     }
251 
252     protected AbstractExternalCacheRequestContext<V> ensureCacheContext() {
253         final RequestContext requestContext = contextSupplier.get();
254 
255         return requestContext.computeIfAbsent(this, () -> {
256             log.trace("Cache {}: Setting up a new context", delegate.getName());
257             return new UnversionedExternalCacheRequestContext<>(
258                     keyGenerator,
259                     delegate.getName(),
260                     requestContext::partitionIdentifier,
261                     serviceSettings.getLockTimeout());
262         });
263     }
264 
265     @Override
266     protected ExternalCacheException mapException(Exception ex) {
267         return LegacyUtils.mapException(ex);
268     }
269 
270     @Override
271     protected Logger getLogger() {
272         return log;
273     }
274 
275     private String buildExternalKey(String internalKey) {
276         final AbstractExternalCacheRequestContext cacheContext = ensureCacheContext();
277         return cacheContext.externalEntryKeyFor(internalKey);
278     }
279 
280     private void verifyCasOpsSupported() {
281         if (serviceSettings.isAvoidCasOps()) {
282             throw new UnsupportedOperationException("CAS operations not supported in this configuration");
283         }
284     }
285 }