1 package com.atlassian.vcache.internal.core.service;
2
3 import com.atlassian.utt.concurrency.Barrier;
4 import com.atlassian.vcache.RequestCache;
5 import com.atlassian.vcache.internal.RequestContext;
6 import com.atlassian.vcache.internal.core.DefaultRequestContext;
7 import com.atlassian.vcache.internal.test.AbstractRequestCacheTest;
8 import com.google.common.base.Throwables;
9 import org.junit.After;
10 import org.junit.Before;
11 import org.junit.Test;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 import javax.annotation.Nullable;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.Future;
24 import java.util.stream.Collectors;
25 import java.util.stream.IntStream;
26
27 import static org.hamcrest.Matchers.is;
28 import static org.junit.Assert.assertThat;
29
30 public class DefaultRequestCacheTest extends AbstractRequestCacheTest {
31 private static final Logger log = LoggerFactory.getLogger(DefaultRequestCacheTest.class);
32
33 private final ExecutorService executorService = Executors.newCachedThreadPool();
34
35 @Nullable
36 private RequestContext context;
37
38 @Before
39 public void initContext() {
40 context = new DefaultRequestContext("tenant-1");
41 }
42
43 @After
44 public void tearDown() {
45 executorService.shutdown();
46 }
47
48 @Override
49 protected <K, V> RequestCache<K, V> createCache(String name) {
50 return new DefaultRequestCache<>(name, () -> context);
51 }
52
53 @Test
54 public void repeated_concurrency() {
55 IntStream.range(0, 1_000).forEach(i -> {
56 both_getBulk_and_get_fail();
57 getBulk_fail_and_get_succeed();
58 getBulk_pass_and_get_reuse();
59 });
60 }
61
62 @Test
63 public void both_getBulk_and_get_fail() {
64
65
66
67
68
69
70 final Barrier blockedInFactory = new Barrier();
71 final Barrier resumeInFactory = new Barrier();
72
73 final DefaultRequestCache<String, String> cache = new DefaultRequestCache<>("cache", () -> context);
74
75 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
76 final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
77 log.debug("{}: about to wait", Thread.currentThread().getName());
78 blockedInFactory.signal();
79 resumeInFactory.await();
80 log.debug("{}: About to throw exception", Thread.currentThread().getName());
81 throw exBulk;
82 }, "A", "B", "C"));
83
84 blockedInFactory.await();
85 final RuntimeException exSingle = new RuntimeException("get() exception");
86 final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> {
87 log.debug("{}: About to throw exception", Thread.currentThread().getName());
88 throw exSingle;
89 }));
90
91 log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
92 resumeInFactory.signal();
93
94 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
95 assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
96 assertThat(cache.get("A"), is(Optional.empty()));
97 assertThat(cache.get("B"), is(Optional.empty()));
98 assertThat(cache.get("C"), is(Optional.empty()));
99 }
100
101 @Test
102 public void getBulk_fail_and_get_succeed() {
103
104
105
106
107
108
109 final Barrier blockedInFactory = new Barrier();
110 final Barrier resumeInFactory = new Barrier();
111
112 final DefaultRequestCache<String, String> cache = new DefaultRequestCache<>("cache", () -> context);
113
114 final RuntimeException exBulk = new RuntimeException("getBulk() exception");
115 final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
116 log.debug("{}: about to wait", Thread.currentThread().getName());
117 blockedInFactory.signal();
118 resumeInFactory.await();
119 log.debug("{}: About to throw exception", Thread.currentThread().getName());
120 throw exBulk;
121 }, "A", "B", "C"));
122
123 blockedInFactory.await();
124 final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee"));
125
126 log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
127 resumeInFactory.signal();
128
129 assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
130 assertThat(t2Result.join(), is("bee"));
131 assertThat(cache.get("A"), is(Optional.empty()));
132 assertThat(cache.get("B"), is(Optional.of("bee")));
133 assertThat(cache.get("C"), is(Optional.empty()));
134 }
135
136 @Test
137 public void getBulk_pass_and_get_reuse() {
138
139
140
141
142
143
144 final Barrier blockedInFactory = new Barrier();
145 final Barrier resumeInFactory = new Barrier();
146
147 final DefaultRequestCache<String, String> cache = new DefaultRequestCache<>("cache", () -> context);
148
149 final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
150 log.debug("{}: About to await on barrierBulk", Thread.currentThread().getName());
151 blockedInFactory.signal();
152 resumeInFactory.await();
153 log.debug("{}: resuming", Thread.currentThread().getName());
154 return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
155 }, "A", "B", "C"));
156
157 blockedInFactory.await();
158 final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee"));
159
160 log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
161 resumeInFactory.signal();
162 log.debug("{}: and now", Thread.currentThread().getName());
163
164
165 final Map<String, String> t1ResultData = t1Result.join();
166 assertThat(t1ResultData.get("A"), is("A-1"));
167 assertThat(t1ResultData.get("B"), is("B-1"));
168 assertThat(t1ResultData.get("C"), is("C-1"));
169 assertThat(t2Result.join(), is("B-1"));
170 assertThat(cache.get("A"), is(Optional.of("A-1")));
171 assertThat(cache.get("B"), is(Optional.of("B-1")));
172 assertThat(cache.get("C"), is(Optional.of("C-1")));
173 }
174
175 private <T> CompletableFuture<T> runAndWaitForStart(Callable<T> r) {
176 final Barrier barrier = new Barrier();
177 log.debug("{}: About to submit()", Thread.currentThread().getName());
178 final Future<T> result = executorService.submit(() -> {
179 log.debug("{}: About to signal()", Thread.currentThread().getName());
180 barrier.signal();
181 log.debug("{}: resume after signal", Thread.currentThread().getName());
182 return r.call();
183 });
184
185 log.debug("{}: About to await()", Thread.currentThread().getName());
186 barrier.await();
187 log.debug("{}: resume after await()", Thread.currentThread().getName());
188
189 return CompletableFuture.supplyAsync(() -> {
190 try {
191 return result.get();
192 } catch (InterruptedException | ExecutionException e) {
193 throw new RuntimeException(e);
194 }
195 });
196 }
197 }