1 package com.atlassian.vcache.internal.test;
2
3 import com.atlassian.utt.concurrency.Barrier;
4 import com.atlassian.vcache.ChangeRate;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheSettings;
7 import com.atlassian.vcache.ExternalCacheSettingsBuilder;
8 import com.atlassian.vcache.PutPolicy;
9 import com.atlassian.vcache.StableReadExternalCache;
10 import com.google.common.base.Throwables;
11 import com.google.common.collect.ImmutableMap;
12 import com.google.common.collect.Maps;
13 import org.hamcrest.Matchers;
14 import org.junit.Assert;
15 import org.junit.Before;
16 import org.junit.Rule;
17 import org.junit.Test;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.time.Duration;
22 import java.util.Map;
23 import java.util.Optional;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.CompletionStage;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.stream.Collectors;
32 import java.util.stream.IntStream;
33
34 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
35 import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successful;
36 import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successfulWith;
37 import static org.hamcrest.MatcherAssert.assertThat;
38 import static org.hamcrest.Matchers.containsInAnyOrder;
39 import static org.hamcrest.Matchers.is;
40 import static org.hamcrest.Matchers.not;
41
42
43
44
/**
 * Base integration test for {@link StableReadExternalCache} implementations.
 * <p>
 * Concrete subclasses supply the cache under test via {@link #createCache} and a
 * {@link DirectExternalCache} via {@link #obtainDirectCache}. Both are requested with the same
 * name and settings, and the tests use the direct cache to change data without going through the
 * cache under test.
 */
public abstract class AbstractStableReadExternalCacheIT {
    private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCacheIT.class);
    // Name passed to both createCache() and obtainDirectCache() in ensureCache().
    private static final String CACHE_NAME = "kids_hobbies";

    // Test watcher wired to this class's logger.
    @Rule
    public LoggingTestWatcher watcher = new LoggingTestWatcher(log);

    // The cache under test; assigned in ensureCache() before each test.
    private StableReadExternalCache<String> cache;
    // Direct cache requested with the same name and settings; assigned in ensureCache().
    private DirectExternalCache<String> directCache;
    // Runs the concurrent cache calls submitted by runAndWaitForStart().
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Creates the stable-read cache to be tested.
     *
     * @param name     the name of the cache
     * @param settings the settings to create the cache with
     * @return the cache under test
     */
    protected abstract StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings);

    /**
     * Obtains a direct cache for the supplied name and settings.
     *
     * @param name     the name of the cache
     * @param settings the settings to obtain the cache with
     * @return the direct cache
     */
    protected abstract DirectExternalCache<String> obtainDirectCache(String name, ExternalCacheSettings settings);
59
60 @Before
61 public void ensureCache() {
62 final ExternalCacheSettings settings = new ExternalCacheSettingsBuilder()
63 .entryGrowthRateHint(ChangeRate.LOW_CHANGE)
64 .entryCountHint(5)
65 .defaultTtl(Duration.ofMinutes(5))
66 .dataChangeRateHint(ChangeRate.HIGH_CHANGE)
67 .build();
68 cache = createCache(CACHE_NAME, settings);
69
70 directCache = obtainDirectCache(CACHE_NAME, settings);
71
72
73 final CompletionStage<Void> rm = directCache.removeAll();
74 assertThat(rm, successful());
75 }
76
77 @Test
78 public void single_cache_get_set() throws ExecutionException, InterruptedException {
79 final CompletionStage<Optional<String>> eldestGet = cache.get("claira");
80
81 assertThat(eldestGet, successfulWith(is(Optional.empty())));
82
83 final CompletionStage<Boolean> eldestAdd = cache.put("claira", "dancing", PutPolicy.ADD_ONLY);
84
85 assertThat(eldestAdd, successfulWith(is(true)));
86
87 final CompletionStage<Optional<String>> eldestGet2 = cache.get("claira");
88
89 assertThat(eldestGet2, successfulWith(is(Optional.of("dancing"))));
90 }
91
92 @Test
93 public void dual_cache_get_set() throws ExecutionException, InterruptedException {
94 final CompletionStage<Optional<String>> get1 = cache.get("claira");
95
96 assertThat(get1, successfulWith(is(Optional.empty())));
97
98
99 final CompletionStage<Boolean> dput1 = directCache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
100
101 assertThat(dput1, successfulWith(is(true)));
102
103 final CompletionStage<Optional<String>> get2 = cache.get("claira");
104
105 assertThat(get2, successfulWith(is(Optional.empty())));
106
107
108
109 final CompletionStage<Boolean> put1 = cache.put("claira", "singing", PutPolicy.ADD_ONLY);
110
111 assertThat(put1, successfulWith(is(false)));
112
113 final CompletionStage<Optional<String>> get3 = cache.get("claira");
114
115 assertThat(get3, successfulWith(is(Optional.of("dancing"))));
116 }
117
118 @Test
119 public void dual_cache_get_with_supplier() throws ExecutionException, InterruptedException {
120 final CompletionStage<Optional<String>> get1 = cache.get("claira");
121
122 assertThat(get1, successfulWith(is(Optional.empty())));
123
124 final CompletionStage<String> get2 = cache.get("claira", () -> "dancing");
125
126 assertThat(get2, successfulWith(is("dancing")));
127
128 final CompletionStage<Optional<String>> get3 = cache.get("claira");
129
130 assertThat(get3, successfulWith(is(Optional.of("dancing"))));
131
132
133 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
134 assertThat(dput1, successfulWith(is(true)));
135
136 final CompletionStage<Optional<String>> get4 = cache.get("claira");
137
138 assertThat(get4, successfulWith(is(Optional.of("dancing"))));
139
140 final CompletionStage<String> get5 = cache.get("claira", () -> "riding");
141
142 assertThat(get5, successfulWith(is("dancing")));
143 }
144
145 @Test
146 public void dual_cache_get_with_supplier_take2() throws ExecutionException, InterruptedException {
147 final CompletionStage<Optional<String>> get1 = cache.get("claira");
148
149 assertThat(get1, successfulWith(is(Optional.empty())));
150
151 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
152 assertThat(dput1, successfulWith(is(true)));
153
154 final CompletionStage<String> get2 = cache.get("claira", () -> "riding");
155
156 assertThat(get2, successfulWith(is("singing")));
157 }
158
159 @Test
160 public void dual_cache_getBulk() throws ExecutionException, InterruptedException {
161 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
162 assertThat(dput1, successfulWith(is(true)));
163
164 final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "claira");
165
166 assertThat(get1, successful());
167 final Map<String, Optional<String>> map1 = unsafeJoin(get1);
168 assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine"));
169 assertThat(map1.get("claira"), is(Optional.of("singing")));
170 assertThat(map1.get("josephine"), is(Optional.empty()));
171
172 final CompletionStage<Boolean> dput2 = directCache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
173 assertThat(dput2, successfulWith(is(true)));
174
175 final CompletionStage<Boolean> dput3 = directCache.put("jasmin", "skiing", PutPolicy.PUT_ALWAYS);
176 assertThat(dput3, successfulWith(is(true)));
177
178 final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
179
180 assertThat(get2, successful());
181 final Map<String, Optional<String>> map2 = unsafeJoin(get2);
182 assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
183 assertThat(map2.get("claira"), is(Optional.of("singing")));
184 assertThat(map2.get("josephine"), is(Optional.empty()));
185 assertThat(map2.get("jasmin"), is(Optional.of("skiing")));
186 }
187
188 @Test
189 public void remove_normal() throws ExecutionException, InterruptedException {
190 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
191 assertThat(dput1, successfulWith(is(true)));
192
193 final CompletionStage<Boolean> put1 = cache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
194 assertThat(put1, successfulWith(is(true)));
195
196 final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "jasmin");
197
198 assertThat(get1, successful());
199 final Map<String, Optional<String>> map1 = unsafeJoin(get1);
200 assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
201 assertThat(map1.get("claira"), is(Optional.of("singing")));
202 assertThat(map1.get("josephine"), is(Optional.of("football")));
203 assertThat(map1.get("jasmin"), is(Optional.empty()));
204
205 final CompletionStage<Void> rm1 = cache.remove("jasmin", "claira", "claira", "josephine");
206
207 assertThat(rm1, successful());
208
209 final CompletionStage<Map<String, Optional<String>>> dget1 = directCache.getBulk("claira", "josephine", "jasmin");
210
211 assertThat(dget1, successful());
212 final Map<String, Optional<String>> dmap1 = unsafeJoin(dget1);
213 assertThat(dmap1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
214 assertThat(dmap1.get("claira"), is(Optional.empty()));
215 assertThat(dmap1.get("josephine"), is(Optional.empty()));
216 assertThat(dmap1.get("jasmin"), is(Optional.empty()));
217
218 final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
219
220 assertThat(get2, successful());
221 final Map<String, Optional<String>> map2 = unsafeJoin(get2);
222 assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
223 assertThat(map2.get("claira"), is(Optional.empty()));
224 assertThat(map2.get("josephine"), is(Optional.empty()));
225 assertThat(map2.get("jasmin"), is(Optional.empty()));
226 }
227
228 @Test
229 public void simple_removeAll() throws ExecutionException, InterruptedException {
230 final CompletionStage<Boolean> put1 = cache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
231
232 assertThat(put1, successfulWith(is(true)));
233
234 final CompletionStage<Optional<String>> get1 = cache.get("claira");
235
236 assertThat(get1, successfulWith(is(Optional.of("dancing"))));
237
238 final CompletionStage<Void> rm1 = cache.removeAll();
239
240 assertThat(rm1, successful());
241
242 final CompletionStage<Optional<String>> get2 = cache.get("claira");
243
244 assertThat(get2, successfulWith(is(Optional.empty())));
245 }
246
247 @Test
248 public void simple_getBulk_function() throws ExecutionException, InterruptedException {
249 final CompletionStage<Map<String, String>> get1 = cache.getBulk(
250 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
251
252 assertThat(get1, successful());
253 assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
254 assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
255
256 final CompletionStage<Map<String, String>> get2 = cache.getBulk(
257 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
258
259 assertThat(get2, successful());
260 assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
261 assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
262 }
263
264 @Test
265 public void dual_getBulk_function() throws ExecutionException, InterruptedException {
266 final CompletionStage<Map<String, String>> get1 = cache.getBulk(
267 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
268
269 assertThat(get1, successful());
270 assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
271 assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
272
273 final CompletionStage<Boolean> mput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
274 assertThat(mput1, successfulWith(is(true)));
275
276 final CompletionStage<Map<String, String>> get2 = cache.getBulk(
277 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
278
279 assertThat(get2, successful());
280 assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
281 assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
282
283 final CompletionStage<Void> rm1 = cache.remove("claira");
284
285 assertThat(rm1, successful());
286
287 final CompletionStage<Map<String, String>> get3 = cache.getBulk(
288 keys -> Maps.asMap(keys, k -> k + "-3"), "claira", "josephine", "jasmin");
289
290 assertThat(get3, successful());
291 assertThat(unsafeJoin(get3).keySet(), containsInAnyOrder("claira", "josephine", "jasmin"));
292 assertThat(unsafeJoin(get3).values(), containsInAnyOrder("claira-3", "josephine-2", "jasmin-3"));
293 }
294
295 @SuppressWarnings("ConstantConditions")
296 @Test
297 public void check_null_detection() {
298 assertThat(cache.get("kenny", () -> null), not(successful()));
299 assertThat(cache.put("key", null, PutPolicy.ADD_ONLY), not(successful()));
300 assertThat(cache.getBulk(
301 strings -> strings.stream().collect(Collectors.toMap(k -> k, k -> null)),
302 "extra"),
303 not(successful()));
304 }
305
306 @Test
307 public void both_getBulk_and_get_fail() throws InterruptedException {
308
309
310
311
312
313
314 final Barrier blockedInFactory = new Barrier();
315 final Barrier resumeInFactory = new Barrier();
316
317 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
318 final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
319 log.info("{}: about to wait", Thread.currentThread().getName());
320 blockedInFactory.signal();
321 resumeInFactory.await();
322 log.info("{}: About to throw exception", Thread.currentThread().getName());
323 throw exBulk;
324 }, "A", "B", "C").toCompletableFuture().join());
325
326 final RuntimeException exSingle = new RuntimeException("get() exception");
327 blockedInFactory.await();
328 final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> {
329 log.info("{}: About to throw exception", Thread.currentThread().getName());
330 throw exSingle;
331 }).toCompletableFuture().join());
332
333 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
334 resumeInFactory.signal();
335
336 Assert.assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
337 Assert.assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
338 Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
339 Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
340 Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
341 }
342
343 @Test
344 public void both_get_and_getBulk_fail() throws InterruptedException {
345
346
347
348
349
350
351 final Barrier blockedInFactory = new Barrier();
352 final Barrier resumeInFactory = new Barrier();
353
354 final RuntimeException exSingle = new RuntimeException("get() exception");
355 final CompletableFuture<String> t1Result = runAndWaitForStart(() -> cache.get("B", () -> {
356 log.info("{}: About to throw exception", Thread.currentThread().getName());
357 blockedInFactory.signal();
358 resumeInFactory.await();
359 throw exSingle;
360 }).toCompletableFuture().join());
361
362 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
363 blockedInFactory.await();
364 final CompletableFuture<Map<String, String>> t2Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
365 log.info("{}: about to wait", Thread.currentThread().getName());
366 log.info("{}: About to throw exception", Thread.currentThread().getName());
367 throw exBulk;
368 }, "A", "B", "C").toCompletableFuture().join());
369
370 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
371 resumeInFactory.signal();
372
373 Assert.assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
374 Assert.assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
375 Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
376 Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
377 Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
378 }
379
380 @Test
381 public void get_fails_and_getBulk_succeeds() throws InterruptedException {
382
383
384
385
386
387
388 final Barrier blockedInFactory = new Barrier();
389 final Barrier resumeInFactory = new Barrier();
390
391 final RuntimeException exSingle = new RuntimeException("get() exception");
392 final CompletableFuture<String> t1Result = runAndWaitForStart(() -> cache.get("B", () -> {
393 log.info("{}: about to wait", Thread.currentThread().getName());
394 blockedInFactory.signal();
395 resumeInFactory.await();
396 log.info("{}: About to throw exception", Thread.currentThread().getName());
397 throw exSingle;
398 }).toCompletableFuture().join());
399
400 blockedInFactory.await();
401 final CompletableFuture<Map<String, String>> t2Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
402 log.info("{}: About to return numbers", Thread.currentThread().getName());
403 return ImmutableMap.of("A", "1", "B", "2", "C", "3");
404 }, "A", "B", "C").toCompletableFuture().join());
405
406 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
407 resumeInFactory.signal();
408
409 Assert.assertThat(t1Result.handle((r, e) -> {
410 if (e != null) {
411 return Throwables.getRootCause(e);
412 } else {
413 log.info("r: {}", r);
414
415 return new RuntimeException();
416 }
417 }).join(), is(exSingle));
418
419 Assert.assertThat(t2Result.join().get("A"), is("1"));
420 Assert.assertThat(t2Result.join().get("B"), is("2"));
421 Assert.assertThat(t2Result.join().get("C"), is("3"));
422
423 Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("1")));
424 Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("2")));
425 Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("3")));
426 }
427
428 @Test
429 public void getBulk_fail_and_get_succeed() throws InterruptedException {
430
431
432
433
434
435
436 final Barrier blockedInFactory = new Barrier();
437 final Barrier resumeInFactory = new Barrier();
438
439 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
440 final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
441 log.info("{}: about to wait", Thread.currentThread().getName());
442 blockedInFactory.signal();
443 resumeInFactory.await();
444 log.info("{}: About to throw exception", Thread.currentThread().getName());
445 throw exBulk;
446 }, "A", "B", "C").toCompletableFuture().join());
447
448 blockedInFactory.await();
449 final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
450
451 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
452 resumeInFactory.signal();
453
454
455 Assert.assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
456 Assert.assertThat(t2Result.join(), is("bee"));
457 Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
458 Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("bee")));
459 Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
460 }
461
462 @Test
463 public void getBulk_pass_and_get_reuse() throws InterruptedException {
464
465
466
467
468
469
470 final Barrier blockedInFactory = new Barrier();
471 final Barrier resumeInFactory = new Barrier();
472
473 final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
474 log.info("{}: About to await on barrierBulk", Thread.currentThread().getName());
475 blockedInFactory.signal();
476 resumeInFactory.await();
477 log.info("{}: resuming", Thread.currentThread().getName());
478 return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
479 }, "A", "B", "C").toCompletableFuture().join());
480
481 blockedInFactory.await();
482 final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
483
484 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
485 resumeInFactory.signal();
486 log.info("{}: and now", Thread.currentThread().getName());
487
488
489 final Map<String, String> t1ResultData = t1Result.join();
490 Assert.assertThat(t1ResultData.get("A"), is("A-1"));
491 Assert.assertThat(t1ResultData.get("B"), is("B-1"));
492 Assert.assertThat(t1ResultData.get("C"), is("C-1"));
493 Assert.assertThat(t2Result.join(), is("B-1"));
494 Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("A-1")));
495 Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("B-1")));
496 Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("C-1")));
497 }
498
499 @Test
500 public void potential_deadlock() {
501 final CompletableFuture<String> get = cache.get("lockType", () -> {
502
503 cache.remove("lockType");
504 return "lockwood";
505 }).toCompletableFuture();
506
507 assertThat(get.isCompletedExceptionally(), is(true));
508 }
509
510 @Test
511 public void repeated_concurrency() {
512 IntStream.range(1, 50).forEach((i) -> {
513 try {
514 getBulk_fail_and_get_succeed();
515 cache.removeAll();
516 getBulk_pass_and_get_reuse();
517 cache.removeAll();
518 get_fails_and_getBulk_succeeds();
519 cache.removeAll();
520 both_get_and_getBulk_fail();
521 cache.removeAll();
522 both_getBulk_and_get_fail();
523 cache.removeAll();
524
525 } catch (InterruptedException e) {
526 throw new RuntimeException(e);
527 }
528 });
529 }
530
531 private <T> CompletableFuture<T> runAndWaitForStart(final Callable<T> r) {
532 final Barrier barrier = new Barrier();
533 log.info("{}: About to submit()", Thread.currentThread().getName());
534 final Future<T> result = executorService.submit(() -> {
535 log.info("{}: About to signal()", Thread.currentThread().getName());
536 barrier.signal();
537 log.info("{}: resume after signal", Thread.currentThread().getName());
538 return r.call();
539 });
540
541 log.info("{}: About to await()", Thread.currentThread().getName());
542 barrier.await();
543 log.info("{}: resume after await()", Thread.currentThread().getName());
544
545 return CompletableFuture.supplyAsync(() -> {
546 try {
547 return result.get();
548 } catch (InterruptedException | ExecutionException e) {
549 throw new RuntimeException(e);
550 }
551 });
552 }
553 }