1 package com.atlassian.vcache.internal.test;
2
3 import com.atlassian.utt.concurrency.Barrier;
4 import com.atlassian.vcache.ChangeRate;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheSettings;
7 import com.atlassian.vcache.ExternalCacheSettingsBuilder;
8 import com.atlassian.vcache.PutPolicy;
9 import com.atlassian.vcache.StableReadExternalCache;
10 import com.atlassian.vcache.internal.LongMetric;
11 import com.atlassian.vcache.internal.MetricLabel;
12 import com.atlassian.vcache.internal.RequestMetrics;
13 import com.google.common.base.Throwables;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Maps;
16 import org.hamcrest.Matchers;
17 import org.junit.Before;
18 import org.junit.Rule;
19 import org.junit.Test;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import java.time.Duration;
24 import java.util.EnumMap;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CompletionStage;
29 import java.util.concurrent.ExecutionException;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32
33 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
34 import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasMetric;
35 import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasSize;
36 import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successful;
37 import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successfulWith;
38 import static com.atlassian.vcache.internal.test.TestUtils.runAndWaitForStart;
39 import static org.hamcrest.Matchers.containsInAnyOrder;
40 import static org.hamcrest.Matchers.greaterThan;
41 import static org.hamcrest.Matchers.is;
42 import static org.hamcrest.Matchers.not;
43 import static org.junit.Assert.assertThat;
44
45
46
47
48 public abstract class AbstractStableReadExternalCacheIT {
49 private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCacheIT.class);
50 private static final String CACHE_NAME = "kids_hobbies";
51
52 @Rule
53 public LoggingTestWatcher watcher = new LoggingTestWatcher(log);
54
55 private StableReadExternalCache<String> cache;
56 private DirectExternalCache<String> directCache;
57
/**
 * Supplies the {@link DirectExternalCache} that the tests use alongside the cache under test.
 * {@code ensureCache()} calls this with the same name and settings that it passes to
 * {@code createCache}, and then clears the returned cache.
 *
 * @param name     name of the cache
 * @param settings settings for the cache
 * @return the direct cache used by the tests
 */
58 protected abstract DirectExternalCache<String> obtainDirectCache(String name, ExternalCacheSettings settings);
59
/**
 * Creates the {@link StableReadExternalCache} that the tests exercise.
 *
 * @param name        name of the cache
 * @param settings    settings for the cache
 * @param lockTimeout lock timeout handed to the implementation; the two-argument overload
 *                    passes two seconds
 * @return the cache under test
 */
60 protected abstract StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings, Duration lockTimeout);
61
62 protected StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings) {
63 return createCache(name, settings, Duration.ofSeconds(2));
64 }
65
/**
 * Supplies the {@link RequestMetrics} from which the tests read the long metrics recorded
 * for {@code CACHE_NAME}.
 *
 * @return the request metrics inspected by the tests
 */
66 protected abstract RequestMetrics requestMetrics();
67
68 @Before
69 public void ensureCache() {
70 final ExternalCacheSettings settings = new ExternalCacheSettingsBuilder()
71 .entryGrowthRateHint(ChangeRate.LOW_CHANGE)
72 .entryCountHint(5)
73 .defaultTtl(Duration.ofMinutes(5))
74 .dataChangeRateHint(ChangeRate.HIGH_CHANGE)
75 .build();
76 cache = createCache(CACHE_NAME, settings);
77
78 directCache = obtainDirectCache(CACHE_NAME, settings);
79
80
81 final CompletionStage<Void> rm = directCache.removeAll();
82 assertThat(rm, successful());
83 }
84
85 @Test
86 public void single_cache_get_set() throws ExecutionException, InterruptedException {
87 final CompletionStage<Optional<String>> eldestGet = cache.get("claira");
88
89 assertThat(eldestGet, successfulWith(is(Optional.empty())));
90
91 final CompletionStage<Boolean> eldestAdd = cache.put("claira", "dancing", PutPolicy.ADD_ONLY);
92
93 assertThat(eldestAdd, successfulWith(is(true)));
94
95 final CompletionStage<Optional<String>> eldestGet2 = cache.get("claira");
96
97 assertThat(eldestGet2, successfulWith(is(Optional.of("dancing"))));
98
99 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
100 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
101 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
102 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
103 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
104 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
105 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
106 assertThat(cacheMetrics, hasSize(is(5)));
107 }
108
109 @Test
110 public void dual_cache_get_set() throws ExecutionException, InterruptedException {
111 final CompletionStage<Optional<String>> get1 = cache.get("claira");
112
113 assertThat(get1, successfulWith(is(Optional.empty())));
114
115
116 final CompletionStage<Boolean> dput1 = directCache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
117
118 assertThat(dput1, successfulWith(is(true)));
119
120 final CompletionStage<Optional<String>> get2 = cache.get("claira");
121
122 assertThat(get2, successfulWith(is(Optional.empty())));
123
124
125
126 final CompletionStage<Boolean> put1 = cache.put("claira", "singing", PutPolicy.ADD_ONLY);
127
128 assertThat(put1, successfulWith(is(false)));
129
130 final CompletionStage<Optional<String>> get3 = cache.get("claira");
131
132 assertThat(get3, successfulWith(is(Optional.of("dancing"))));
133
134 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
135 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
136 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
137 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
138 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
139 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
140 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
141 assertThat(cacheMetrics, hasSize(is(5)));
142 }
143
144 @Test
145 public void dual_cache_get_with_supplier() throws ExecutionException, InterruptedException {
146 final CompletionStage<Optional<String>> get1 = cache.get("claira");
147
148 assertThat(get1, successfulWith(is(Optional.empty())));
149
150 final CompletionStage<String> get2 = cache.get("claira", () -> "dancing");
151
152 assertThat(get2, successfulWith(is("dancing")));
153
154 final CompletionStage<Optional<String>> get3 = cache.get("claira");
155
156 assertThat(get3, successfulWith(is(Optional.of("dancing"))));
157
158
159 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
160 assertThat(dput1, successfulWith(is(true)));
161
162 final CompletionStage<Optional<String>> get4 = cache.get("claira");
163
164 assertThat(get4, successfulWith(is(Optional.of("dancing"))));
165
166 final CompletionStage<String> get5 = cache.get("claira", () -> "riding");
167
168 assertThat(get5, successfulWith(is("dancing")));
169
170 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
171 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
172 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(5L), greaterThan(0L)));
173 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
174 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(3L), is(3L)));
175 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
176 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
177 assertThat(cacheMetrics, hasSize(is(5)));
178 }
179
180 @Test
181 public void dual_cache_get_with_supplier_take2() throws ExecutionException, InterruptedException {
182 final CompletionStage<Optional<String>> get1 = cache.get("claira");
183
184 assertThat(get1, successfulWith(is(Optional.empty())));
185
186 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
187 assertThat(dput1, successfulWith(is(true)));
188
189 final CompletionStage<String> get2 = cache.get("claira", () -> "riding");
190
191 assertThat(get2, successfulWith(is("singing")));
192
193 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
194 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
195 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
196 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
197 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
198 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(3L), is(3L)));
199 assertThat(cacheMetrics, hasSize(is(4)));
200 }
201
202 @Test
203 public void dual_cache_getBulk() throws ExecutionException, InterruptedException {
204 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
205 assertThat(dput1, successfulWith(is(true)));
206
207 final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "claira");
208
209 assertThat(get1, successful());
210 final Map<String, Optional<String>> map1 = unsafeJoin(get1);
211 assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine"));
212 assertThat(map1.get("claira"), is(Optional.of("singing")));
213 assertThat(map1.get("josephine"), is(Optional.empty()));
214
215 final CompletionStage<Boolean> dput2 = directCache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
216 assertThat(dput2, successfulWith(is(true)));
217
218 final CompletionStage<Boolean> dput3 = directCache.put("jasmin", "skiing", PutPolicy.PUT_ALWAYS);
219 assertThat(dput3, successfulWith(is(true)));
220
221 final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
222
223 assertThat(get2, successful());
224 final Map<String, Optional<String>> map2 = unsafeJoin(get2);
225 assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
226 assertThat(map2.get("claira"), is(Optional.of("singing")));
227 assertThat(map2.get("josephine"), is(Optional.empty()));
228 assertThat(map2.get("jasmin"), is(Optional.of("skiing")));
229
230 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
231 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
232 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
233 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(3L)));
234 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
235 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
236 assertThat(cacheMetrics, hasSize(is(4)));
237 }
238
239 @Test
240 public void remove_normal() throws ExecutionException, InterruptedException {
241 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
242 assertThat(dput1, successfulWith(is(true)));
243
244 final CompletionStage<Boolean> put1 = cache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
245 assertThat(put1, successfulWith(is(true)));
246
247 final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "jasmin");
248
249 assertThat(get1, successful());
250 final Map<String, Optional<String>> map1 = unsafeJoin(get1);
251 assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
252 assertThat(map1.get("claira"), is(Optional.of("singing")));
253 assertThat(map1.get("josephine"), is(Optional.of("football")));
254 assertThat(map1.get("jasmin"), is(Optional.empty()));
255
256 final CompletionStage<Void> rm1 = cache.remove("jasmin", "claira", "claira", "josephine");
257
258 assertThat(rm1, successful());
259
260 final CompletionStage<Map<String, Optional<String>>> dget1 = directCache.getBulk("claira", "josephine", "jasmin");
261
262 assertThat(dget1, successful());
263 final Map<String, Optional<String>> dmap1 = unsafeJoin(dget1);
264 assertThat(dmap1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
265 assertThat(dmap1.get("claira"), is(Optional.empty()));
266 assertThat(dmap1.get("josephine"), is(Optional.empty()));
267 assertThat(dmap1.get("jasmin"), is(Optional.empty()));
268
269 final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
270
271 assertThat(get2, successful());
272 final Map<String, Optional<String>> map2 = unsafeJoin(get2);
273 assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
274 assertThat(map2.get("claira"), is(Optional.empty()));
275 assertThat(map2.get("josephine"), is(Optional.empty()));
276 assertThat(map2.get("jasmin"), is(Optional.empty()));
277
278 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
279 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
280 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
281 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
282 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
283 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(2L)));
284 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(4L)));
285 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
286 assertThat(cacheMetrics, hasSize(is(6)));
287 }
288
289 @Test
290 public void simple_removeAll() throws ExecutionException, InterruptedException {
291 final CompletionStage<Boolean> put1 = cache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
292
293 assertThat(put1, successfulWith(is(true)));
294
295 final CompletionStage<Optional<String>> get1 = cache.get("claira");
296
297 assertThat(get1, successfulWith(is(Optional.of("dancing"))));
298
299 final CompletionStage<Void> rm1 = cache.removeAll();
300
301 assertThat(rm1, successful());
302
303 final CompletionStage<Optional<String>> get2 = cache.get("claira");
304
305 assertThat(get2, successfulWith(is(Optional.empty())));
306
307 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
308 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
309 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
310 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
311 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_ALL_CALL, is(1L), greaterThan(0L)));
312 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
313 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
314 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
315 assertThat(cacheMetrics, hasSize(is(6)));
316 }
317
318 @Test
319 public void simple_getBulk_function() throws ExecutionException, InterruptedException {
320 final CompletionStage<Map<String, String>> get1 = cache.getBulk(
321 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
322
323 assertThat(get1, successful());
324 assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
325 assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
326
327 final CompletionStage<Map<String, String>> get2 = cache.getBulk(
328 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
329
330 assertThat(get2, successful());
331 assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
332 assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
333
334 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
335 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
336 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(2L), greaterThan(0L)));
337 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
338 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(2L), is(2L)));
339 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
340 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
341 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(4L), is(4L)));
342 assertThat(cacheMetrics, hasSize(is(6)));
343 }
344
345 @Test
346 public void dual_getBulk_function() throws ExecutionException, InterruptedException {
347 final CompletionStage<Map<String, String>> get1 = cache.getBulk(
348 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
349
350 assertThat(get1, successful());
351 assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
352 assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
353
354 final CompletionStage<Boolean> mput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
355 assertThat(mput1, successfulWith(is(true)));
356
357 final CompletionStage<Map<String, String>> get2 = cache.getBulk(
358 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
359
360 assertThat(get2, successful());
361 assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
362 assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
363
364 final CompletionStage<Void> rm1 = cache.remove("claira");
365
366 assertThat(rm1, successful());
367
368 final CompletionStage<Map<String, String>> get3 = cache.getBulk(
369 keys -> Maps.asMap(keys, k -> k + "-3"), "claira", "josephine", "jasmin");
370
371 assertThat(get3, successful());
372 assertThat(unsafeJoin(get3).keySet(), containsInAnyOrder("claira", "josephine", "jasmin"));
373 assertThat(unsafeJoin(get3).values(), containsInAnyOrder("claira-3", "josephine-2", "jasmin-3"));
374
375 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
376 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
377 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(3L), greaterThan(0L)));
378 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
379 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
380 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(3L), is(4L)));
381 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(2L)));
382 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(3L), is(4L)));
383 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(7L), is(7L)));
384 assertThat(cacheMetrics, hasSize(is(7)));
385 }
386
387 @SuppressWarnings("ConstantConditions")
388 @Test
389 public void check_null_detection() {
390 assertThat(cache.get("kenny", () -> null), not(successful()));
391 assertThat(cache.put("key", null, PutPolicy.ADD_ONLY), not(successful()));
392 assertThat(cache.getBulk(
393 strings -> strings.stream().collect(Collectors.toMap(k -> k, k -> null)),
394 "extra"),
395 not(successful()));
396
397 final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
398 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
399 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(1L), greaterThan(0L)));
400 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
401 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
402 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
403 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(1L), is(1L)));
404 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_GET, is(2L), is(2L)));
405 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_PUT, is(1L), is(1L)));
406 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
407 assertThat(cacheMetrics, hasSize(is(8)));
408 }
409
410 @Test
411 public void both_getBulk_and_get_fail() throws InterruptedException {
412
413
414
415
416
417
418 final Barrier blockedInFactory = new Barrier();
419 final Barrier resumeInFactory = new Barrier();
420
421 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
422 final CompletableFuture<Map<String, String>> t1Result =
423 runAndWaitForStart(() -> cache.getBulk(keys -> {
424 log.info("{}: about to wait", Thread.currentThread().getName());
425 blockedInFactory.signal();
426 resumeInFactory.await();
427 log.info("{}: About to throw exception", Thread.currentThread().getName());
428 throw exBulk;
429 }, "A", "B", "C").toCompletableFuture().join());
430
431 final RuntimeException exSingle = new RuntimeException("get() exception");
432 blockedInFactory.await();
433 final CompletableFuture<String> t2Result =
434 runAndWaitForStart(() -> cache.get("B", () -> {
435 log.info("{}: About to throw exception", Thread.currentThread().getName());
436 throw exSingle;
437 }).toCompletableFuture().join());
438
439 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
440 resumeInFactory.signal();
441
442 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
443 assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
444 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
445 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
446 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
447 }
448
449 @Test
450 public void both_get_and_getBulk_fail() throws InterruptedException {
451
452
453
454
455
456
457 final Barrier blockedInFactory = new Barrier();
458 final Barrier resumeInFactory = new Barrier();
459
460 final RuntimeException exSingle = new RuntimeException("get() exception");
461 final CompletableFuture<String> t1Result =
462 runAndWaitForStart(() -> cache.get("B", () -> {
463 log.info("{}: About to throw exception", Thread.currentThread().getName());
464 blockedInFactory.signal();
465 resumeInFactory.await();
466 throw exSingle;
467 }).toCompletableFuture().join());
468
469 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
470 blockedInFactory.await();
471 final CompletableFuture<Map<String, String>> t2Result =
472 runAndWaitForStart(() -> cache.getBulk(keys -> {
473 log.info("{}: about to wait", Thread.currentThread().getName());
474 log.info("{}: About to throw exception", Thread.currentThread().getName());
475 throw exBulk;
476 }, "A", "B", "C").toCompletableFuture().join());
477
478 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
479 resumeInFactory.signal();
480
481 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
482 assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
483 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
484 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
485 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
486 }
487
488 @Test
489 public void get_fails_and_getBulk_succeeds() throws InterruptedException {
490
491
492
493
494
495
496 final Barrier blockedInFactory = new Barrier();
497 final Barrier resumeInFactory = new Barrier();
498
499 final RuntimeException exSingle = new RuntimeException("get() exception");
500 final CompletableFuture<String> t1Result =
501 runAndWaitForStart(() -> cache.get("B", () -> {
502 log.info("{}: about to wait", Thread.currentThread().getName());
503 blockedInFactory.signal();
504 resumeInFactory.await();
505 log.info("{}: About to throw exception", Thread.currentThread().getName());
506 throw exSingle;
507 }).toCompletableFuture().join());
508
509 blockedInFactory.await();
510 final CompletableFuture<Map<String, String>> t2Result =
511 runAndWaitForStart(() -> cache.getBulk(keys -> {
512 log.info("{}: About to return numbers", Thread.currentThread().getName());
513 return ImmutableMap.of("A", "1", "B", "2", "C", "3");
514 }, "A", "B", "C").toCompletableFuture().join());
515
516 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
517 resumeInFactory.signal();
518
519 assertThat(t1Result.handle((r, e) -> {
520 if (e != null) {
521 return Throwables.getRootCause(e);
522 } else {
523 log.info("r: {}", r);
524
525 return new RuntimeException();
526 }
527 }).join(), is(exSingle));
528
529 assertThat(t2Result.join().get("A"), is("1"));
530 assertThat(t2Result.join().get("B"), is("2"));
531 assertThat(t2Result.join().get("C"), is("3"));
532
533 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("1")));
534 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("2")));
535 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("3")));
536 }
537
538 @Test
539 public void getBulk_fail_and_get_succeed() throws InterruptedException {
540
541
542
543
544
545
546 final Barrier blockedInFactory = new Barrier();
547 final Barrier resumeInFactory = new Barrier();
548
549 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
550 final CompletableFuture<Map<String, String>> t1Result =
551 runAndWaitForStart(() -> cache.getBulk(keys -> {
552 log.info("{}: about to wait", Thread.currentThread().getName());
553 blockedInFactory.signal();
554 resumeInFactory.await();
555 log.info("{}: About to throw exception", Thread.currentThread().getName());
556 throw exBulk;
557 }, "A", "B", "C").toCompletableFuture().join());
558
559 blockedInFactory.await();
560 final CompletableFuture<String> t2Result =
561 runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
562
563 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
564 resumeInFactory.signal();
565
566
567 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
568 assertThat(t2Result.join(), is("bee"));
569 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
570 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("bee")));
571 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
572 }
573
574 @Test
575 public void getBulk_pass_and_get_reuse() throws InterruptedException {
576
577
578
579
580
581
582 final Barrier blockedInFactory = new Barrier();
583 final Barrier resumeInFactory = new Barrier();
584
585 final CompletableFuture<Map<String, String>> t1Result =
586 runAndWaitForStart(() -> cache.getBulk(keys -> {
587 log.info("{}: About to await on barrierBulk", Thread.currentThread().getName());
588 blockedInFactory.signal();
589 resumeInFactory.await();
590 log.info("{}: resuming", Thread.currentThread().getName());
591 return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
592 }, "A", "B", "C").toCompletableFuture().join());
593
594 blockedInFactory.await();
595 final CompletableFuture<String> t2Result =
596 runAndWaitForStart(() -> cache.get("B", () -> "B-2").toCompletableFuture().join());
597
598 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
599 assertThat(t2Result.join(), is("B-2"));
600 resumeInFactory.signal();
601 log.info("{}: and now", Thread.currentThread().getName());
602
603
604 final Map<String, String> t1ResultData = t1Result.join();
605 assertThat(t1ResultData.get("A"), is("A-1"));
606 assertThat(t1ResultData.get("B"), is("B-2"));
607 assertThat(t1ResultData.get("C"), is("C-1"));
608 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("A-1")));
609 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("B-2")));
610 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("C-1")));
611 }
612
613 @Test
614 public void potential_deadlock() {
615 final CompletableFuture<String> get = cache.get("lockType", () -> {
616
617 cache.remove("lockType");
618 return "lockwood";
619 }).toCompletableFuture();
620
621 assertThat(get, successfulWith(is("lockwood")));
622 }
623
624 @Test
625 public void repeated_concurrency() {
626 IntStream.range(1, 50).forEach((i) -> {
627 try {
628 getBulk_fail_and_get_succeed();
629 cache.removeAll();
630 getBulk_pass_and_get_reuse();
631 cache.removeAll();
632 get_fails_and_getBulk_succeeds();
633 cache.removeAll();
634 both_get_and_getBulk_fail();
635 cache.removeAll();
636 both_getBulk_and_get_fail();
637 cache.removeAll();
638
639 } catch (InterruptedException e) {
640 throw new RuntimeException(e);
641 }
642 });
643 }
644 }