View Javadoc

1   package com.atlassian.vcache.internal.test;
2   
3   import com.atlassian.utt.concurrency.Barrier;
4   import com.atlassian.vcache.ChangeRate;
5   import com.atlassian.vcache.DirectExternalCache;
6   import com.atlassian.vcache.ExternalCacheSettings;
7   import com.atlassian.vcache.ExternalCacheSettingsBuilder;
8   import com.atlassian.vcache.PutPolicy;
9   import com.atlassian.vcache.StableReadExternalCache;
10  import com.atlassian.vcache.internal.LongMetric;
11  import com.atlassian.vcache.internal.MetricLabel;
12  import com.atlassian.vcache.internal.RequestMetrics;
13  import com.google.common.base.Throwables;
14  import com.google.common.collect.ImmutableMap;
15  import com.google.common.collect.Maps;
16  import org.hamcrest.Matchers;
17  import org.junit.Before;
18  import org.junit.Rule;
19  import org.junit.Test;
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  import java.time.Duration;
24  import java.util.Map;
25  import java.util.Optional;
26  import java.util.concurrent.CompletableFuture;
27  import java.util.concurrent.CompletionStage;
28  import java.util.concurrent.ExecutionException;
29  import java.util.stream.Collectors;
30  import java.util.stream.IntStream;
31  
32  import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
33  import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasMetric;
34  import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasSize;
35  import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successful;
36  import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successfulWith;
37  import static com.atlassian.vcache.internal.test.TestUtils.runAndWaitForStart;
38  import static org.hamcrest.Matchers.containsInAnyOrder;
39  import static org.hamcrest.Matchers.greaterThan;
40  import static org.hamcrest.Matchers.is;
41  import static org.hamcrest.Matchers.not;
42  import static org.junit.Assert.assertThat;
43  
44  /**
45   * Base test class for the {@link StableReadExternalCache}.
46   */
47  public abstract class AbstractStableReadExternalCacheIT {
48      private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCacheIT.class);
49      private static final String CACHE_NAME = "kids_hobbies";
50  
51      @Rule
52      public LoggingTestWatcher watcher = new LoggingTestWatcher(log);
53  
54      private StableReadExternalCache<String> cache;
55      private DirectExternalCache<String> directCache;
56  
57      protected abstract DirectExternalCache<String> obtainDirectCache(String name, ExternalCacheSettings settings);
58  
59      protected abstract StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings, Duration lockTimeout);
60  
61      protected StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings) {
62          return createCache(name, settings, Duration.ofSeconds(2));
63      }
64  
65      protected abstract RequestMetrics requestMetrics();
66  
67      @Before
68      public void ensureCache() {
69          final ExternalCacheSettings settings = new ExternalCacheSettingsBuilder()
70                  .entryGrowthRateHint(ChangeRate.LOW_CHANGE)
71                  .entryCountHint(5)
72                  .defaultTtl(Duration.ofMinutes(5))
73                  .dataChangeRateHint(ChangeRate.HIGH_CHANGE)
74                  .build();
75          cache = createCache(CACHE_NAME, settings);
76  
77          directCache = obtainDirectCache(CACHE_NAME, settings);
78  
79          // Start from a clean slate
80          final CompletionStage<Void> rm = directCache.removeAll();
81          assertThat(rm, successful());
82      }
83  
84      @Test
85      public void single_cache_get_set() throws ExecutionException, InterruptedException {
86          final CompletionStage<Optional<String>> eldestGet = cache.get("claira");
87  
88          assertThat(eldestGet, successfulWith(is(Optional.empty())));
89  
90          final CompletionStage<Boolean> eldestAdd = cache.put("claira", "dancing", PutPolicy.ADD_ONLY);
91  
92          assertThat(eldestAdd, successfulWith(is(true)));
93  
94          final CompletionStage<Optional<String>> eldestGet2 = cache.get("claira");
95  
96          assertThat(eldestGet2, successfulWith(is(Optional.of("dancing"))));
97  
98          final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
99                  requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
100         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
101         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
102         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
103         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
104         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
105         assertThat(cacheMetrics, hasSize(is(5)));
106     }
107 
108     @Test
109     public void dual_cache_get_set() throws ExecutionException, InterruptedException {
110         final CompletionStage<Optional<String>> get1 = cache.get("claira");
111 
112         assertThat(get1, successfulWith(is(Optional.empty())));
113 
114         // Change will not be visible to main cache
115         final CompletionStage<Boolean> dput1 = directCache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
116 
117         assertThat(dput1, successfulWith(is(true)));
118 
119         final CompletionStage<Optional<String>> get2 = cache.get("claira");
120 
121         assertThat(get2, successfulWith(is(Optional.empty())));
122 
123         // Add will fail as value exists from the other cache. However, we will now be able to see the
124         // value added by the other cache.
125         final CompletionStage<Boolean> put1 = cache.put("claira", "singing", PutPolicy.ADD_ONLY);
126 
127         assertThat(put1, successfulWith(is(false)));
128 
129         final CompletionStage<Optional<String>> get3 = cache.get("claira");
130 
131         assertThat(get3, successfulWith(is(Optional.of("dancing"))));
132 
133         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
134                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
135         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
136         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
137         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
138         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
139         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
140         assertThat(cacheMetrics, hasSize(is(5)));
141     }
142 
143     @Test
144     public void dual_cache_get_with_supplier() throws ExecutionException, InterruptedException {
145         final CompletionStage<Optional<String>> get1 = cache.get("claira");
146 
147         assertThat(get1, successfulWith(is(Optional.empty())));
148 
149         final CompletionStage<String> get2 = cache.get("claira", () -> "dancing");
150 
151         assertThat(get2, successfulWith(is("dancing")));
152 
153         final CompletionStage<Optional<String>> get3 = cache.get("claira");
154 
155         assertThat(get3, successfulWith(is(Optional.of("dancing"))));
156 
157         // Change will not be visible to main cache
158         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
159         assertThat(dput1, successfulWith(is(true)));
160 
161         final CompletionStage<Optional<String>> get4 = cache.get("claira");
162 
163         assertThat(get4, successfulWith(is(Optional.of("dancing"))));
164 
165         final CompletionStage<String> get5 = cache.get("claira", () -> "riding");
166 
167         assertThat(get5, successfulWith(is("dancing")));
168 
169         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
170                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
171         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(5L), greaterThan(0L)));
172         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
173         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(3L), is(3L)));
174         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
175         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
176         assertThat(cacheMetrics, hasSize(is(5)));
177     }
178 
179     @Test
180     public void dual_cache_get_with_supplier_take2() throws ExecutionException, InterruptedException {
181         final CompletionStage<Optional<String>> get1 = cache.get("claira");
182 
183         assertThat(get1, successfulWith(is(Optional.empty())));
184 
185         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
186         assertThat(dput1, successfulWith(is(true)));
187 
188         final CompletionStage<String> get2 = cache.get("claira", () -> "riding");
189 
190         assertThat(get2, successfulWith(is("singing")));
191 
192         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
193                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
194         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
195         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
196         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
197         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(3L), is(3L)));
198         assertThat(cacheMetrics, hasSize(is(4)));
199     }
200 
201     @Test
202     public void dual_cache_getBulk() throws ExecutionException, InterruptedException {
203         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
204         assertThat(dput1, successfulWith(is(true)));
205 
206         final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "claira");
207 
208         assertThat(get1, successful());
209         final Map<String, Optional<String>> map1 = unsafeJoin(get1);
210         assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine"));
211         assertThat(map1.get("claira"), is(Optional.of("singing")));
212         assertThat(map1.get("josephine"), is(Optional.empty()));
213 
214         final CompletionStage<Boolean> dput2 = directCache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
215         assertThat(dput2, successfulWith(is(true)));
216 
217         final CompletionStage<Boolean> dput3 = directCache.put("jasmin", "skiing", PutPolicy.PUT_ALWAYS);
218         assertThat(dput3, successfulWith(is(true)));
219 
220         final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
221 
222         assertThat(get2, successful());
223         final Map<String, Optional<String>> map2 = unsafeJoin(get2);
224         assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
225         assertThat(map2.get("claira"), is(Optional.of("singing")));
226         assertThat(map2.get("josephine"), is(Optional.empty()));
227         assertThat(map2.get("jasmin"), is(Optional.of("skiing")));
228 
229         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
230                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
231         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
232         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(3L)));
233         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
234         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
235         assertThat(cacheMetrics, hasSize(is(4)));
236     }
237 
238     @Test
239     public void remove_normal() throws ExecutionException, InterruptedException {
240         final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
241         assertThat(dput1, successfulWith(is(true)));
242 
243         final CompletionStage<Boolean> put1 = cache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
244         assertThat(put1, successfulWith(is(true)));
245 
246         final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "jasmin");
247 
248         assertThat(get1, successful());
249         final Map<String, Optional<String>> map1 = unsafeJoin(get1);
250         assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
251         assertThat(map1.get("claira"), is(Optional.of("singing")));
252         assertThat(map1.get("josephine"), is(Optional.of("football")));
253         assertThat(map1.get("jasmin"), is(Optional.empty()));
254 
255         final CompletionStage<Void> rm1 = cache.remove("jasmin", "claira", "claira", "josephine");
256 
257         assertThat(rm1, successful());
258 
259         final CompletionStage<Map<String, Optional<String>>> dget1 = directCache.getBulk("claira", "josephine", "jasmin");
260 
261         assertThat(dget1, successful());
262         final Map<String, Optional<String>> dmap1 = unsafeJoin(dget1);
263         assertThat(dmap1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
264         assertThat(dmap1.get("claira"), is(Optional.empty()));
265         assertThat(dmap1.get("josephine"), is(Optional.empty()));
266         assertThat(dmap1.get("jasmin"), is(Optional.empty()));
267 
268         final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
269 
270         assertThat(get2, successful());
271         final Map<String, Optional<String>> map2 = unsafeJoin(get2);
272         assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
273         assertThat(map2.get("claira"), is(Optional.empty()));
274         assertThat(map2.get("josephine"), is(Optional.empty()));
275         assertThat(map2.get("jasmin"), is(Optional.empty()));
276 
277         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
278                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
279         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
280         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
281         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
282         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(2L)));
283         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(4L)));
284         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
285         assertThat(cacheMetrics, hasSize(is(6)));
286     }
287 
288     @Test
289     public void simple_removeAll() throws ExecutionException, InterruptedException {
290         final CompletionStage<Boolean> put1 = cache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
291 
292         assertThat(put1, successfulWith(is(true)));
293 
294         final CompletionStage<Optional<String>> get1 = cache.get("claira");
295 
296         assertThat(get1, successfulWith(is(Optional.of("dancing"))));
297 
298         final CompletionStage<Void> rm1 = cache.removeAll();
299 
300         assertThat(rm1, successful());
301 
302         final CompletionStage<Optional<String>> get2 = cache.get("claira");
303 
304         assertThat(get2, successfulWith(is(Optional.empty())));
305 
306         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
307                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
308         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
309         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
310         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_ALL_CALL, is(1L), greaterThan(0L)));
311         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
312         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
313         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
314         assertThat(cacheMetrics, hasSize(is(6)));
315     }
316 
317     @Test
318     public void simple_getBulk_function() throws ExecutionException, InterruptedException {
319         final CompletionStage<Map<String, String>> get1 = cache.getBulk(
320                 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
321 
322         assertThat(get1, successful());
323         assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
324         assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
325 
326         final CompletionStage<Map<String, String>> get2 = cache.getBulk(
327                 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
328 
329         assertThat(get2, successful());
330         assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
331         assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
332 
333         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
334                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
335         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(2L), greaterThan(0L)));
336         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
337         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(2L), is(2L)));
338         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
339         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
340         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(4L), is(4L)));
341         assertThat(cacheMetrics, hasSize(is(6)));
342     }
343 
344     @Test
345     public void dual_getBulk_function() throws ExecutionException, InterruptedException {
346         final CompletionStage<Map<String, String>> get1 = cache.getBulk(
347                 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
348 
349         assertThat(get1, successful());
350         assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
351         assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
352 
353         final CompletionStage<Boolean> mput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
354         assertThat(mput1, successfulWith(is(true)));
355 
356         final CompletionStage<Map<String, String>> get2 = cache.getBulk(
357                 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
358 
359         assertThat(get2, successful());
360         assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
361         assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
362 
363         final CompletionStage<Void> rm1 = cache.remove("claira");
364 
365         assertThat(rm1, successful());
366 
367         final CompletionStage<Map<String, String>> get3 = cache.getBulk(
368                 keys -> Maps.asMap(keys, k -> k + "-3"), "claira", "josephine", "jasmin");
369 
370         assertThat(get3, successful());
371         assertThat(unsafeJoin(get3).keySet(), containsInAnyOrder("claira", "josephine", "jasmin"));
372         assertThat(unsafeJoin(get3).values(), containsInAnyOrder("claira-3", "josephine-2", "jasmin-3"));
373 
374         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
375                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
376         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(3L), greaterThan(0L)));
377         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
378         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
379         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(3L), is(4L)));
380         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(2L)));
381         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(3L), is(4L)));
382         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(7L), is(7L)));
383         assertThat(cacheMetrics, hasSize(is(7)));
384     }
385 
386     @SuppressWarnings("ConstantConditions")
387     @Test
388     public void check_null_detection() {
389         assertThat(cache.get("kenny", () -> null), not(successful()));
390         assertThat(cache.put("key", null, PutPolicy.ADD_ONLY), not(successful()));
391         assertThat(cache.getBulk(
392                 strings -> strings.stream().collect(Collectors.toMap(k -> k, k -> null)),
393                 "extra"),
394                 not(successful()));
395 
396         final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
397                 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
398         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(1L), greaterThan(0L)));
399         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
400         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
401         assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
402         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(1L), is(1L)));
403         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_GET, is(2L), is(2L)));
404         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_PUT, is(1L), is(1L)));
405         assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
406         assertThat(cacheMetrics, hasSize(is(8)));
407     }
408 
409     @Test
410     public void both_getBulk_and_get_fail() throws InterruptedException {
411         //Scenario:
412         // T1: bulkGet A, B, C
413         // T2: get B, supplier
414         // T1: bulkGet completes load with an exception
415         // T2: get B, supplier -> fails with another exception
416 
417         final Barrier blockedInFactory = new Barrier();
418         final Barrier resumeInFactory = new Barrier();
419 
420         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
421         final CompletableFuture<Map<String, String>> t1Result =
422                 runAndWaitForStart(() -> cache.getBulk(keys -> {
423                     log.info("{}: about to wait", Thread.currentThread().getName());
424                     blockedInFactory.signal();
425                     resumeInFactory.await();
426                     log.info("{}: About to throw exception", Thread.currentThread().getName());
427                     throw exBulk;
428                 }, "A", "B", "C").toCompletableFuture().join());
429 
430         final RuntimeException exSingle = new RuntimeException("get() exception");
431         blockedInFactory.await();
432         final CompletableFuture<String> t2Result =
433                 runAndWaitForStart(() -> cache.get("B", () -> {
434                     log.info("{}: About to throw exception", Thread.currentThread().getName());
435                     throw exSingle;
436                 }).toCompletableFuture().join());
437 
438         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
439         resumeInFactory.signal(); //Wait until T2 cache.get calls join
440 
441         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
442         assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
443         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
444         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
445         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
446     }
447 
448     @Test
449     public void both_get_and_getBulk_fail() throws InterruptedException {
450         //Scenario:
451         // T1: get B, supplier
452         // T2: bulkGet A, B, C
453         // T1: get B, supplier -> fails with another exception
454         // T2: bulkGet completes load with an exception
455 
456         final Barrier blockedInFactory = new Barrier();
457         final Barrier resumeInFactory = new Barrier();
458 
459         final RuntimeException exSingle = new RuntimeException("get() exception");
460         final CompletableFuture<String> t1Result =
461                 runAndWaitForStart(() -> cache.get("B", () -> {
462                     log.info("{}: About to throw exception", Thread.currentThread().getName());
463                     blockedInFactory.signal();
464                     resumeInFactory.await();
465                     throw exSingle;
466                 }).toCompletableFuture().join());
467 
468         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
469         blockedInFactory.await();
470         final CompletableFuture<Map<String, String>> t2Result =
471                 runAndWaitForStart(() -> cache.getBulk(keys -> {
472                     log.info("{}: about to wait", Thread.currentThread().getName());
473                     log.info("{}: About to throw exception", Thread.currentThread().getName());
474                     throw exBulk;
475                 }, "A", "B", "C").toCompletableFuture().join());
476 
477         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
478         resumeInFactory.signal(); //Wait until T2 cache.get calls join
479 
480         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
481         assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
482         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
483         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
484         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
485     }
486 
487     @Test
488     public void get_fails_and_getBulk_succeeds() throws InterruptedException {
489         //Scenario:
490         // T1: get B, supplier, fails
491         // T2: bulkGet A, B, C succeeds
492         // T1: get B, supplier -> fails with another exception
493         // T2: bulkGet completes with a map
494 
495         final Barrier blockedInFactory = new Barrier();
496         final Barrier resumeInFactory = new Barrier();
497 
498         final RuntimeException exSingle = new RuntimeException("get() exception");
499         final CompletableFuture<String> t1Result =
500                 runAndWaitForStart(() -> cache.get("B", () -> {
501                     log.info("{}: about to wait", Thread.currentThread().getName());
502                     blockedInFactory.signal();
503                     resumeInFactory.await();
504                     log.info("{}: About to throw exception", Thread.currentThread().getName());
505                     throw exSingle;
506                 }).toCompletableFuture().join());
507 
508         blockedInFactory.await();
509         final CompletableFuture<Map<String, String>> t2Result =
510                 runAndWaitForStart(() -> cache.getBulk(keys -> {
511                     log.info("{}: About to return numbers", Thread.currentThread().getName());
512                     return ImmutableMap.of("A", "1", "B", "2", "C", "3");
513                 }, "A", "B", "C").toCompletableFuture().join());
514 
515         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
516         resumeInFactory.signal(); //Wait until T2 cache.get calls join
517 
518         assertThat(t1Result.handle((r, e) -> {
519             if (e != null) {
520                 return Throwables.getRootCause(e);
521             } else {
522                 log.info("r: {}", r);
523                 // This forces a fail.
524                 return new RuntimeException();
525             }
526         }).join(), is(exSingle));
527 
528         assertThat(t2Result.join().get("A"), is("1"));
529         assertThat(t2Result.join().get("B"), is("2"));
530         assertThat(t2Result.join().get("C"), is("3"));
531 
532         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("1")));
533         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("2")));
534         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("3")));
535     }
536 
537     @Test
538     public void getBulk_fail_and_get_succeed() throws InterruptedException {
539         //Scenario:
540         // T1: bulkGet A, B, C
541         // T2: get B, supplier
542         // T1: bulkGet function throws exception
543         // T2: get B, supplier -> returns value
544 
545         final Barrier blockedInFactory = new Barrier();
546         final Barrier resumeInFactory = new Barrier();
547 
548         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
549         final CompletableFuture<Map<String, String>> t1Result =
550                 runAndWaitForStart(() -> cache.getBulk(keys -> {
551                     log.info("{}: about to wait", Thread.currentThread().getName());
552                     blockedInFactory.signal();
553                     resumeInFactory.await();
554                     log.info("{}: About to throw exception", Thread.currentThread().getName());
555                     throw exBulk;
556                 }, "A", "B", "C").toCompletableFuture().join());
557 
558         blockedInFactory.await();
559         final CompletableFuture<String> t2Result =
560                 runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
561 
562         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
563         resumeInFactory.signal(); //Wait until T2 cache.get calls join
564 
565 
566         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
567         assertThat(t2Result.join(), is("bee"));
568         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
569         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("bee")));
570         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
571     }
572 
573     @Test
574     public void getBulk_pass_and_get_reuse() throws InterruptedException {
575         //Scenario:
576         // T1: bulkGet A, B, C, blocks in the factory
577         // T2: get B, supplier runs
578         // T1: bulkGet function returns values, should return T2 value for B
579         // T2: get B, supplier -> returns T2 value
580 
581         final Barrier blockedInFactory = new Barrier();
582         final Barrier resumeInFactory = new Barrier();
583 
584         final CompletableFuture<Map<String, String>> t1Result =
585                 runAndWaitForStart(() -> cache.getBulk(keys -> {
586                     log.info("{}: About to await on barrierBulk", Thread.currentThread().getName());
587                     blockedInFactory.signal();
588                     resumeInFactory.await();
589                     log.info("{}: resuming", Thread.currentThread().getName());
590                     return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
591                 }, "A", "B", "C").toCompletableFuture().join());
592 
593         blockedInFactory.await();
594         final CompletableFuture<String> t2Result =
595                 runAndWaitForStart(() -> cache.get("B", () -> "B-2").toCompletableFuture().join());
596 
597         log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
598         assertThat(t2Result.join(), is("B-2"));
599         resumeInFactory.signal(); // Wait until T2 cache.get calls join
600         log.info("{}: and now", Thread.currentThread().getName());
601 
602 
603         final Map<String, String> t1ResultData = t1Result.join();
604         assertThat(t1ResultData.get("A"), is("A-1"));
605         assertThat(t1ResultData.get("B"), is("B-2"));
606         assertThat(t1ResultData.get("C"), is("C-1"));
607         assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("A-1")));
608         assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("B-2")));
609         assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("C-1")));
610     }
611 
612     @Test
613     public void potential_deadlock() {
614         final CompletableFuture<String> get = cache.get("lockType", () -> {
615             // People have actually done this!!
616             cache.remove("lockType");
617             return "lockwood";
618         }).toCompletableFuture();
619 
620         assertThat(get, successfulWith(is("lockwood")));
621     }
622 
623     @Test
624     public void repeated_concurrency() {
625         IntStream.range(1, 50).forEach((i) -> {
626             try {
627                 getBulk_fail_and_get_succeed();
628                 cache.removeAll();
629                 getBulk_pass_and_get_reuse();
630                 cache.removeAll();
631                 get_fails_and_getBulk_succeeds();
632                 cache.removeAll();
633                 both_get_and_getBulk_fail();
634                 cache.removeAll();
635                 both_getBulk_and_get_fail();
636                 cache.removeAll();
637 
638             } catch (InterruptedException e) {
639                 throw new RuntimeException(e);
640             }
641         });
642     }
643 }