1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.vcache.ExternalCacheException;
4 import com.atlassian.vcache.PutPolicy;
5 import com.atlassian.vcache.TransactionalExternalCache;
6 import com.atlassian.vcache.internal.MetricLabel;
7 import com.atlassian.vcache.internal.RequestContext;
8 import com.atlassian.vcache.internal.core.TransactionControl;
9 import com.atlassian.vcache.internal.core.metrics.CacheType;
10 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
11
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Optional;
16 import java.util.Set;
17 import java.util.concurrent.ExecutionException;
18 import java.util.function.Function;
19 import java.util.function.Supplier;
20 import java.util.stream.Collectors;
21
22 import static java.util.Objects.requireNonNull;
23
24
25
26
27
28
29
30 public abstract class AbstractTransactionalExternalCache<V>
31 extends AbstractNonDirectExternalCache<V>
32 implements TransactionalExternalCache<V>, TransactionControl {
33
34 protected final Supplier<RequestContext> contextSupplier;
35
36 protected AbstractTransactionalExternalCache(String name, Supplier<RequestContext> contextSupplier, MetricsRecorder metricsRecorder) {
37 super(name, metricsRecorder);
38 this.contextSupplier = requireNonNull(contextSupplier);
39 }
40
41 @Override
42 public final void put(String internalKey, V value, PutPolicy policy) {
43 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
44 cacheContext.recordPut(internalKey, value, policy);
45 }
46
47 @Override
48 public final void remove(Iterable<String> internalKeys) {
49 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
50 cacheContext.recordRemove(internalKeys);
51 }
52
53 @Override
54 public final void removeAll() {
55 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
56 cacheContext.recordRemoveAll();
57 }
58
59 @Override
60 public final boolean transactionDiscard() {
61 final RequestContext requestContext = contextSupplier.get();
62 final Optional<AbstractExternalCacheRequestContext<V>> cacheRequestContext = requestContext.get(this);
63
64 if (!cacheRequestContext.isPresent()) {
65
66 return false;
67 }
68
69 final boolean hasPendingOperations = cacheRequestContext.get().hasPendingOperations();
70 cacheRequestContext.get().forgetAll();
71 return hasPendingOperations;
72 }
73
74 protected final Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys) {
75 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
76
77 final Map<String, Optional<V>> result = new HashMap<>();
78
79 internalKeys.forEach(k -> {
80 final Optional<Optional<V>> valueRecorded = cacheContext.getValueRecorded(k);
81 if (valueRecorded.isPresent()) {
82 result.put(k, valueRecorded.get());
83 } else if (cacheContext.hasRemoveAll()) {
84 result.put(k, Optional.empty());
85 }
86 });
87
88 return result;
89 }
90
91 @Override
92 protected final V handleCreation(String internalKey, Supplier<V> supplier) throws ExecutionException, InterruptedException {
93 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
94 final V suppliedValue = requireNonNull(supplier.get());
95 cacheContext.recordPutPolicy(internalKey, suppliedValue, PutPolicy.ADD_ONLY);
96 return suppliedValue;
97 }
98
99 @Override
100 protected final Map<String, V> handleCreation(Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
101 throws ExecutionException, InterruptedException {
102
103 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
104
105
106
107 final Map<String, Optional<V>> candidateValues;
108 if (cacheContext.hasRemoveAll()) {
109 candidateValues = new HashMap<>();
110 } else {
111 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
112 candidateValues = directGetBulk(externalKeys);
113 }
114
115 final Set<String> missingExternalKeys = cacheContext.hasRemoveAll() ?
116 externalKeys :
117 candidateValues.entrySet().stream()
118 .filter(e -> !e.getValue().isPresent())
119 .map(Map.Entry::getKey)
120 .collect(Collectors.toSet());
121
122
123 final Map<String, V> grandResult = candidateValues.entrySet().stream()
124 .filter(e -> e.getValue().isPresent())
125 .collect(Collectors.toMap(
126 e -> cacheContext.internalEntryKeyFor(e.getKey()),
127 e -> e.getValue().get()));
128
129 if (!missingExternalKeys.isEmpty()) {
130 getLogger().trace("Cache {}: getBulk(Function): calling factory to create {} values",
131 name, missingExternalKeys.size());
132
133 final Set<String> missingInternalKeys = Collections.unmodifiableSet(
134 missingExternalKeys.stream().map(cacheContext::internalEntryKeyFor).collect(Collectors.toSet()));
135 final Map<String, V> missingValues = factory.apply(missingInternalKeys);
136 if (missingInternalKeys.size() != missingValues.size()) {
137 getLogger().warn("Cache {}: getBulk(Function): mismatch on generated values, expected ",
138 name, missingInternalKeys.size() + " but got " + missingValues.size());
139 throw new ExternalCacheException(ExternalCacheException.Reason.FUNCTION_INCORRECT_RESULT);
140 }
141
142
143 missingValues.entrySet().forEach(e -> put(e.getKey(), e.getValue(), PutPolicy.ADD_ONLY));
144
145 grandResult.putAll(missingValues);
146 }
147
148 return grandResult;
149 }
150 }