1 package com.atlassian.vcache.internal.test;
2
3 import com.atlassian.utt.concurrency.Barrier;
4 import com.atlassian.vcache.ChangeRate;
5 import com.atlassian.vcache.DirectExternalCache;
6 import com.atlassian.vcache.ExternalCacheSettings;
7 import com.atlassian.vcache.ExternalCacheSettingsBuilder;
8 import com.atlassian.vcache.PutPolicy;
9 import com.atlassian.vcache.StableReadExternalCache;
10 import com.atlassian.vcache.internal.LongMetric;
11 import com.atlassian.vcache.internal.MetricLabel;
12 import com.atlassian.vcache.internal.RequestMetrics;
13 import com.google.common.base.Throwables;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Maps;
16 import org.hamcrest.Matchers;
17 import org.junit.Before;
18 import org.junit.Rule;
19 import org.junit.Test;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import java.time.Duration;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.CompletionStage;
28 import java.util.concurrent.ExecutionException;
29 import java.util.stream.Collectors;
30 import java.util.stream.IntStream;
31
32 import static com.atlassian.vcache.VCacheUtils.unsafeJoin;
33 import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasMetric;
34 import static com.atlassian.vcache.internal.test.CacheMetricsMatcher.hasSize;
35 import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successful;
36 import static com.atlassian.vcache.internal.test.CompletionStageSuccessful.successfulWith;
37 import static com.atlassian.vcache.internal.test.TestUtils.runAndWaitForStart;
38 import static org.hamcrest.Matchers.containsInAnyOrder;
39 import static org.hamcrest.Matchers.greaterThan;
40 import static org.hamcrest.Matchers.is;
41 import static org.hamcrest.Matchers.not;
42 import static org.junit.Assert.assertThat;
43
44
45
46
47 public abstract class AbstractStableReadExternalCacheIT {
48 private static final Logger log = LoggerFactory.getLogger(AbstractStableReadExternalCacheIT.class);
49 private static final String CACHE_NAME = "kids_hobbies";
50
51 @Rule
52 public LoggingTestWatcher watcher = new LoggingTestWatcher(log);
53
54 private StableReadExternalCache<String> cache;
55 private DirectExternalCache<String> directCache;
56
57 protected abstract DirectExternalCache<String> obtainDirectCache(String name, ExternalCacheSettings settings);
58
59 protected abstract StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings, Duration lockTimeout);
60
61 protected StableReadExternalCache<String> createCache(String name, ExternalCacheSettings settings) {
62 return createCache(name, settings, Duration.ofSeconds(2));
63 }
64
65 protected abstract RequestMetrics requestMetrics();
66
67 @Before
68 public void ensureCache() {
69 final ExternalCacheSettings settings = new ExternalCacheSettingsBuilder()
70 .entryGrowthRateHint(ChangeRate.LOW_CHANGE)
71 .entryCountHint(5)
72 .defaultTtl(Duration.ofMinutes(5))
73 .dataChangeRateHint(ChangeRate.HIGH_CHANGE)
74 .build();
75 cache = createCache(CACHE_NAME, settings);
76
77 directCache = obtainDirectCache(CACHE_NAME, settings);
78
79
80 final CompletionStage<Void> rm = directCache.removeAll();
81 assertThat(rm, successful());
82 }
83
84 @Test
85 public void single_cache_get_set() throws ExecutionException, InterruptedException {
86 final CompletionStage<Optional<String>> eldestGet = cache.get("claira");
87
88 assertThat(eldestGet, successfulWith(is(Optional.empty())));
89
90 final CompletionStage<Boolean> eldestAdd = cache.put("claira", "dancing", PutPolicy.ADD_ONLY);
91
92 assertThat(eldestAdd, successfulWith(is(true)));
93
94 final CompletionStage<Optional<String>> eldestGet2 = cache.get("claira");
95
96 assertThat(eldestGet2, successfulWith(is(Optional.of("dancing"))));
97
98 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
99 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
100 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
101 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
102 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
103 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
104 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
105 assertThat(cacheMetrics, hasSize(is(5)));
106 }
107
108 @Test
109 public void dual_cache_get_set() throws ExecutionException, InterruptedException {
110 final CompletionStage<Optional<String>> get1 = cache.get("claira");
111
112 assertThat(get1, successfulWith(is(Optional.empty())));
113
114
115 final CompletionStage<Boolean> dput1 = directCache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
116
117 assertThat(dput1, successfulWith(is(true)));
118
119 final CompletionStage<Optional<String>> get2 = cache.get("claira");
120
121 assertThat(get2, successfulWith(is(Optional.empty())));
122
123
124
125 final CompletionStage<Boolean> put1 = cache.put("claira", "singing", PutPolicy.ADD_ONLY);
126
127 assertThat(put1, successfulWith(is(false)));
128
129 final CompletionStage<Optional<String>> get3 = cache.get("claira");
130
131 assertThat(get3, successfulWith(is(Optional.of("dancing"))));
132
133 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
134 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
135 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
136 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
137 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
138 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
139 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
140 assertThat(cacheMetrics, hasSize(is(5)));
141 }
142
143 @Test
144 public void dual_cache_get_with_supplier() throws ExecutionException, InterruptedException {
145 final CompletionStage<Optional<String>> get1 = cache.get("claira");
146
147 assertThat(get1, successfulWith(is(Optional.empty())));
148
149 final CompletionStage<String> get2 = cache.get("claira", () -> "dancing");
150
151 assertThat(get2, successfulWith(is("dancing")));
152
153 final CompletionStage<Optional<String>> get3 = cache.get("claira");
154
155 assertThat(get3, successfulWith(is(Optional.of("dancing"))));
156
157
158 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
159 assertThat(dput1, successfulWith(is(true)));
160
161 final CompletionStage<Optional<String>> get4 = cache.get("claira");
162
163 assertThat(get4, successfulWith(is(Optional.of("dancing"))));
164
165 final CompletionStage<String> get5 = cache.get("claira", () -> "riding");
166
167 assertThat(get5, successfulWith(is("dancing")));
168
169 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
170 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
171 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(5L), greaterThan(0L)));
172 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
173 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(3L), is(3L)));
174 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
175 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
176 assertThat(cacheMetrics, hasSize(is(5)));
177 }
178
179 @Test
180 public void dual_cache_get_with_supplier_take2() throws ExecutionException, InterruptedException {
181 final CompletionStage<Optional<String>> get1 = cache.get("claira");
182
183 assertThat(get1, successfulWith(is(Optional.empty())));
184
185 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
186 assertThat(dput1, successfulWith(is(true)));
187
188 final CompletionStage<String> get2 = cache.get("claira", () -> "riding");
189
190 assertThat(get2, successfulWith(is("singing")));
191
192 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
193 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
194 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
195 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
196 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
197 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(3L), is(3L)));
198 assertThat(cacheMetrics, hasSize(is(4)));
199 }
200
201 @Test
202 public void dual_cache_getBulk() throws ExecutionException, InterruptedException {
203 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
204 assertThat(dput1, successfulWith(is(true)));
205
206 final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "claira");
207
208 assertThat(get1, successful());
209 final Map<String, Optional<String>> map1 = unsafeJoin(get1);
210 assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine"));
211 assertThat(map1.get("claira"), is(Optional.of("singing")));
212 assertThat(map1.get("josephine"), is(Optional.empty()));
213
214 final CompletionStage<Boolean> dput2 = directCache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
215 assertThat(dput2, successfulWith(is(true)));
216
217 final CompletionStage<Boolean> dput3 = directCache.put("jasmin", "skiing", PutPolicy.PUT_ALWAYS);
218 assertThat(dput3, successfulWith(is(true)));
219
220 final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
221
222 assertThat(get2, successful());
223 final Map<String, Optional<String>> map2 = unsafeJoin(get2);
224 assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
225 assertThat(map2.get("claira"), is(Optional.of("singing")));
226 assertThat(map2.get("josephine"), is(Optional.empty()));
227 assertThat(map2.get("jasmin"), is(Optional.of("skiing")));
228
229 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
230 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
231 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
232 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(3L)));
233 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
234 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
235 assertThat(cacheMetrics, hasSize(is(4)));
236 }
237
238 @Test
239 public void remove_normal() throws ExecutionException, InterruptedException {
240 final CompletionStage<Boolean> dput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
241 assertThat(dput1, successfulWith(is(true)));
242
243 final CompletionStage<Boolean> put1 = cache.put("josephine", "football", PutPolicy.PUT_ALWAYS);
244 assertThat(put1, successfulWith(is(true)));
245
246 final CompletionStage<Map<String, Optional<String>>> get1 = cache.getBulk("claira", "josephine", "jasmin");
247
248 assertThat(get1, successful());
249 final Map<String, Optional<String>> map1 = unsafeJoin(get1);
250 assertThat(map1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
251 assertThat(map1.get("claira"), is(Optional.of("singing")));
252 assertThat(map1.get("josephine"), is(Optional.of("football")));
253 assertThat(map1.get("jasmin"), is(Optional.empty()));
254
255 final CompletionStage<Void> rm1 = cache.remove("jasmin", "claira", "claira", "josephine");
256
257 assertThat(rm1, successful());
258
259 final CompletionStage<Map<String, Optional<String>>> dget1 = directCache.getBulk("claira", "josephine", "jasmin");
260
261 assertThat(dget1, successful());
262 final Map<String, Optional<String>> dmap1 = unsafeJoin(dget1);
263 assertThat(dmap1.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
264 assertThat(dmap1.get("claira"), is(Optional.empty()));
265 assertThat(dmap1.get("josephine"), is(Optional.empty()));
266 assertThat(dmap1.get("jasmin"), is(Optional.empty()));
267
268 final CompletionStage<Map<String, Optional<String>>> get2 = cache.getBulk("claira", "josephine", "jasmin");
269
270 assertThat(get2, successful());
271 final Map<String, Optional<String>> map2 = unsafeJoin(get2);
272 assertThat(map2.keySet(), Matchers.containsInAnyOrder("claira", "josephine", "jasmin"));
273 assertThat(map2.get("claira"), is(Optional.empty()));
274 assertThat(map2.get("josephine"), is(Optional.empty()));
275 assertThat(map2.get("jasmin"), is(Optional.empty()));
276
277 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
278 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
279 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
280 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
281 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
282 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(2L)));
283 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(4L)));
284 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
285 assertThat(cacheMetrics, hasSize(is(6)));
286 }
287
288 @Test
289 public void simple_removeAll() throws ExecutionException, InterruptedException {
290 final CompletionStage<Boolean> put1 = cache.put("claira", "dancing", PutPolicy.PUT_ALWAYS);
291
292 assertThat(put1, successfulWith(is(true)));
293
294 final CompletionStage<Optional<String>> get1 = cache.get("claira");
295
296 assertThat(get1, successfulWith(is(Optional.of("dancing"))));
297
298 final CompletionStage<Void> rm1 = cache.removeAll();
299
300 assertThat(rm1, successful());
301
302 final CompletionStage<Optional<String>> get2 = cache.get("claira");
303
304 assertThat(get2, successfulWith(is(Optional.empty())));
305
306 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
307 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
308 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
309 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
310 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_ALL_CALL, is(1L), greaterThan(0L)));
311 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
312 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(1L), is(1L)));
313 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(1L), is(1L)));
314 assertThat(cacheMetrics, hasSize(is(6)));
315 }
316
317 @Test
318 public void simple_getBulk_function() throws ExecutionException, InterruptedException {
319 final CompletionStage<Map<String, String>> get1 = cache.getBulk(
320 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
321
322 assertThat(get1, successful());
323 assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
324 assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
325
326 final CompletionStage<Map<String, String>> get2 = cache.getBulk(
327 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
328
329 assertThat(get2, successful());
330 assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
331 assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
332
333 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
334 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
335 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(2L), greaterThan(0L)));
336 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
337 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(2L), is(2L)));
338 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(1L), is(1L)));
339 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(2L), is(2L)));
340 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(4L), is(4L)));
341 assertThat(cacheMetrics, hasSize(is(6)));
342 }
343
344 @Test
345 public void dual_getBulk_function() throws ExecutionException, InterruptedException {
346 final CompletionStage<Map<String, String>> get1 = cache.getBulk(
347 keys -> Maps.asMap(keys, k -> k + "-1"), "claira");
348
349 assertThat(get1, successful());
350 assertThat(unsafeJoin(get1).keySet(), containsInAnyOrder("claira"));
351 assertThat(unsafeJoin(get1).values(), containsInAnyOrder("claira-1"));
352
353 final CompletionStage<Boolean> mput1 = directCache.put("claira", "singing", PutPolicy.PUT_ALWAYS);
354 assertThat(mput1, successfulWith(is(true)));
355
356 final CompletionStage<Map<String, String>> get2 = cache.getBulk(
357 keys -> Maps.asMap(keys, k -> k + "-2"), "claira", "josephine", "claira");
358
359 assertThat(get2, successful());
360 assertThat(unsafeJoin(get2).keySet(), containsInAnyOrder("claira", "josephine"));
361 assertThat(unsafeJoin(get2).values(), containsInAnyOrder("claira-1", "josephine-2"));
362
363 final CompletionStage<Void> rm1 = cache.remove("claira");
364
365 assertThat(rm1, successful());
366
367 final CompletionStage<Map<String, String>> get3 = cache.getBulk(
368 keys -> Maps.asMap(keys, k -> k + "-3"), "claira", "josephine", "jasmin");
369
370 assertThat(get3, successful());
371 assertThat(unsafeJoin(get3).keySet(), containsInAnyOrder("claira", "josephine", "jasmin"));
372 assertThat(unsafeJoin(get3).values(), containsInAnyOrder("claira-3", "josephine-2", "jasmin-3"));
373
374 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
375 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
376 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(3L), greaterThan(0L)));
377 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(3L), greaterThan(0L)));
378 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_REMOVE_CALL, is(1L), greaterThan(0L)));
379 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(3L), is(4L)));
380 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_HITS, is(2L), is(2L)));
381 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_MISSES, is(3L), is(4L)));
382 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(7L), is(7L)));
383 assertThat(cacheMetrics, hasSize(is(7)));
384 }
385
386 @SuppressWarnings("ConstantConditions")
387 @Test
388 public void check_null_detection() {
389 assertThat(cache.get("kenny", () -> null), not(successful()));
390 assertThat(cache.put("key", null, PutPolicy.ADD_ONLY), not(successful()));
391 assertThat(cache.getBulk(
392 strings -> strings.stream().collect(Collectors.toMap(k -> k, k -> null)),
393 "extra"),
394 not(successful()));
395
396 final Map<MetricLabel, ? extends LongMetric> cacheMetrics =
397 requestMetrics().allExternalCacheLongMetrics().get(CACHE_NAME);
398 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_FACTORY_CALL, is(1L), greaterThan(0L)));
399 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_GET_CALL, is(2L), greaterThan(0L)));
400 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_PUT_CALL, is(1L), greaterThan(0L)));
401 assertThat(cacheMetrics, hasMetric(MetricLabel.TIMED_SUPPLIER_CALL, is(1L), greaterThan(0L)));
402 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FACTORY_KEYS, is(1L), is(1L)));
403 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_GET, is(2L), is(2L)));
404 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_FAILED_PUT, is(1L), is(1L)));
405 assertThat(cacheMetrics, hasMetric(MetricLabel.NUMBER_OF_REMOTE_GET, is(2L), is(2L)));
406 assertThat(cacheMetrics, hasSize(is(8)));
407 }
408
409 @Test
410 public void both_getBulk_and_get_fail() throws InterruptedException {
411
412
413
414
415
416
417 final Barrier blockedInFactory = new Barrier();
418 final Barrier resumeInFactory = new Barrier();
419
420 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
421 final CompletableFuture<Map<String, String>> t1Result =
422 runAndWaitForStart(() -> cache.getBulk(keys -> {
423 log.info("{}: about to wait", Thread.currentThread().getName());
424 blockedInFactory.signal();
425 resumeInFactory.await();
426 log.info("{}: About to throw exception", Thread.currentThread().getName());
427 throw exBulk;
428 }, "A", "B", "C").toCompletableFuture().join());
429
430 final RuntimeException exSingle = new RuntimeException("get() exception");
431 blockedInFactory.await();
432 final CompletableFuture<String> t2Result =
433 runAndWaitForStart(() -> cache.get("B", () -> {
434 log.info("{}: About to throw exception", Thread.currentThread().getName());
435 throw exSingle;
436 }).toCompletableFuture().join());
437
438 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
439 resumeInFactory.signal();
440
441 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
442 assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
443 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
444 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
445 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
446 }
447
448 @Test
449 public void both_get_and_getBulk_fail() throws InterruptedException {
450
451
452
453
454
455
456 final Barrier blockedInFactory = new Barrier();
457 final Barrier resumeInFactory = new Barrier();
458
459 final RuntimeException exSingle = new RuntimeException("get() exception");
460 final CompletableFuture<String> t1Result =
461 runAndWaitForStart(() -> cache.get("B", () -> {
462 log.info("{}: About to throw exception", Thread.currentThread().getName());
463 blockedInFactory.signal();
464 resumeInFactory.await();
465 throw exSingle;
466 }).toCompletableFuture().join());
467
468 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
469 blockedInFactory.await();
470 final CompletableFuture<Map<String, String>> t2Result =
471 runAndWaitForStart(() -> cache.getBulk(keys -> {
472 log.info("{}: about to wait", Thread.currentThread().getName());
473 log.info("{}: About to throw exception", Thread.currentThread().getName());
474 throw exBulk;
475 }, "A", "B", "C").toCompletableFuture().join());
476
477 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
478 resumeInFactory.signal();
479
480 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
481 assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
482 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
483 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.empty()));
484 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
485 }
486
487 @Test
488 public void get_fails_and_getBulk_succeeds() throws InterruptedException {
489
490
491
492
493
494
495 final Barrier blockedInFactory = new Barrier();
496 final Barrier resumeInFactory = new Barrier();
497
498 final RuntimeException exSingle = new RuntimeException("get() exception");
499 final CompletableFuture<String> t1Result =
500 runAndWaitForStart(() -> cache.get("B", () -> {
501 log.info("{}: about to wait", Thread.currentThread().getName());
502 blockedInFactory.signal();
503 resumeInFactory.await();
504 log.info("{}: About to throw exception", Thread.currentThread().getName());
505 throw exSingle;
506 }).toCompletableFuture().join());
507
508 blockedInFactory.await();
509 final CompletableFuture<Map<String, String>> t2Result =
510 runAndWaitForStart(() -> cache.getBulk(keys -> {
511 log.info("{}: About to return numbers", Thread.currentThread().getName());
512 return ImmutableMap.of("A", "1", "B", "2", "C", "3");
513 }, "A", "B", "C").toCompletableFuture().join());
514
515 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
516 resumeInFactory.signal();
517
518 assertThat(t1Result.handle((r, e) -> {
519 if (e != null) {
520 return Throwables.getRootCause(e);
521 } else {
522 log.info("r: {}", r);
523
524 return new RuntimeException();
525 }
526 }).join(), is(exSingle));
527
528 assertThat(t2Result.join().get("A"), is("1"));
529 assertThat(t2Result.join().get("B"), is("2"));
530 assertThat(t2Result.join().get("C"), is("3"));
531
532 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("1")));
533 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("2")));
534 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("3")));
535 }
536
537 @Test
538 public void getBulk_fail_and_get_succeed() throws InterruptedException {
539
540
541
542
543
544
545 final Barrier blockedInFactory = new Barrier();
546 final Barrier resumeInFactory = new Barrier();
547
548 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
549 final CompletableFuture<Map<String, String>> t1Result =
550 runAndWaitForStart(() -> cache.getBulk(keys -> {
551 log.info("{}: about to wait", Thread.currentThread().getName());
552 blockedInFactory.signal();
553 resumeInFactory.await();
554 log.info("{}: About to throw exception", Thread.currentThread().getName());
555 throw exBulk;
556 }, "A", "B", "C").toCompletableFuture().join());
557
558 blockedInFactory.await();
559 final CompletableFuture<String> t2Result =
560 runAndWaitForStart(() -> cache.get("B", () -> "bee").toCompletableFuture().join());
561
562 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
563 resumeInFactory.signal();
564
565
566 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
567 assertThat(t2Result.join(), is("bee"));
568 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.empty()));
569 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("bee")));
570 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.empty()));
571 }
572
573 @Test
574 public void getBulk_pass_and_get_reuse() throws InterruptedException {
575
576
577
578
579
580
581 final Barrier blockedInFactory = new Barrier();
582 final Barrier resumeInFactory = new Barrier();
583
584 final CompletableFuture<Map<String, String>> t1Result =
585 runAndWaitForStart(() -> cache.getBulk(keys -> {
586 log.info("{}: About to await on barrierBulk", Thread.currentThread().getName());
587 blockedInFactory.signal();
588 resumeInFactory.await();
589 log.info("{}: resuming", Thread.currentThread().getName());
590 return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
591 }, "A", "B", "C").toCompletableFuture().join());
592
593 blockedInFactory.await();
594 final CompletableFuture<String> t2Result =
595 runAndWaitForStart(() -> cache.get("B", () -> "B-2").toCompletableFuture().join());
596
597 log.info("{}: About to signal on barrierBulk", Thread.currentThread().getName());
598 assertThat(t2Result.join(), is("B-2"));
599 resumeInFactory.signal();
600 log.info("{}: and now", Thread.currentThread().getName());
601
602
603 final Map<String, String> t1ResultData = t1Result.join();
604 assertThat(t1ResultData.get("A"), is("A-1"));
605 assertThat(t1ResultData.get("B"), is("B-2"));
606 assertThat(t1ResultData.get("C"), is("C-1"));
607 assertThat(cache.get("A").toCompletableFuture().join(), is(Optional.of("A-1")));
608 assertThat(cache.get("B").toCompletableFuture().join(), is(Optional.of("B-2")));
609 assertThat(cache.get("C").toCompletableFuture().join(), is(Optional.of("C-1")));
610 }
611
612 @Test
613 public void potential_deadlock() {
614 final CompletableFuture<String> get = cache.get("lockType", () -> {
615
616 cache.remove("lockType");
617 return "lockwood";
618 }).toCompletableFuture();
619
620 assertThat(get, successfulWith(is("lockwood")));
621 }
622
623 @Test
624 public void repeated_concurrency() {
625 IntStream.range(1, 50).forEach((i) -> {
626 try {
627 getBulk_fail_and_get_succeed();
628 cache.removeAll();
629 getBulk_pass_and_get_reuse();
630 cache.removeAll();
631 get_fails_and_getBulk_succeeds();
632 cache.removeAll();
633 both_get_and_getBulk_fail();
634 cache.removeAll();
635 both_getBulk_and_get_fail();
636 cache.removeAll();
637
638 } catch (InterruptedException e) {
639 throw new RuntimeException(e);
640 }
641 });
642 }
643 }