View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.utt.concurrency.Barrier;
4   import com.atlassian.vcache.RequestCache;
5   import com.atlassian.vcache.internal.RequestContext;
6   import com.atlassian.vcache.internal.core.DefaultRequestContext;
7   import com.atlassian.vcache.internal.test.AbstractRequestCacheTest;
8   import com.google.common.base.Throwables;
9   import org.junit.After;
10  import org.junit.Before;
11  import org.junit.Test;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import javax.annotation.Nullable;
16  import java.util.Map;
17  import java.util.Optional;
18  import java.util.concurrent.Callable;
19  import java.util.concurrent.CompletableFuture;
20  import java.util.concurrent.ExecutionException;
21  import java.util.concurrent.ExecutorService;
22  import java.util.concurrent.Executors;
23  import java.util.concurrent.Future;
24  import java.util.stream.Collectors;
25  import java.util.stream.IntStream;
26  
27  import static org.hamcrest.Matchers.is;
28  import static org.junit.Assert.assertThat;
29  
30  public class DefaultRequestCacheTest extends AbstractRequestCacheTest {
31      private static final Logger log = LoggerFactory.getLogger(DefaultRequestCacheTest.class);
32  
33      private final ExecutorService executorService = Executors.newCachedThreadPool();
34  
35      @Nullable
36      private RequestContext context;
37  
38      @Before
39      public void initContext() {
40          context = new DefaultRequestContext("tenant-1");
41      }
42  
43      @After
44      public void tearDown() {
45          executorService.shutdown();
46      }
47  
48      @Override
49      protected <K, V> RequestCache<K, V> createCache(String name) {
50          return new DefaultRequestCache<>(name, () -> context);
51      }
52  
53      @Test
54      public void repeated_concurrency() {
55          IntStream.range(0, 1_000).forEach(i -> {
56              both_getBulk_and_get_fail();
57              getBulk_fail_and_get_succeed();
58              getBulk_pass_and_get_reuse();
59          });
60      }
61  
62      @Test
63      public void both_getBulk_and_get_fail() {
64          //Scenario:
65          // T1: bulkGet A, B, C
66          // T2: get B, supplier
67          // T1: bulkGet completes load with an exception
68          // T2: get B, supplier -> fails with another exception
69  
70          final Barrier blockedInFactory = new Barrier();
71          final Barrier resumeInFactory = new Barrier();
72  
73          final DefaultRequestCache<String, String> cache = new DefaultRequestCache<>("cache", () -> context);
74  
75          final RuntimeException exBulk = new RuntimeException("getBulk() exception");
76          final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
77              log.debug("{}: about to wait", Thread.currentThread().getName());
78              blockedInFactory.signal();
79              resumeInFactory.await();
80              log.debug("{}: About to throw exception", Thread.currentThread().getName());
81              throw exBulk;
82          }, "A", "B", "C"));
83  
84          blockedInFactory.await();
85          final RuntimeException exSingle = new RuntimeException("get() exception");
86          final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> {
87              log.debug("{}: About to throw exception", Thread.currentThread().getName());
88              throw exSingle;
89          }));
90  
91          log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
92          resumeInFactory.signal(); //Wait until T2 cache.get calls join
93  
94          assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
95          assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
96          assertThat(cache.get("A"), is(Optional.empty()));
97          assertThat(cache.get("B"), is(Optional.empty()));
98          assertThat(cache.get("C"), is(Optional.empty()));
99      }
100 
101     @Test
102     public void getBulk_fail_and_get_succeed() {
103         //Scenario:
104         // T1: bulkGet A, B, C
105         // T2: get B, supplier
106         // T1: bulkGet function throws exception
107         // T2: get B, supplier -> returns value
108 
109         final Barrier blockedInFactory = new Barrier();
110         final Barrier resumeInFactory = new Barrier();
111 
112         final DefaultRequestCache<String, String> cache = new DefaultRequestCache<>("cache", () -> context);
113 
114         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
115         final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
116             log.debug("{}: about to wait", Thread.currentThread().getName());
117             blockedInFactory.signal();
118             resumeInFactory.await();
119             log.debug("{}: About to throw exception", Thread.currentThread().getName());
120             throw exBulk;
121         }, "A", "B", "C"));
122 
123         blockedInFactory.await();
124         final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee"));
125 
126         log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
127         resumeInFactory.signal(); //Wait until T2 cache.get calls join
128 
129         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
130         assertThat(t2Result.join(), is("bee"));
131         assertThat(cache.get("A"), is(Optional.empty()));
132         assertThat(cache.get("B"), is(Optional.of("bee")));
133         assertThat(cache.get("C"), is(Optional.empty()));
134     }
135 
136     @Test
137     public void getBulk_pass_and_get_reuse() {
138         //Scenario:
139         // T1: bulkGet A, B, C
140         // T2: get B, supplier
141         // T1: bulkGet function returns values
142         // T2: get B, supplier -> returns bulkGet value
143 
144         final Barrier blockedInFactory = new Barrier();
145         final Barrier resumeInFactory = new Barrier();
146 
147         final DefaultRequestCache<String, String> cache = new DefaultRequestCache<>("cache", () -> context);
148 
149         final CompletableFuture<Map<String, String>> t1Result = runAndWaitForStart(() -> cache.getBulk(keys -> {
150             log.debug("{}: About to await on barrierBulk", Thread.currentThread().getName());
151             blockedInFactory.signal();
152             resumeInFactory.await();
153             log.debug("{}: resuming", Thread.currentThread().getName());
154             return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
155         }, "A", "B", "C"));
156 
157         blockedInFactory.await();
158         final CompletableFuture<String> t2Result = runAndWaitForStart(() -> cache.get("B", () -> "bee"));
159 
160         log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
161         resumeInFactory.signal(); //Wait until T2 cache.get calls join
162         log.debug("{}: and now", Thread.currentThread().getName());
163 
164 
165         final Map<String, String> t1ResultData = t1Result.join();
166         assertThat(t1ResultData.get("A"), is("A-1"));
167         assertThat(t1ResultData.get("B"), is("B-1"));
168         assertThat(t1ResultData.get("C"), is("C-1"));
169         assertThat(t2Result.join(), is("B-1"));
170         assertThat(cache.get("A"), is(Optional.of("A-1")));
171         assertThat(cache.get("B"), is(Optional.of("B-1")));
172         assertThat(cache.get("C"), is(Optional.of("C-1")));
173     }
174 
175     private <T> CompletableFuture<T> runAndWaitForStart(Callable<T> r) {
176         final Barrier barrier = new Barrier();
177         log.debug("{}: About to submit()", Thread.currentThread().getName());
178         final Future<T> result = executorService.submit(() -> {
179             log.debug("{}: About to signal()", Thread.currentThread().getName());
180             barrier.signal();
181             log.debug("{}: resume after signal", Thread.currentThread().getName());
182             return r.call();
183         });
184 
185         log.debug("{}: About to await()", Thread.currentThread().getName());
186         barrier.await();
187         log.debug("{}: resume after await()", Thread.currentThread().getName());
188 
189         return CompletableFuture.supplyAsync(() -> {
190             try {
191                 return result.get();
192             } catch (InterruptedException | ExecutionException e) {
193                 throw new RuntimeException(e);
194             }
195         });
196     }
197 }