1 package com.atlassian.vcache.internal.core.service;
2
3 import java.util.HashMap;
4 import java.util.Map;
5 import java.util.Optional;
6 import java.util.Set;
7 import java.util.concurrent.CompletableFuture;
8 import java.util.concurrent.ExecutionException;
9 import java.util.function.Function;
10 import java.util.function.Supplier;
11 import java.util.stream.Collectors;
12 import java.util.stream.StreamSupport;
13 import javax.annotation.Nonnull;
14
15 import com.atlassian.vcache.MarshallerException;
16
17 import static com.atlassian.vcache.internal.core.VCacheUtils.isEmpty;
18
19
20
21
22
23
24
25 public abstract class AbstractNonDirectExternalCache<V>
26 extends AbstractExternalCache<V>
27 {
28
29 protected AbstractNonDirectExternalCache(String name)
30 {
31 super(name);
32 }
33
34
35
36
37
38
39
40
41 @Nonnull
42 protected abstract V handleCreation(String internalKey, Supplier<V> supplier)
43 throws MarshallerException, ExecutionException, InterruptedException;
44
45
46
47
48
49
50
51
52 @Nonnull
53 protected abstract Map<String, V> handleCreation(
54 Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
55 throws ExecutionException, InterruptedException;
56
57
58
59
60 @Nonnull
61 protected abstract Optional<Optional<V>> checkValueRecorded(String internalKey);
62
63
64
65
66 @Nonnull
67 protected abstract Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys);
68
69
70
71
72 @Nonnull
73 protected abstract Optional<V> directGet(String externalKey);
74
75
76
77
78 @Nonnull
79 protected abstract Map<String, Optional<V>> directGetBulk(Set<String> externalKeys);
80
81 @Nonnull
82 @Override
83 public final CompletableFuture<Optional<V>> get(String internalKey)
84 {
85 return perform(() -> {
86 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
87
88 final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
89
90 return prior.orElseGet(() -> {
91 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
92
93 final Optional<V> result = directGet(externalKey);
94 cacheContext.recordValue(internalKey, result);
95 return result;
96 });
97 });
98 }
99
100 @Nonnull
101 @Override
102 public final CompletableFuture<V> get(String internalKey, Supplier<V> supplier)
103 {
104 return perform(() -> {
105 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
106
107 final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
108 if (prior.isPresent())
109 {
110 if (prior.get().isPresent())
111 {
112 return prior.get().get();
113 }
114 else
115 {
116 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
117 return handleCreation(internalKey, supplier);
118 }
119 }
120
121
122
123 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
124 final Optional<V> result = directGet(externalKey);
125 if (result.isPresent())
126 {
127
128 cacheContext.recordValue(internalKey, result);
129 return result.get();
130 }
131
132 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
133 return handleCreation(internalKey, supplier);
134 });
135 }
136
137 @Nonnull
138 @Override
139 public final CompletableFuture<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys)
140 {
141 return perform(() -> {
142 if (isEmpty(internalKeys))
143 {
144 return new HashMap<>();
145 }
146
147
148 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
149 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
150
151
152 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
153 .filter(k -> !grandResult.containsKey(k))
154 .map(cacheContext::externalEntryKeyFor)
155 .collect(Collectors.toSet());
156
157 if (missingExternalKeys.isEmpty())
158 {
159 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
160 return grandResult;
161 }
162 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
163
164
165 final Map<String, Optional<V>> candidateValues = directGetBulk(missingExternalKeys);
166
167 return candidateValues.entrySet().stream().collect(
168 () -> grandResult,
169 (m, e) -> {
170 final Optional<V> result = e.getValue();
171 cacheContext.recordValue(cacheContext.internalEntryKeyFor(e.getKey()), result);
172 m.put(cacheContext.internalEntryKeyFor(e.getKey()), result);
173 },
174 Map::putAll
175 );
176 });
177 }
178
179 @Nonnull
180 @Override
181 public final CompletableFuture<Map<String, V>> getBulk(
182 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys)
183 {
184 return perform(() -> {
185 if (isEmpty(internalKeys))
186 {
187 return new HashMap<>();
188 }
189
190
191 final AbstractExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
192 final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
193 .filter(e -> e.getValue().isPresent())
194 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
195
196
197 final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
198 .filter(k -> !grandResult.containsKey(k))
199 .map(cacheContext::externalEntryKeyFor)
200 .collect(Collectors.toSet());
201
202
203 if (candidateMissingExternalKeys.isEmpty())
204 {
205 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
206 return grandResult;
207 }
208 getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
209 name, candidateMissingExternalKeys.size());
210
211 final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
212 cacheContext.recordValues(missingValues);
213 grandResult.putAll(missingValues);
214
215 return grandResult;
216 });
217 }
218 }