View Javadoc

1   package com.atlassian.vcache.internal.test;
2   
import com.atlassian.utt.concurrency.Barrier;
import com.atlassian.vcache.ChangeRate;
import com.atlassian.vcache.DirectExternalCache;
import com.atlassian.vcache.ExternalCacheSettings;
import com.atlassian.vcache.ExternalCacheSettingsBuilder;
import com.atlassian.vcache.PutPolicy;
import com.atlassian.vcache.StableReadExternalCache;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successful;
import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successfulWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
41  
42  /**
43   * Base test class for the {@link StableReadExternalCache}.
44   */
45  public abstract class AbstractStableReadExternalCacheIT {
46      private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCacheIT.class);
47      private static final String CACHE_NAME = "kids_hobbies";
48  
49      @Rule
50      public LoggingTestWatcher watcher = new LoggingTestWatcher(log);
51  
52      private StableReadExternalCache<String> cache;
53      private DirectExternalCache<String> directCache;
54      private final ExecutorService executorService = Executors.newCachedThreadPool();
55  
56      protected abstract StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings);
57  
58      protected abstract DirectExternalCache<String> obtainDirectCache(String name, ExternalCacheSettings settings);
59  
60      @Before
61      public void ensureCache() {
62          final ExternalCacheSettings settings = new ExternalCacheSettingsBuilder()
63                  .entryGrowthRateHint(ChangeRate.LOW_CHANGE)
64                  .entryCountHint(5)
65                  .defaultTtl(Duration.ofMinutes(5))
66                  .dataChangeRateHint(ChangeRate.HIGH_CHANGE)
67                  .build();
68          cache = createCache(CACHE_NAME, settings);
69  
70          directCache = obtainDirectCache(CACHE_NAME, settings);
71  
72          // Start from a clean slate
73          final CompletionStage<Void> rm = directCache.removeAll();
74          assertThat(rm, successful());
75      }
76  
77      @Test
78      public void single_cache_get_set() throws ExecutionException, InterruptedException {
79          final CompletionStage<Optional<String>> eldestGet = cache.get("claira");
80  
81          assertThat(eldestGet, successfulWith(is(Optional.empty())));
82  
83          final CompletionStage<Boolean> eldestAdd = cache.put("claira", "dancing", PutPolicy.ADD_ONLY);
84  
85          assertThat(eldestAdd, successfulWith(is(true)));
86  
87          final CompletionStage<Optional<String>> eldestGet2 = cache.get("claira");
88  
89          assertThat(eldestGet2, successfulWith(is(Optional.of("dancing"))));
90      }
91  
92      @Test
93      public void dual_cache_get_set() throws ExecutionException, InterruptedException {
94          final CompletionStage<Optional<String>> get1 = cache.get("claira");
95  
96          assertThat(get1, successfulWith(is(Optional.empty())));
97  
98          // Change will not be visible to main cache
99          final CompletionStage<Boolean> dput1 = directCache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
100 
101         assertThat(dput1, successfulWith(is(true)));
102 
103         final CompletionStage<Optional<String>> get2 = cache.get("claira");
104 
105         assertThat(get2, successfulWith(is(Optional.empty())));
106 
107         // Add will fail as value exists from the other cache. However, we will now be able to see the
108         // value added by the other cache.
109         final CompletionStage<Boolean> put1 = cache.put("claira", "singing", PutPolicy.ADD_ONLY);
110 
111         assertThat(put1, successfulWith(is(false)));
112 
113         final CompletionStage<Optional<String>> get3 = cache.get("claira");
114 
115         assertThat(get3, successfulWith(is(Optional.of("dancing"))));
116     }
117 
118     @Test
119     public void dual_cache_get_with_supplier() throws ExecutionException, InterruptedException {
120         final CompletionStage<Optional<String>> get1 = cache.get("claira");
121 
122         assertThat(get1, successfulWith(is(Optional.empty())));
123 
124         final CompletionStage<String> get2 = cache.get("claira", () -> "dancing");
125 
126         assertThat(get2, successfulWith(is("dancing")));
127 
128         final CompletionStage<Optional<String>> get3 = cache.get("claira");
129 
130         assertThat(get3, successfulWith(is(Optional.of("dancing"))));
131 
132         // Change will not be visible to main cache
133         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
134         assertThat(dput1, successfulWith(is(true)));
135 
136         final CompletionStage<Optional<String>> get4 = cache.get("claira");
137 
138         assertThat(get4, successfulWith(is(Optional.of("dancing"))));
139 
140         final CompletionStage<String> get5 = cache.get("claira", () -> "riding");
141 
142         assertThat(get5, successfulWith(is("dancing")));
143     }
144 
145     @Test
146     public void dual_cache_get_with_supplier_take2() throws ExecutionException, InterruptedException {
147         final CompletionStage<Optional<String>> get1 = cache.get("claira");
148 
149         assertThat(get1, successfulWith(is(Optional.empty())));
150 
151         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
152         assertThat(dput1, successfulWith(is(true)));
153 
154         final CompletionStage<String> get2 = cache.get("claira", () -> "riding");
155 
156         assertThat(get2, successfulWith(is("singing")));
157     }
158 
159     @Test
160     public void dual_cache_getBulk() throws ExecutionException, InterruptedException {
161         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
162         assertThat(dput1, successfulWith(is(true)));
163 
164         final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "claira");
165 
166         assertThat(get1, successful());
167         final Map<String, Optional<String>> map1 = unsafeJoin(get1);
168         assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine"));
169         assertThat(map1.get("claira"), is(Optional.of("singing")));
170         assertThat(map1.get("josephine"), is(Optional.empty()));
171 
172         final CompletionStage<Boolean> dput2 = directCache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
173         assertThat(dput2, successfulWith(is(true)));
174 
175         final CompletionStage<Boolean> dput3 = directCache.put("jasmin", "skiing", PutPolicy.PUT_ALWAYS);
176         assertThat(dput3, successfulWith(is(true)));
177 
178         final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
179 
180         assertThat(get2, successful());
181         final Map<String, Optional<String>> map2 = unsafeJoin(get2);
182         assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
183         assertThat(map2.get("claira"), is(Optional.of("singing")));
184         assertThat(map2.get("josephine"), is(Optional.empty()));
185         assertThat(map2.get("jasmin"), is(Optional.of("skiing")));
186     }
187 
188     @Test
189     public void remove_normal() throws ExecutionException, InterruptedException {
190         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
191         assertThat(dput1, successfulWith(is(true)));
192 
193         final CompletionStage<Boolean> put1 = cache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
194         assertThat(put1, successfulWith(is(true)));
195 
196         final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "jasmin");
197 
198         assertThat(get1, successful());
199         final Map<String, Optional<String>> map1 = unsafeJoin(get1);
200         assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
201         assertThat(map1.get("claira"), is(Optional.of("singing")));
202         assertThat(map1.get("josephine"), is(Optional.of("football")));
203         assertThat(map1.get("jasmin"), is(Optional.empty()));
204 
205         final CompletionStage<Void> rm1 = cache.remove("jasmin", "claira", "claira", "josephine");
206 
207         assertThat(rm1, successful());
208 
209         final CompletionStage<Map<String, Optional<String>>> dget1 = directCache.getBulk("claira", "josephine", "jasmin");
210 
211         assertThat(dget1, successful());
212         final Map<String, Optional<String>> dmap1 = unsafeJoin(dget1);
213         assertThat(dmap1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
214         assertThat(dmap1.get("claira"), is(Optional.empty()));
215         assertThat(dmap1.get("josephine"), is(Optional.empty()));
216         assertThat(dmap1.get("jasmin"), is(Optional.empty()));
217 
218         final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
219 
220         assertThat(get2, successful());
221         final Map<String, Optional<String>> map2 = unsafeJoin(get2);
222         assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
223         assertThat(map2.get("claira"), is(Optional.empty()));
224         assertThat(map2.get("josephine"), is(Optional.empty()));
225         assertThat(map2.get("jasmin"), is(Optional.empty()));
226     }
227 
228     @Test
229     public void simple_removeAll() throws ExecutionException, InterruptedException {
230         final CompletionStage<Boolean> put1 = cache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
231 
232         assertThat(put1, successfulWith(is(true)));
233 
234         final CompletionStage<Optional<String>> get1 = cache.get("claira");
235 
236         assertThat(get1, successfulWith(is(Optional.of("dancing"))));
237 
238         final CompletionStage<Void> rm1 = cache.removeAll();
239 
240         assertThat(rm1, successful());
241 
242         final CompletionStage<Optional<String>> get2 = cache.get("claira");
243 
244         assertThat(get2, successfulWith(is(Optional.empty())));
245     }
246 
247     @Test
248     public void simple_getBulk_function() throws ExecutionException, InterruptedException {
249         final CompletionStage<Map<String, String>> get1 = cache.getBulk(
250                 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
251 
252         assertThat(get1, successful());
253         assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
254         assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
255 
256         final CompletionStage<Map<String, String>> get2 = cache.getBulk(
257                 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
258 
259         assertThat(get2, successful());
260         assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
261         assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
262     }
263 
264     @Test
265     public void dual_getBulk_function() throws ExecutionException, InterruptedException {
266         final CompletionStage<Map<String, String>> get1 = cache.getBulk(
267                 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
268 
269         assertThat(get1, successful());
270         assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
271         assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
272 
273         final CompletionStage<Boolean> mput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
274         assertThat(mput1, successfulWith(is(true)));
275 
276         final CompletionStage<Map<String, String>> get2 = cache.getBulk(
277                 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
278 
279         assertThat(get2, successful());
280         assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
281         assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
282 
283         final CompletionStage<Void> rm1 = cache.remove("claira");
284 
285         assertThat(rm1, successful());
286 
287         final CompletionStage<Map<String, String>> get3 = cache.getBulk(
288                 keys -> Maps.asMap(keys, k -> k + "-3"), "claira", "josephine", "jasmin");
289 
290         assertThat(get3, successful());
291         assertThat(unsafeJoin(get3).keySet(), containsInAnyOrder("claira", "josephine", "jasmin"));
292         assertThat(unsafeJoin(get3).values(), containsInAnyOrder("claira-3", "josephine-2", "jasmin-3"));
293     }
294 
295     @SuppressWarnings("ConstantConditions")
296     @Test
297     public void check_null_detection() {
298         assertThat(cache.get("kenny", () -> null), not(successful()));
299         assertThat(cache.put("key", null, PutPolicy.ADD_ONLY), not(successful()));
300         assertThat(cache.getBulk(
301                 strings -> strings.stream().collect(Collectors.toMap(k -> k, k -> null)),
302                 "extra"),
303                 not(successful()));
304     }
305 
306     @Test
307     public void both_getBulk_and_get_fail() throws InterruptedException {
308         //Scenario:
309         // T1: bulkGet A, B, C
310         // T2: get B, supplier
311         // T1: bulkGet completes load with an exception
312         // T2: get B, supplier -> fails with another exception
313 
314         final Barrier blockedInFactory = new Barrier();
315         final Barrier resumeInFactory = new Barrier();
316 
317         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
318         final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
319             log.info("{}: about to wait", Thread.currentThread().getName());
320             blockedInFactory.signal();
321             resumeInFactory.await();
322             log.info("{}: About to throw exception", Thread.currentThread().getName());
323             throw exBulk;
324         }, "A", "B", "C").toCompletableFuture().join());
325 
326         final RuntimeException exSingle = new RuntimeException("get() exception");
327         blockedInFactory.await();
328         final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> {
329             log.info("{}: About to throw exception", Thread.currentThread().getName());
330             throw exSingle;
331         }).toCompletableFuture().join());
332 
333         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
334         resumeInFactory.signal(); //Wait until T2 cache.get calls join
335 
336         Assert.assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
337         Assert.assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
338         Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
339         Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
340         Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
341     }
342 
343     @Test
344     public void both_get_and_getBulk_fail() throws InterruptedException {
345         //Scenario:
346         // T1: get B, supplier
347         // T2: bulkGet A, B, C
348         // T1: get B, supplier -> fails with another exception
349         // T2: bulkGet completes load with an exception
350 
351         final Barrier blockedInFactory = new Barrier();
352         final Barrier resumeInFactory = new Barrier();
353 
354         final RuntimeException exSingle = new RuntimeException("get() exception");
355         final CompletableFuture<String> t1Result = runAndWaitForStart(() -> cache.get("B", () -> {
356             log.info("{}: About to throw exception", Thread.currentThread().getName());
357             blockedInFactory.signal();
358             resumeInFactory.await();
359             throw exSingle;
360         }).toCompletableFuture().join());
361 
362         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
363         blockedInFactory.await();
364         final CompletableFuture<Map<String, String>> t2Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
365             log.info("{}: about to wait", Thread.currentThread().getName());
366             log.info("{}: About to throw exception", Thread.currentThread().getName());
367             throw exBulk;
368         }, "A", "B", "C").toCompletableFuture().join());
369 
370         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
371         resumeInFactory.signal(); //Wait until T2 cache.get calls join
372 
373         Assert.assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
374         Assert.assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
375         Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
376         Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
377         Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
378     }
379 
380     @Test
381     public void get_fails_and_getBulk_succeeds() throws InterruptedException {
382         //Scenario:
383         // T1: get B, supplier, fails
384         // T2: bulkGet A, B, C succeeds
385         // T1: get B, supplier -> fails with another exception
386         // T2: bulkGet completes with a map
387 
388         final Barrier blockedInFactory = new Barrier();
389         final Barrier resumeInFactory = new Barrier();
390 
391         final RuntimeException exSingle = new RuntimeException("get() exception");
392         final CompletableFuture<String> t1Result = runAndWaitForStart(() -> cache.get("B", () -> {
393             log.info("{}: about to wait", Thread.currentThread().getName());
394             blockedInFactory.signal();
395             resumeInFactory.await();
396             log.info("{}: About to throw exception", Thread.currentThread().getName());
397             throw exSingle;
398         }).toCompletableFuture().join());
399 
400         blockedInFactory.await();
401         final CompletableFuture<Map<String, String>> t2Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
402             log.info("{}: About to return numbers", Thread.currentThread().getName());
403             return ImmutableMap.of("A", "1", "B", "2", "C", "3");
404         }, "A", "B", "C").toCompletableFuture().join());
405 
406         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
407         resumeInFactory.signal(); //Wait until T2 cache.get calls join
408 
409         Assert.assertThat(t1Result.handle((r, e) -> {
410             if (e != null) {
411                 return Throwables.getRootCause(e);
412             } else {
413                 log.info("r: {}", r);
414                 // This forces a fail.
415                 return new RuntimeException();
416             }
417         }).join(), is(exSingle));
418 
419         Assert.assertThat(t2Result.join().get("A"), is("1"));
420         Assert.assertThat(t2Result.join().get("B"), is("2"));
421         Assert.assertThat(t2Result.join().get("C"), is("3"));
422 
423         Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("1")));
424         Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("2")));
425         Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("3")));
426     }
427 
428     @Test
429     public void getBulk_fail_and_get_succeed() throws InterruptedException {
430         //Scenario:
431         // T1: bulkGet A, B, C
432         // T2: get B, supplier
433         // T1: bulkGet function throws exception
434         // T2: get B, supplier -> returns value
435 
436         final Barrier blockedInFactory = new Barrier();
437         final Barrier resumeInFactory = new Barrier();
438 
439         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
440         final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
441             log.info("{}: about to wait", Thread.currentThread().getName());
442             blockedInFactory.signal();
443             resumeInFactory.await();
444             log.info("{}: About to throw exception", Thread.currentThread().getName());
445             throw exBulk;
446         }, "A", "B", "C").toCompletableFuture().join());
447 
448         blockedInFactory.await();
449         final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
450 
451         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
452         resumeInFactory.signal(); //Wait until T2 cache.get calls join
453 
454 
455         Assert.assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
456         Assert.assertThat(t2Result.join(), is("bee"));
457         Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
458         Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("bee")));
459         Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
460     }
461 
462     @Test
463     public void getBulk_pass_and_get_reuse() throws InterruptedException {
464         //Scenario:
465         // T1: bulkGet A, B, C
466         // T2: get B, supplier
467         // T1: bulkGet function returns values
468         // T2: get B, supplier -> returns bulkGet value
469 
470         final Barrier blockedInFactory = new Barrier();
471         final Barrier resumeInFactory = new Barrier();
472 
473         final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
474             log.info("{}: About to await on barrierBulk", Thread.currentThread().getName());
475             blockedInFactory.signal();
476             resumeInFactory.await();
477             log.info("{}: resuming", Thread.currentThread().getName());
478             return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
479         }, "A", "B", "C").toCompletableFuture().join());
480 
481         blockedInFactory.await();
482         final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
483 
484         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
485         resumeInFactory.signal(); //Wait until T2 cache.get calls join
486         log.info("{}: and now", Thread.currentThread().getName());
487 
488 
489         final Map<String, String> t1ResultData = t1Result.join();
490         Assert.assertThat(t1ResultData.get("A"), is("A-1"));
491         Assert.assertThat(t1ResultData.get("B"), is("B-1"));
492         Assert.assertThat(t1ResultData.get("C"), is("C-1"));
493         Assert.assertThat(t2Result.join(), is("B-1"));
494         Assert.assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("A-1")));
495         Assert.assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("B-1")));
496         Assert.assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("C-1")));
497     }
498 
499     @Test
500     public void potential_deadlock() {
501         final CompletableFuture<String> get = cache.get("lockType", () -> {
502             // People have actually done this!!
503             cache.remove("lockType");
504             return "lockwood";
505         }).toCompletableFuture();
506 
507         assertThat(get.isCompletedExceptionally(), is(true));
508     }
509 
510     @Test
511     public void repeated_concurrency() {
512         IntStream.range(1, 50).forEach((i) -> {
513             try {
514                 getBulk_fail_and_get_succeed();
515                 cache.removeAll();
516                 getBulk_pass_and_get_reuse();
517                 cache.removeAll();
518                 get_fails_and_getBulk_succeeds();
519                 cache.removeAll();
520                 both_get_and_getBulk_fail();
521                 cache.removeAll();
522                 both_getBulk_and_get_fail();
523                 cache.removeAll();
524 
525             } catch (InterruptedException e) {
526                 throw new RuntimeException(e);
527             }
528         });
529     }
530 
531     private <T> CompletableFuture<T> runAndWaitForStart(final Callable<T> r) {
532         final Barrier barrier = new Barrier();
533         log.info("{}: About to submit()", Thread.currentThread().getName());
534         final Future<T> result = executorService.submit(() -> {
535             log.info("{}: About to signal()", Thread.currentThread().getName());
536             barrier.signal();
537             log.info("{}: resume after signal", Thread.currentThread().getName());
538             return r.call();
539         });
540 
541         log.info("{}: About to await()", Thread.currentThread().getName());
542         barrier.await();
543         log.info("{}: resume after await()", Thread.currentThread().getName());
544 
545         return CompletableFuture.supplyAsync(() -> {
546             try {
547                 return result.get();
548             } catch (InterruptedException | ExecutionException e) {
549                 throw new RuntimeException(e);
550             }
551         });
552     }
553 }