View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.vcache.ExternalCacheException;
4   import com.atlassian.vcache.PutPolicy;
5   import com.atlassian.vcache.TransactionalExternalCache;
6   import com.atlassian.vcache.internal.MetricLabel;
7   import com.atlassian.vcache.internal.RequestContext;
8   import com.atlassian.vcache.internal.core.TransactionControl;
9   import com.atlassian.vcache.internal.core.metrics.CacheType;
10  import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11  
12  import java.util.Collections;
13  import java.util.HashMap;
14  import java.util.Map;
15  import java.util.Optional;
16  import java.util.Set;
17  import java.util.concurrent.ExecutionException;
18  import java.util.function.Function;
19  import java.util.function.Supplier;
20  import java.util.stream.Collectors;
21  
22  import static java.util.Objects.requireNonNull;
23  
24  /**
25   * Provides operations common for {@link com.atlassian.vcache.TransactionalExternalCache} instances.
26   *
27   * @param <V> the value type
28   * @since 1.0.0
29   */
30  public abstract class AbstractTransactionalExternalCache<V>
31          extends AbstractNonDirectExternalCache<V>
32          implements TransactionalExternalCache<V>, TransactionControl {
33  
34      protected final Supplier<RequestContext> contextSupplier;
35  
36      protected AbstractTransactionalExternalCache(String name, Supplier<RequestContext> contextSupplier, MetricsRecorder metricsRecorder) {
37          super(name, metricsRecorder);
38          this.contextSupplier = requireNonNull(contextSupplier);
39      }
40  
41      @Override
42      public final void put(String internalKey, V value, PutPolicy policy) {
43          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
44          cacheContext.recordPut(internalKey, value, policy);
45      }
46  
47      @Override
48      public final void remove(Iterable<String> internalKeys) {
49          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
50          cacheContext.recordRemove(internalKeys);
51      }
52  
53      @Override
54      public final void removeAll() {
55          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
56          cacheContext.recordRemoveAll();
57      }
58  
59      @Override
60      public final boolean transactionDiscard() {
61          final RequestContext requestContext = contextSupplier.get();
62          final Optional<AbstractExternalCacheRequestContext<V>> cacheRequestContext = requestContext.get(this);
63  
64          if (!cacheRequestContext.isPresent()) {
65              // there are no pending operations
66              return false;
67          }
68  
69          final boolean hasPendingOperations = cacheRequestContext.get().hasPendingOperations();
70          cacheRequestContext.get().forgetAll();
71          return hasPendingOperations;
72      }
73  
74      protected final Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
75          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
76  
77          final Map<String, Optional<V>> result = new HashMap<>();
78  
79          internalKeys.forEach(k -> {
80              final Optional<Optional<V>> valueRecorded = cacheContext.getValueRecorded(k);
81              if (valueRecorded.isPresent()) {
82                  result.put(k, valueRecorded.get());
83              } else if (cacheContext.hasRemoveAll()) {
84                  result.put(k, Optional.empty());
85              }
86          });
87  
88          return result;
89      }
90  
91      @Override
92      protected final V handleCreation(String internalKey, Supplier<V> supplier) throws ExecutionException, InterruptedException {
93          final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
94          final V suppliedValue = requireNonNull(supplier.get());
95          cacheContext.recordPutPolicy(internalKey, suppliedValue, PutPolicy.ADD_ONLY);
96          return suppliedValue;
97      }
98  
99      @Override
100     protected final Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
101             throws ExecutionException, InterruptedException {
102         // Get the missing values from the external cache.
103         final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
104 
105         // Need to handle if removeAll has been performed, and hence not check remotely. Otherwise,
106         // need to check remotely.
107         final Map<String, Optional<V>> candidateValues;
108         if (cacheContext.hasRemoveAll()) {
109             candidateValues = new HashMap<>();
110         } else {
111             metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
112             candidateValues = directGetBulk(externalKeys);
113         }
114 
115         final Set<String> missingExternalKeys = cacheContext.hasRemoveAll() ?
116                 externalKeys :
117                 candidateValues.entrySet().stream()
118                         .filter(e -> !e.getValue().isPresent())
119                         .map(Map.Entry::getKey)
120                         .collect(Collectors.toSet());
121 
122         // Add the retrieved values to the grand result
123         final Map<String, V> grandResult = candidateValues.entrySet().stream()
124                 .filter(e -> e.getValue().isPresent())
125                 .collect(Collectors.toMap(
126                         e -> cacheContext.internalEntryKeyFor(e.getKey()),
127                         e -> e.getValue().get()));
128 
129         if (!missingExternalKeys.isEmpty()) {
130             getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
131                     name, missingExternalKeys.size());
132             // Okay, need to get the missing values and mapping from externalKeys to internalKeys
133             final Set<String> missingInternalKeys = Collections.unmodifiableSet(
134                     missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
135             final Map<String, V> missingValues = factory.apply(missingInternalKeys);
136             if (missingInternalKeys.size() != missingValues.size()) {
137                 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
138                         name, missingInternalKeys.size() + " but got " + missingValues.size());
139                 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
140             }
141 
142             // Okay, got the missing values, now need to record adding them
143             missingValues.entrySet().forEach(e -> put(e.getKey(), e.getValue(), PutPolicy.ADD_ONLY));
144 
145             grandResult.putAll(missingValues);
146         }
147 
148         return grandResult;
149     }
150 }