View Javadoc

1   package com.atlassian.vcache.internal.test;
2   
3   import com.atlassian.utt.concurrency.Barrier;
4   import com.atlassian.vcache.ChangeRate;
5   import com.atlassian.vcache.DirectExternalCache;
6   import com.atlassian.vcache.ExternalCacheSettings;
7   import com.atlassian.vcache.ExternalCacheSettingsBuilder;
8   import com.atlassian.vcache.PutPolicy;
9   import com.atlassian.vcache.StableReadExternalCache;
10  import com.atlassian.vcache.internal.LongMetric;
11  import com.atlassian.vcache.internal.MetricLabel;
12  import com.atlassian.vcache.internal.RequestMetrics;
13  import com.google.common.base.Throwables;
14  import com.google.common.collect.ImmutableMap;
15  import com.google.common.collect.Maps;
16  import org.hamcrest.Matchers;
17  import org.junit.Before;
18  import org.junit.Rule;
19  import org.junit.Test;
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  import java.time.Duration;
24  import java.util.EnumMap;
25  import java.util.Map;
26  import java.util.Optional;
27  import java.util.concurrent.CompletableFuture;
28  import java.util.concurrent.CompletionStage;
29  import java.util.concurrent.ExecutionException;
30  import java.util.stream.Collectors;
31  import java.util.stream.IntStream;
32  
33  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
34  import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasMetric;
35  import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasSize;
36  import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successful;
37  import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successfulWith;
38  import static com.atlassian.vcache.internal.test.TestUtils.runAndWaitForStart;
39  import static org.hamcrest.Matchers.containsInAnyOrder;
40  import static org.hamcrest.Matchers.greaterThan;
41  import static org.hamcrest.Matchers.is;
42  import static org.hamcrest.Matchers.not;
43  import static org.junit.Assert.assertThat;
44  
45  /**
46   * Base test class for the {@link StableReadExternalCache}.
47   */
48  public abstract class AbstractStableReadExternalCacheIT {
49      private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCacheIT.class);
50      private static final String CACHE_NAME = "kids_hobbies";
51  
52      @Rule
53      public LoggingTestWatcher watcher = new LoggingTestWatcher(log);
54  
55      private StableReadExternalCache<String> cache;
56      private DirectExternalCache<String> directCache;
57  
58      protected abstract DirectExternalCache<String> obtainDirectCache(String name, ExternalCacheSettings settings);
59  
60      protected abstract StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings, Duration lockTimeout);
61  
62      protected StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings) {
63          return createCache(name, settings, Duration.ofSeconds(2));
64      }
65  
66      protected abstract RequestMetrics requestMetrics();
67  
68      @Before
69      public void ensureCache() {
70          final ExternalCacheSettings settings = new ExternalCacheSettingsBuilder()
71                  .entryGrowthRateHint(ChangeRate.LOW_CHANGE)
72                  .entryCountHint(5)
73                  .defaultTtl(Duration.ofMinutes(5))
74                  .dataChangeRateHint(ChangeRate.HIGH_CHANGE)
75                  .build();
76          cache = createCache(CACHE_NAME, settings);
77  
78          directCache = obtainDirectCache(CACHE_NAME, settings);
79  
80          // Start from a clean slate
81          final CompletionStage<Void> rm = directCache.removeAll();
82          assertThat(rm, successful());
83      }
84  
85      @Test
86      public void single_cache_get_set() throws ExecutionException, InterruptedException {
87          final CompletionStage<Optional<String>> eldestGet = cache.get("claira");
88  
89          assertThat(eldestGet, successfulWith(is(Optional.empty())));
90  
91          final CompletionStage<Boolean> eldestAdd = cache.put("claira", "dancing", PutPolicy.ADD_ONLY);
92  
93          assertThat(eldestAdd, successfulWith(is(true)));
94  
95          final CompletionStage<Optional<String>> eldestGet2 = cache.get("claira");
96  
97          assertThat(eldestGet2, successfulWith(is(Optional.of("dancing"))));
98  
99          final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
100                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
101         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
102         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
103         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
104         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
105         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
106         assertThat(cacheMetrics, hasSize(is(5)));
107     }
108 
109     @Test
110     public void dual_cache_get_set() throws ExecutionException, InterruptedException {
111         final CompletionStage<Optional<String>> get1 = cache.get("claira");
112 
113         assertThat(get1, successfulWith(is(Optional.empty())));
114 
115         // Change will not be visible to main cache
116         final CompletionStage<Boolean> dput1 = directCache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
117 
118         assertThat(dput1, successfulWith(is(true)));
119 
120         final CompletionStage<Optional<String>> get2 = cache.get("claira");
121 
122         assertThat(get2, successfulWith(is(Optional.empty())));
123 
124         // Add will fail as value exists from the other cache. However, we will now be able to see the
125         // value added by the other cache.
126         final CompletionStage<Boolean> put1 = cache.put("claira", "singing", PutPolicy.ADD_ONLY);
127 
128         assertThat(put1, successfulWith(is(false)));
129 
130         final CompletionStage<Optional<String>> get3 = cache.get("claira");
131 
132         assertThat(get3, successfulWith(is(Optional.of("dancing"))));
133 
134         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
135                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
136         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
137         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
138         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
139         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
140         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
141         assertThat(cacheMetrics, hasSize(is(5)));
142     }
143 
144     @Test
145     public void dual_cache_get_with_supplier() throws ExecutionException, InterruptedException {
146         final CompletionStage<Optional<String>> get1 = cache.get("claira");
147 
148         assertThat(get1, successfulWith(is(Optional.empty())));
149 
150         final CompletionStage<String> get2 = cache.get("claira", () -> "dancing");
151 
152         assertThat(get2, successfulWith(is("dancing")));
153 
154         final CompletionStage<Optional<String>> get3 = cache.get("claira");
155 
156         assertThat(get3, successfulWith(is(Optional.of("dancing"))));
157 
158         // Change will not be visible to main cache
159         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
160         assertThat(dput1, successfulWith(is(true)));
161 
162         final CompletionStage<Optional<String>> get4 = cache.get("claira");
163 
164         assertThat(get4, successfulWith(is(Optional.of("dancing"))));
165 
166         final CompletionStage<String> get5 = cache.get("claira", () -> "riding");
167 
168         assertThat(get5, successfulWith(is("dancing")));
169 
170         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
171                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
172         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(5L), greaterThan(0L)));
173         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
174         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(3L), is(3L)));
175         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
176         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
177         assertThat(cacheMetrics, hasSize(is(5)));
178     }
179 
180     @Test
181     public void dual_cache_get_with_supplier_take2() throws ExecutionException, InterruptedException {
182         final CompletionStage<Optional<String>> get1 = cache.get("claira");
183 
184         assertThat(get1, successfulWith(is(Optional.empty())));
185 
186         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
187         assertThat(dput1, successfulWith(is(true)));
188 
189         final CompletionStage<String> get2 = cache.get("claira", () -> "riding");
190 
191         assertThat(get2, successfulWith(is("singing")));
192 
193         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
194                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
195         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
196         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
197         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
198         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(3L), is(3L)));
199         assertThat(cacheMetrics, hasSize(is(4)));
200     }
201 
202     @Test
203     public void dual_cache_getBulk() throws ExecutionException, InterruptedException {
204         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
205         assertThat(dput1, successfulWith(is(true)));
206 
207         final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "claira");
208 
209         assertThat(get1, successful());
210         final Map<String, Optional<String>> map1 = unsafeJoin(get1);
211         assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine"));
212         assertThat(map1.get("claira"), is(Optional.of("singing")));
213         assertThat(map1.get("josephine"), is(Optional.empty()));
214 
215         final CompletionStage<Boolean> dput2 = directCache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
216         assertThat(dput2, successfulWith(is(true)));
217 
218         final CompletionStage<Boolean> dput3 = directCache.put("jasmin", "skiing", PutPolicy.PUT_ALWAYS);
219         assertThat(dput3, successfulWith(is(true)));
220 
221         final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
222 
223         assertThat(get2, successful());
224         final Map<String, Optional<String>> map2 = unsafeJoin(get2);
225         assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
226         assertThat(map2.get("claira"), is(Optional.of("singing")));
227         assertThat(map2.get("josephine"), is(Optional.empty()));
228         assertThat(map2.get("jasmin"), is(Optional.of("skiing")));
229 
230         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
231                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
232         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
233         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(3L)));
234         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
235         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
236         assertThat(cacheMetrics, hasSize(is(4)));
237     }
238 
239     @Test
240     public void remove_normal() throws ExecutionException, InterruptedException {
241         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
242         assertThat(dput1, successfulWith(is(true)));
243 
244         final CompletionStage<Boolean> put1 = cache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
245         assertThat(put1, successfulWith(is(true)));
246 
247         final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "jasmin");
248 
249         assertThat(get1, successful());
250         final Map<String, Optional<String>> map1 = unsafeJoin(get1);
251         assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
252         assertThat(map1.get("claira"), is(Optional.of("singing")));
253         assertThat(map1.get("josephine"), is(Optional.of("football")));
254         assertThat(map1.get("jasmin"), is(Optional.empty()));
255 
256         final CompletionStage<Void> rm1 = cache.remove("jasmin", "claira", "claira", "josephine");
257 
258         assertThat(rm1, successful());
259 
260         final CompletionStage<Map<String, Optional<String>>> dget1 = directCache.getBulk("claira", "josephine", "jasmin");
261 
262         assertThat(dget1, successful());
263         final Map<String, Optional<String>> dmap1 = unsafeJoin(dget1);
264         assertThat(dmap1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
265         assertThat(dmap1.get("claira"), is(Optional.empty()));
266         assertThat(dmap1.get("josephine"), is(Optional.empty()));
267         assertThat(dmap1.get("jasmin"), is(Optional.empty()));
268 
269         final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
270 
271         assertThat(get2, successful());
272         final Map<String, Optional<String>> map2 = unsafeJoin(get2);
273         assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
274         assertThat(map2.get("claira"), is(Optional.empty()));
275         assertThat(map2.get("josephine"), is(Optional.empty()));
276         assertThat(map2.get("jasmin"), is(Optional.empty()));
277 
278         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
279                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
280         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
281         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
282         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
283         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(2L)));
284         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(4L)));
285         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
286         assertThat(cacheMetrics, hasSize(is(6)));
287     }
288 
289     @Test
290     public void simple_removeAll() throws ExecutionException, InterruptedException {
291         final CompletionStage<Boolean> put1 = cache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
292 
293         assertThat(put1, successfulWith(is(true)));
294 
295         final CompletionStage<Optional<String>> get1 = cache.get("claira");
296 
297         assertThat(get1, successfulWith(is(Optional.of("dancing"))));
298 
299         final CompletionStage<Void> rm1 = cache.removeAll();
300 
301         assertThat(rm1, successful());
302 
303         final CompletionStage<Optional<String>> get2 = cache.get("claira");
304 
305         assertThat(get2, successfulWith(is(Optional.empty())));
306 
307         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
308                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
309         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
310         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
311         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_ALL_CALL, is(1L), greaterThan(0L)));
312         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
313         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
314         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
315         assertThat(cacheMetrics, hasSize(is(6)));
316     }
317 
318     @Test
319     public void simple_getBulk_function() throws ExecutionException, InterruptedException {
320         final CompletionStage<Map<String, String>> get1 = cache.getBulk(
321                 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
322 
323         assertThat(get1, successful());
324         assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
325         assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
326 
327         final CompletionStage<Map<String, String>> get2 = cache.getBulk(
328                 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
329 
330         assertThat(get2, successful());
331         assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
332         assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
333 
334         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
335                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
336         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(2L), greaterThan(0L)));
337         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
338         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(2L), is(2L)));
339         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
340         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
341         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(4L), is(4L)));
342         assertThat(cacheMetrics, hasSize(is(6)));
343     }
344 
345     @Test
346     public void dual_getBulk_function() throws ExecutionException, InterruptedException {
347         final CompletionStage<Map<String, String>> get1 = cache.getBulk(
348                 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
349 
350         assertThat(get1, successful());
351         assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
352         assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
353 
354         final CompletionStage<Boolean> mput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
355         assertThat(mput1, successfulWith(is(true)));
356 
357         final CompletionStage<Map<String, String>> get2 = cache.getBulk(
358                 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
359 
360         assertThat(get2, successful());
361         assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
362         assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
363 
364         final CompletionStage<Void> rm1 = cache.remove("claira");
365 
366         assertThat(rm1, successful());
367 
368         final CompletionStage<Map<String, String>> get3 = cache.getBulk(
369                 keys -> Maps.asMap(keys, k -> k + "-3"), "claira", "josephine", "jasmin");
370 
371         assertThat(get3, successful());
372         assertThat(unsafeJoin(get3).keySet(), containsInAnyOrder("claira", "josephine", "jasmin"));
373         assertThat(unsafeJoin(get3).values(), containsInAnyOrder("claira-3", "josephine-2", "jasmin-3"));
374 
375         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
376                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
377         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(3L), greaterThan(0L)));
378         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
379         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
380         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(3L), is(4L)));
381         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(2L)));
382         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(3L), is(4L)));
383         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(7L), is(7L)));
384         assertThat(cacheMetrics, hasSize(is(7)));
385     }
386 
387     @SuppressWarnings("ConstantConditions")
388     @Test
389     public void check_null_detection() {
390         assertThat(cache.get("kenny", () -> null), not(successful()));
391         assertThat(cache.put("key", null, PutPolicy.ADD_ONLY), not(successful()));
392         assertThat(cache.getBulk(
393                 strings -> strings.stream().collect(Collectors.toMap(k -> k, k -> null)),
394                 "extra"),
395                 not(successful()));
396 
397         final EnumMap<MetricLabel, ? extends LongMetric> cacheMetrics =
398                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
399         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(1L), greaterThan(0L)));
400         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
401         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
402         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
403         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(1L), is(1L)));
404         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_GET, is(2L), is(2L)));
405         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_PUT, is(1L), is(1L)));
406         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
407         assertThat(cacheMetrics, hasSize(is(8)));
408     }
409 
410     @Test
411     public void both_getBulk_and_get_fail() throws InterruptedException {
412         //Scenario:
413         // T1: bulkGet A, B, C
414         // T2: get B, supplier
415         // T1: bulkGet completes load with an exception
416         // T2: get B, supplier -> fails with another exception
417 
418         final Barrier blockedInFactory = new Barrier();
419         final Barrier resumeInFactory = new Barrier();
420 
421         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
422         final CompletableFuture<Map<String, String>> t1Result =
423                 runAndWaitForStart(() -> cache.getBulk(keys -> {
424                     log.info("{}: about to wait", Thread.currentThread().getName());
425                     blockedInFactory.signal();
426                     resumeInFactory.await();
427                     log.info("{}: About to throw exception", Thread.currentThread().getName());
428                     throw exBulk;
429                 }, "A", "B", "C").toCompletableFuture().join());
430 
431         final RuntimeException exSingle = new RuntimeException("get() exception");
432         blockedInFactory.await();
433         final CompletableFuture<String> t2Result =
434                 runAndWaitForStart(() -> cache.get("B", () -> {
435                     log.info("{}: About to throw exception", Thread.currentThread().getName());
436                     throw exSingle;
437                 }).toCompletableFuture().join());
438 
439         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
440         resumeInFactory.signal(); //Wait until T2 cache.get calls join
441 
442         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
443         assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
444         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
445         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
446         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
447     }
448 
449     @Test
450     public void both_get_and_getBulk_fail() throws InterruptedException {
451         //Scenario:
452         // T1: get B, supplier
453         // T2: bulkGet A, B, C
454         // T1: get B, supplier -> fails with another exception
455         // T2: bulkGet completes load with an exception
456 
457         final Barrier blockedInFactory = new Barrier();
458         final Barrier resumeInFactory = new Barrier();
459 
460         final RuntimeException exSingle = new RuntimeException("get() exception");
461         final CompletableFuture<String> t1Result =
462                 runAndWaitForStart(() -> cache.get("B", () -> {
463                     log.info("{}: About to throw exception", Thread.currentThread().getName());
464                     blockedInFactory.signal();
465                     resumeInFactory.await();
466                     throw exSingle;
467                 }).toCompletableFuture().join());
468 
469         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
470         blockedInFactory.await();
471         final CompletableFuture<Map<String, String>> t2Result =
472                 runAndWaitForStart(() -> cache.getBulk(keys -> {
473                     log.info("{}: about to wait", Thread.currentThread().getName());
474                     log.info("{}: About to throw exception", Thread.currentThread().getName());
475                     throw exBulk;
476                 }, "A", "B", "C").toCompletableFuture().join());
477 
478         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
479         resumeInFactory.signal(); //Wait until T2 cache.get calls join
480 
481         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
482         assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
483         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
484         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
485         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
486     }
487 
488     @Test
489     public void get_fails_and_getBulk_succeeds() throws InterruptedException {
490         //Scenario:
491         // T1: get B, supplier, fails
492         // T2: bulkGet A, B, C succeeds
493         // T1: get B, supplier -> fails with another exception
494         // T2: bulkGet completes with a map
495 
496         final Barrier blockedInFactory = new Barrier();
497         final Barrier resumeInFactory = new Barrier();
498 
499         final RuntimeException exSingle = new RuntimeException("get() exception");
500         final CompletableFuture<String> t1Result =
501                 runAndWaitForStart(() -> cache.get("B", () -> {
502                     log.info("{}: about to wait", Thread.currentThread().getName());
503                     blockedInFactory.signal();
504                     resumeInFactory.await();
505                     log.info("{}: About to throw exception", Thread.currentThread().getName());
506                     throw exSingle;
507                 }).toCompletableFuture().join());
508 
509         blockedInFactory.await();
510         final CompletableFuture<Map<String, String>> t2Result =
511                 runAndWaitForStart(() -> cache.getBulk(keys -> {
512                     log.info("{}: About to return numbers", Thread.currentThread().getName());
513                     return ImmutableMap.of("A", "1", "B", "2", "C", "3");
514                 }, "A", "B", "C").toCompletableFuture().join());
515 
516         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
517         resumeInFactory.signal(); //Wait until T2 cache.get calls join
518 
519         assertThat(t1Result.handle((r, e) -> {
520             if (e != null) {
521                 return Throwables.getRootCause(e);
522             } else {
523                 log.info("r: {}", r);
524                 // This forces a fail.
525                 return new RuntimeException();
526             }
527         }).join(), is(exSingle));
528 
529         assertThat(t2Result.join().get("A"), is("1"));
530         assertThat(t2Result.join().get("B"), is("2"));
531         assertThat(t2Result.join().get("C"), is("3"));
532 
533         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("1")));
534         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("2")));
535         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("3")));
536     }
537 
538     @Test
539     public void getBulk_fail_and_get_succeed() throws InterruptedException {
540         //Scenario:
541         // T1: bulkGet A, B, C
542         // T2: get B, supplier
543         // T1: bulkGet function throws exception
544         // T2: get B, supplier -> returns value
545 
546         final Barrier blockedInFactory = new Barrier();
547         final Barrier resumeInFactory = new Barrier();
548 
549         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
550         final CompletableFuture<Map<String, String>> t1Result =
551                 runAndWaitForStart(() -> cache.getBulk(keys -> {
552                     log.info("{}: about to wait", Thread.currentThread().getName());
553                     blockedInFactory.signal();
554                     resumeInFactory.await();
555                     log.info("{}: About to throw exception", Thread.currentThread().getName());
556                     throw exBulk;
557                 }, "A", "B", "C").toCompletableFuture().join());
558 
559         blockedInFactory.await();
560         final CompletableFuture<String> t2Result =
561                 runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
562 
563         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
564         resumeInFactory.signal(); //Wait until T2 cache.get calls join
565 
566 
567         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
568         assertThat(t2Result.join(), is("bee"));
569         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
570         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("bee")));
571         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
572     }
573 
574     @Test
575     public void getBulk_pass_and_get_reuse() throws InterruptedException {
576         //Scenario:
577         // T1: bulkGet A, B, C, blocks in the factory
578         // T2: get B, supplier runs
579         // T1: bulkGet function returns values, should return T2 value for B
580         // T2: get B, supplier -> returns T2 value
581 
582         final Barrier blockedInFactory = new Barrier();
583         final Barrier resumeInFactory = new Barrier();
584 
585         final CompletableFuture<Map<String, String>> t1Result =
586                 runAndWaitForStart(() -> cache.getBulk(keys -> {
587                     log.info("{}: About to await on barrierBulk", Thread.currentThread().getName());
588                     blockedInFactory.signal();
589                     resumeInFactory.await();
590                     log.info("{}: resuming", Thread.currentThread().getName());
591                     return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
592                 }, "A", "B", "C").toCompletableFuture().join());
593 
594         blockedInFactory.await();
595         final CompletableFuture<String> t2Result =
596                 runAndWaitForStart(() -> cache.get("B", () -> "B-2").toCompletableFuture().join());
597 
598         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
599         assertThat(t2Result.join(), is("B-2"));
600         resumeInFactory.signal(); // Wait until T2 cache.get calls join
601         log.info("{}: and now", Thread.currentThread().getName());
602 
603 
604         final Map<String, String> t1ResultData = t1Result.join();
605         assertThat(t1ResultData.get("A"), is("A-1"));
606         assertThat(t1ResultData.get("B"), is("B-2"));
607         assertThat(t1ResultData.get("C"), is("C-1"));
608         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("A-1")));
609         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("B-2")));
610         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("C-1")));
611     }
612 
613     @Test
614     public void potential_deadlock() {
615         final CompletableFuture<String> get = cache.get("lockType", () -> {
616             // People have actually done this!!
617             cache.remove("lockType");
618             return "lockwood";
619         }).toCompletableFuture();
620 
621         assertThat(get, successfulWith(is("lockwood")));
622     }
623 
624     @Test
625     public void repeated_concurrency() {
626         IntStream.range(1, 50).forEach((i) -> {
627             try {
628                 getBulk_fail_and_get_succeed();
629                 cache.removeAll();
630                 getBulk_pass_and_get_reuse();
631                 cache.removeAll();
632                 get_fails_and_getBulk_succeeds();
633                 cache.removeAll();
634                 both_get_and_getBulk_fail();
635                 cache.removeAll();
636                 both_getBulk_and_get_fail();
637                 cache.removeAll();
638 
639             } catch (InterruptedException e) {
640                 throw new RuntimeException(e);
641             }
642         });
643     }
644 }