View Javadoc

1   package com.atlassian.vcache.internal.core.service;
2   
3   import com.atlassian.utt.concurrency.Barrier;
4   import com.atlassian.vcache.RequestCache;
5   import com.atlassian.vcache.internal.RequestContext;
6   import com.atlassian.vcache.internal.core.DefaultRequestContext;
7   import com.atlassian.vcache.internal.test.AbstractRequestCacheTest;
8   import com.google.common.base.Throwables;
9   import org.junit.Before;
10  import org.junit.Test;
11  import org.slf4j.Logger;
12  import org.slf4j.LoggerFactory;
13  
14  import javax.annotation.Nullable;
15  import java.time.Duration;
16  import java.util.Map;
17  import java.util.Optional;
18  import java.util.concurrent.CompletableFuture;
19  import java.util.stream.Collectors;
20  import java.util.stream.IntStream;
21  
22  import static com.atlassian.vcache.internal.test.TestUtils.runAndWaitForStart;
23  import static org.hamcrest.Matchers.is;
24  import static org.junit.Assert.assertThat;
25  
26  public class DefaultRequestCacheTest extends AbstractRequestCacheTest {
27      private static final Logger log = LoggerFactory.getLogger(DefaultRequestCacheTest.class);
28  
29      @Nullable
30      private RequestContext context;
31  
32      @Before
33      public void initContext() {
34          context = new DefaultRequestContext(() -> "tenant-1");
35      }
36  
37      @Override
38      protected <K, V> RequestCache<K, V> createCache(String name, Duration lockTimeout) {
39          return new DefaultRequestCache<>(name, () -> context, lockTimeout);
40      }
41  
42      @Test
43      public void repeated_concurrency() {
44          IntStream.range(0, 1_000).forEach(i -> {
45              both_getBulk_and_get_fail();
46              getBulk_fail_and_get_succeed();
47              getBulk_pass_and_get_reuse();
48          });
49      }
50  
51      @Test
52      public void both_getBulk_and_get_fail() {
53          //Scenario:
54          // T1: bulkGet A, B, C
55          // T2: get B, supplier
56          // T1: bulkGet completes load with an exception
57          // T2: get B, supplier -> fails with another exception
58  
59          final Barrier blockedInFactory = new Barrier();
60          final Barrier resumeInFactory = new Barrier();
61  
62          final RequestCache<String, String> cache = createCache("cache");
63  
64          final RuntimeException exBulk = new RuntimeException("getBulk() exception");
65          final CompletableFuture<Map<String, String>> t1Result =
66                  runAndWaitForStart(() -> cache.getBulk(keys -> {
67                      log.debug("{}: about to wait", Thread.currentThread().getName());
68                      blockedInFactory.signal();
69                      resumeInFactory.await();
70                      log.debug("{}: About to throw exception", Thread.currentThread().getName());
71                      throw exBulk;
72                  }, "A", "B", "C"));
73  
74          blockedInFactory.await();
75          final RuntimeException exSingle = new RuntimeException("get() exception");
76          final CompletableFuture<String> t2Result =
77                  runAndWaitForStart(() -> cache.get("B", () -> {
78                      log.debug("{}: About to throw exception", Thread.currentThread().getName());
79                      throw exSingle;
80                  }));
81  
82          log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
83          resumeInFactory.signal(); //Wait until T2 cache.get calls join
84  
85          assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
86          assertThat(t2Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exSingle));
87          assertThat(cache.get("A"), is(Optional.empty()));
88          assertThat(cache.get("B"), is(Optional.empty()));
89          assertThat(cache.get("C"), is(Optional.empty()));
90      }
91  
92      @Test
93      public void getBulk_fail_and_get_succeed() {
94          //Scenario:
95          // T1: bulkGet A, B, C
96          // T2: get B, supplier
97          // T1: bulkGet function throws exception
98          // T2: get B, supplier -> returns value
99  
100         final Barrier blockedInFactory = new Barrier();
101         final Barrier resumeInFactory = new Barrier();
102 
103         final RequestCache<String, String> cache = createCache("cache");
104 
105         final RuntimeException exBulk = new RuntimeException("getBulk() exception");
106         final CompletableFuture<Map<String, String>> t1Result =
107                 runAndWaitForStart(() -> cache.getBulk(keys -> {
108                     log.debug("{}: about to wait", Thread.currentThread().getName());
109                     blockedInFactory.signal();
110                     resumeInFactory.await();
111                     log.debug("{}: About to throw exception", Thread.currentThread().getName());
112                     throw exBulk;
113                 }, "A", "B", "C"));
114 
115         blockedInFactory.await();
116         final CompletableFuture<String> t2Result =
117                 runAndWaitForStart(() -> cache.get("B", () -> "bee"));
118 
119         log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
120         resumeInFactory.signal(); //Wait until T2 cache.get calls join
121 
122         assertThat(t1Result.handle((r, e) -> Throwables.getRootCause(e)).join(), is(exBulk));
123         assertThat(t2Result.join(), is("bee"));
124         assertThat(cache.get("A"), is(Optional.empty()));
125         assertThat(cache.get("B"), is(Optional.of("bee")));
126         assertThat(cache.get("C"), is(Optional.empty()));
127     }
128 
129     @Test
130     public void getBulk_pass_and_get_reuse() {
131         //Scenario:
132         // T1: bulkGet A, B, C
133         // T2: get B, supplier
134         // T1: bulkGet function returns values
135         // T2: get B, supplier -> returns bulkGet value
136 
137         final Barrier blockedInFactory = new Barrier();
138         final Barrier resumeInFactory = new Barrier();
139 
140         final RequestCache<String, String> cache = createCache("cache");
141 
142         final CompletableFuture<Map<String, String>> t1Result =
143                 runAndWaitForStart(() -> cache.getBulk(keys -> {
144                     log.debug("{}: About to await on barrierBulk", Thread.currentThread().getName());
145                     blockedInFactory.signal();
146                     resumeInFactory.await();
147                     log.debug("{}: resuming", Thread.currentThread().getName());
148                     return keys.stream().collect(Collectors.toMap(k -> k, k -> k + "-1"));
149                 }, "A", "B", "C"));
150 
151         blockedInFactory.await();
152         final CompletableFuture<String> t2Result =
153                 runAndWaitForStart(() -> cache.get("B", () -> "bee"));
154 
155         log.debug("{}: About to signal on barrierBulk", Thread.currentThread().getName());
156         resumeInFactory.signal(); //Wait until T2 cache.get calls join
157         log.debug("{}: and now", Thread.currentThread().getName());
158 
159 
160         final Map<String, String> t1ResultData = t1Result.join();
161         assertThat(t1ResultData.get("A"), is("A-1"));
162         assertThat(t1ResultData.get("B"), is("B-1"));
163         assertThat(t1ResultData.get("C"), is("C-1"));
164         assertThat(t2Result.join(), is("B-1"));
165         assertThat(cache.get("A"), is(Optional.of("A-1")));
166         assertThat(cache.get("B"), is(Optional.of("B-1")));
167         assertThat(cache.get("C"), is(Optional.of("C-1")));
168     }
169 }