1 package com.atlassian.vcache.internal.memcached;
2
3 import java.util.HashMap;
4 import java.util.Map;
5 import java.util.Optional;
6 import java.util.Set;
7 import java.util.concurrent.CompletableFuture;
8 import java.util.concurrent.ExecutionException;
9 import java.util.function.Function;
10 import java.util.function.Supplier;
11 import java.util.stream.Collectors;
12 import java.util.stream.StreamSupport;
13 import javax.annotation.Nonnull;
14
15 import com.atlassian.vcache.ExternalCache;
16 import com.atlassian.vcache.ExternalCacheSettings;
17 import com.atlassian.vcache.Marshaller;
18 import com.atlassian.vcache.MarshallerException;
19 import com.atlassian.vcache.internal.RequestContext;
20 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
21 import com.atlassian.vcache.internal.core.VCacheUtils;
22
23 import net.spy.memcached.MemcachedClientIF;
24 import net.spy.memcached.OperationTimeoutException;
25 import org.slf4j.Logger;
26
27 import static com.atlassian.vcache.internal.NameValidator.requireValidCacheName;
28 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.isEmpty;
29 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.perform;
30 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.unmarshall;
31 import static java.util.Objects.requireNonNull;
32
33 abstract class AbstractStableReadExternalCache<V> implements ExternalCache<V>
34 {
35 protected final Supplier<MemcachedClientIF> clientSupplier;
36 protected final Supplier<RequestContext> contextSupplier;
37 protected final ExternalCacheKeyGenerator keyGenerator;
38 protected final String name;
39 protected final Marshaller<V> valueMarshaller;
40 protected final ExternalCacheSettings settings;
41 protected final int defaultTtl;
42
43 AbstractStableReadExternalCache(
44 Supplier<MemcachedClientIF> clientSupplier,
45 Supplier<RequestContext> contextSupplier,
46 ExternalCacheKeyGenerator keyGenerator,
47 String name,
48 Marshaller<V> valueMarshaller,
49 ExternalCacheSettings settings)
50 {
51 this.clientSupplier = requireNonNull(clientSupplier);
52 this.contextSupplier = requireNonNull(contextSupplier);
53 this.keyGenerator = requireNonNull(keyGenerator);
54 this.name = requireValidCacheName(name);
55 this.valueMarshaller = requireNonNull(valueMarshaller);
56 this.settings = requireNonNull(settings);
57 this.defaultTtl = VCacheUtils.roundUpToSeconds(settings.getDefaultTtl().get());
58 }
59
60
61
62
63
64
65 @Nonnull
66 abstract StableReadRequestContext<V> ensureCacheContext() throws OperationTimeoutException;
67
68
69
70
71
72 @Nonnull
73 abstract Logger getLogger();
74
75
76
77
78
79
80
81
82 @Nonnull
83 abstract V handleCreation(String internalKey, Supplier<V> supplier)
84 throws MarshallerException, ExecutionException, InterruptedException;
85
86
87
88
89
90
91
92
93 @Nonnull
94 abstract Map<String, V> handleCreation(
95 Function<Set<String>, Map<String, V>> factory, Set<String> externalKeys)
96 throws ExecutionException, InterruptedException;
97
98
99
100
101 @Nonnull
102 abstract Optional<Optional<V>> checkValueRecorded(String internalKey);
103
104
105
106
107 @Nonnull
108 abstract Map<String, Optional<V>> checkValuesRecorded(Iterable<String> internalKeys);
109
110 @Nonnull
111 @Override
112 public final String getName()
113 {
114 return name;
115 }
116
117 @Nonnull
118 @Override
119 public final CompletableFuture<Optional<V>> get(String internalKey)
120 {
121 return perform(() -> {
122 final StableReadRequestContext<V> cacheContext = ensureCacheContext();
123
124 final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
125
126 return prior.orElseGet(() -> {
127 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
128
129 final Optional<V> result = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
130 cacheContext.recordValue(internalKey, result);
131 return result;
132 });
133 });
134 }
135
136 @Nonnull
137 @Override
138 public final CompletableFuture<V> get(String internalKey, Supplier<V> supplier)
139 {
140 return perform(() -> {
141 final StableReadRequestContext<V> cacheContext = ensureCacheContext();
142
143 final Optional<Optional<V>> prior = checkValueRecorded(internalKey);
144 if (prior.isPresent() && prior.get().isPresent())
145 {
146 return prior.get().get();
147 }
148
149 if (prior.isPresent())
150 {
151 if (prior.get().isPresent())
152 {
153 return prior.get().get();
154 }
155 else
156 {
157 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
158 return handleCreation(internalKey, supplier);
159 }
160 }
161
162
163
164 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
165 final Optional<V> result = unmarshall(clientSupplier.get().get(externalKey), valueMarshaller);
166 if (result.isPresent())
167 {
168
169 cacheContext.recordValue(internalKey, result);
170 return result.get();
171 }
172
173 getLogger().trace("Cache {}, creating candidate for key {}", name, internalKey);
174 return handleCreation(internalKey, supplier);
175 });
176 }
177
178 @Nonnull
179 @Override
180 public final CompletableFuture<Map<String, Optional<V>>> getBulk(Iterable<String> internalKeys)
181 {
182 return perform(() -> {
183 if (isEmpty(internalKeys))
184 {
185 return new HashMap<>();
186 }
187
188
189 final StableReadRequestContext<V> cacheContext = ensureCacheContext();
190 final Map<String, Optional<V>> grandResult = checkValuesRecorded(internalKeys);
191
192
193 final Set<String> missingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
194 .filter(k -> !grandResult.containsKey(k))
195 .map(cacheContext::externalEntryKeyFor)
196 .collect(Collectors.toSet());
197
198 if (missingExternalKeys.isEmpty())
199 {
200 getLogger().trace("Cache {}: getBulk(): have all the requested entries cached", name);
201 return grandResult;
202 }
203 getLogger().trace("Cache {}: getBulk(): not cached {} requested entries", name, missingExternalKeys.size());
204
205
206
207 final Map<String, Object> haveValues = clientSupplier.get().getBulk(missingExternalKeys);
208
209 return missingExternalKeys.stream().collect(
210 () -> grandResult,
211 (m, k) -> {
212 final Optional<V> result = unmarshall(haveValues.get(k), valueMarshaller);
213 cacheContext.recordValue(cacheContext.internalEntryKeyFor(k), result);
214 m.put(cacheContext.internalEntryKeyFor(k), result);
215 },
216 Map::putAll
217 );
218 });
219 }
220
221 @Nonnull
222 @Override
223 public final CompletableFuture<Map<String, V>> getBulk(
224 Function<Set<String>, Map<String, V>> factory, Iterable<String> internalKeys)
225 {
226 return perform(() -> {
227 if (isEmpty(internalKeys))
228 {
229 return new HashMap<>();
230 }
231
232
233 final StableReadRequestContext<V> cacheContext = ensureCacheContext();
234 final Map<String, V> grandResult = checkValuesRecorded(internalKeys).entrySet().stream()
235 .filter(e -> e.getValue().isPresent())
236 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
237
238
239 final Set<String> candidateMissingExternalKeys = StreamSupport.stream(internalKeys.spliterator(), false)
240 .filter(k -> !grandResult.containsKey(k))
241 .map(cacheContext::externalEntryKeyFor)
242 .collect(Collectors.toSet());
243
244
245 if (candidateMissingExternalKeys.isEmpty())
246 {
247 getLogger().trace("Cache {}: getBulk(Function): had all the requested entries cached", name);
248 return grandResult;
249 }
250 getLogger().trace("Cache {}: getBulk(Function): checking external cache for {} keys",
251 name, candidateMissingExternalKeys.size());
252
253 final Map<String, V> missingValues = handleCreation(factory, candidateMissingExternalKeys);
254 cacheContext.recordValues(missingValues);
255 grandResult.putAll(missingValues);
256
257 return grandResult;
258 });
259 }
260 }