1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.utt.concurrency.Barrier;
4 import com.atlassian.vcache.RequestCache;
5 import com.atlassian.vcache.internal.RequestContext;
6 import com.atlassian.vcache.internal.core.DefaultRequestContext;
7 import com.atlassian.vcache.internal.test.AbstractRequestCacheTest;
8 import com.google.common.base.Throwables;
9 import org.junit.Before;
10 import org.junit.Test;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13
14 import javax.annotation.Nullable;
15 import java.time.Duration;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.stream.Collectors;
20 import java.util.stream.IntStream;
21
22 import static com.atlassian.vcache.internal.test.TestUtils.runAndWaitForStart;
23 import static org.hamcrest.Matchers.is;
24 import static org.junit.Assert.assertThat;
25
26 public class DefaultRequestCacheTest extends AbstractRequestCacheTest {
27 private static final Logger log = LoggerFactory.getLogger(DefaultRequestCacheTest.class);
28
29 @Nullable
30 private RequestContext context;
31
32 @Before
33 public void initContext() {
34 context = new DefaultRequestContext("tenant-1");
35 }
36
37 @Override
38 protected <K, V> RequestCache<K, V> createCache(String name, Duration lockTimeout) {
39 return new DefaultRequestCache<>(name, () -> context, lockTimeout);
40 }
41
42 @Test
43 public void repeated_concurrency() {
44 IntStream.range(0, 1_000).forEach(i -> {
45 both_getBulk_and_get_fail();
46 getBulk_fail_and_get_succeed();
47 getBulk_pass_and_get_reuse();
48 });
49 }
50
51 @Test
52 public void both_getBulk_and_get_fail() {
53
54
55
56
57
58
59 final Barrier blockedInFactory = new Barrier();
60 final Barrier resumeInFactory = new Barrier();
61
62 final RequestCache<String, String> cache = createCache("cache");
63
64 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
65 final CompletableFuture<Map<String, String>> t1Result =
66 runAndWaitForStart(() -> cache.getBulk(keys -> {
67 log.debug("{}: about to wait", Thread.currentThread().getName());
68 blockedInFactory.signal();
69 resumeInFactory.await();
70 log.debug("{}: About to throw exception", Thread.currentThread().getName());
71 throw exBulk;
72 }, "A", "B", "C"));
73
74 blockedInFactory.await();
75 final RuntimeException exSingle = new RuntimeException("get() exception");
76 final CompletableFuture<String> t2Result =
77 runAndWaitForStart(() -> cache.get("B", () -> {
78 log.debug("{}: About to throw exception", Thread.currentThread().getName());
79 throw exSingle;
80 }));
81
82 log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
83 resumeInFactory.signal();
84
85 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
86 assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
87 assertThat(cache.get("A"), is(Optional.empty()));
88 assertThat(cache.get("B"), is(Optional.empty()));
89 assertThat(cache.get("C"), is(Optional.empty()));
90 }
91
92 @Test
93 public void getBulk_fail_and_get_succeed() {
94
95
96
97
98
99
100 final Barrier blockedInFactory = new Barrier();
101 final Barrier resumeInFactory = new Barrier();
102
103 final RequestCache<String, String> cache = createCache("cache");
104
105 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
106 final CompletableFuture<Map<String, String>> t1Result =
107 runAndWaitForStart(() -> cache.getBulk(keys -> {
108 log.debug("{}: about to wait", Thread.currentThread().getName());
109 blockedInFactory.signal();
110 resumeInFactory.await();
111 log.debug("{}: About to throw exception", Thread.currentThread().getName());
112 throw exBulk;
113 }, "A", "B", "C"));
114
115 blockedInFactory.await();
116 final CompletableFuture<String> t2Result =
117 runAndWaitForStart(() -> cache.get("B", () -> "bee"));
118
119 log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
120 resumeInFactory.signal();
121
122 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
123 assertThat(t2Result.join(), is("bee"));
124 assertThat(cache.get("A"), is(Optional.empty()));
125 assertThat(cache.get("B"), is(Optional.of("bee")));
126 assertThat(cache.get("C"), is(Optional.empty()));
127 }
128
129 @Test
130 public void getBulk_pass_and_get_reuse() {
131
132
133
134
135
136
137 final Barrier blockedInFactory = new Barrier();
138 final Barrier resumeInFactory = new Barrier();
139
140 final RequestCache<String, String> cache = createCache("cache");
141
142 final CompletableFuture<Map<String, String>> t1Result =
143 runAndWaitForStart(() -> cache.getBulk(keys -> {
144 log.debug("{}: About to await on barrierBulk", Thread.currentThread().getName());
145 blockedInFactory.signal();
146 resumeInFactory.await();
147 log.debug("{}: resuming", Thread.currentThread().getName());
148 return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
149 }, "A", "B", "C"));
150
151 blockedInFactory.await();
152 final CompletableFuture<String> t2Result =
153 runAndWaitForStart(() -> cache.get("B", () -> "bee"));
154
155 log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
156 resumeInFactory.signal();
157 log.debug("{}: and now", Thread.currentThread().getName());
158
159
160 final Map<String, String> t1ResultData = t1Result.join();
161 assertThat(t1ResultData.get("A"), is("A-1"));
162 assertThat(t1ResultData.get("B"), is("B-1"));
163 assertThat(t1ResultData.get("C"), is("C-1"));
164 assertThat(t2Result.join(), is("B-1"));
165 assertThat(cache.get("A"), is(Optional.of("A-1")));
166 assertThat(cache.get("B"), is(Optional.of("B-1")));
167 assertThat(cache.get("C"), is(Optional.of("C-1")));
168 }
169 }