View Javadoc

1   package com.atlassian.vcache.internal.harness;
2   
3   import com.atlassian.vcache.DirectExternalCache;
4   import com.atlassian.vcache.LocalCacheOperations;
5   import com.atlassian.vcache.PutPolicy;
6   import com.atlassian.vcache.internal.RequestMetrics;
7   import com.codahale.metrics.Timer;
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  
11  import java.util.ArrayList;
12  import java.util.List;
13  import java.util.Random;
14  import java.util.stream.IntStream;
15  
16  import static java.util.Objects.requireNonNull;
17  import static java.util.stream.Collectors.joining;
18  
19  public class Worker implements Runnable
20  {
21      private static final Logger log = LoggerFactory.getLogger(Worker.class);
22      private static final String BIG_STRING =
23              IntStream.range(0, 100).mapToObj(Integer::toString).collect(joining("-"));
24  
25      private final int id;
26      private final OverallConfig overallConfig;
27      private final Random numGenerator;
28      private final List<String> cacheNames;
29  
30      private volatile boolean stopNow;
31  
32      public Worker(int id, OverallConfig overallConfig)
33      {
34          this.id = id;
35          this.overallConfig = requireNonNull(overallConfig);
36          this.numGenerator = new Random(id);
37          this.cacheNames = new ArrayList<>(overallConfig.cacheTestConfigMap.keySet());
38      }
39  
40      @Override
41      public void run()
42      {
43          // The main logic is to loop until told to stop.
44          // For each loop, setup the thread, perform cache operations, get the request metrics and clear the thread.
45          log.info("Thread {}: running", id);
46          while (!stopNow)
47          {
48              log.trace("Thread {}: starting loop", id);
49              overallConfig.requestContextSupplier.initThread("server-mode");
50  
51              final int numCachesOps = numGenerator.nextInt(overallConfig.numOfCachesOps);
52              log.trace("Going to perform: {} ", numCachesOps);
53              IntStream.range(0, numCachesOps).forEach(i -> {
54                  // Set a cache to operate on
55                  final int cacheIndex = numGenerator.nextInt(cacheNames.size());
56                  final String cacheName = cacheNames.get(cacheIndex);
57                  log.trace("Going to operate on {}", cacheName);
58                  final CacheConfig cacheConfig = overallConfig.cacheTestConfigMap.get(cacheName);
59                  if (cacheConfig.cache instanceof LocalCacheOperations)
60                  {
61                      handleLocalCacheOperations(cacheConfig);
62                  }
63                  else if (cacheConfig.cache instanceof DirectExternalCache)
64                  {
65                      handleDirectExternalCache(cacheConfig);
66                  }
67                  else
68                  {
69                      log.warn("Ignoring unknown cache type: {}", cacheConfig.cache.getClass().getName());
70                  }
71              });
72  
73              final RequestMetrics requestMetrics =
74                      overallConfig.service.metrics(overallConfig.requestContextSupplier.get());
75              overallConfig.requestContextSupplier.clearThread();
76              log.trace("Thread {}: stopping loop", id);
77  
78              // Sleep until ready to go again
79              try
80              {
81                  Thread.sleep(numGenerator.nextInt(overallConfig.maxSleepBetweenLoops));
82              }
83              catch (InterruptedException e)
84              {
85                  Thread.interrupted();
86                  throw new RuntimeException(e);
87              }
88          }
89          log.info("Thread {}: stopping", id);
90      }
91  
92      private void handleDirectExternalCache(CacheConfig cacheConfig)
93      {
94          final DirectExternalCache<String> directCache = (DirectExternalCache<String>) cacheConfig.cache;
95          final int numOperations = numGenerator.nextInt(cacheConfig.maxLoops);
96          final String cacheType = cacheConfig.cache.getClass().getSimpleName();
97  
98          IntStream.range(0, numOperations).forEach(i -> {
99              final String key = "key-" + numGenerator.nextInt(cacheConfig.numOfKeys);
100             randomlyPerform(
101                     cacheConfig.chanceOfPutOp,
102                     cacheType + ".put",
103                     () -> directCache.put(key, BIG_STRING, PutPolicy.PUT_ALWAYS));
104             randomlyPerform(
105                     cacheConfig.chanceOfGetOp,
106                     cacheType + ".get",
107                     () -> directCache.get(key));
108         });
109     }
110 
111     private void handleLocalCacheOperations(CacheConfig cacheConfig)
112     {
113         final LocalCacheOperations<String, String> localOps = (LocalCacheOperations<String, String>) cacheConfig.cache;
114         final int numOperations = numGenerator.nextInt(cacheConfig.maxLoops);
115         final String cacheType = cacheConfig.cache.getClass().getSimpleName();
116 
117         IntStream.range(0, numOperations).forEach(i -> {
118             final String key = "key-" + numGenerator.nextInt(cacheConfig.numOfKeys);
119             randomlyPerform(
120                     cacheConfig.chanceOfPutOp,
121                     cacheType + ".put",
122                     () -> localOps.put(key, BIG_STRING));
123             randomlyPerform(
124                     cacheConfig.chanceOfGetOp,
125                     cacheType + ".get",
126                     () -> localOps.get(key));
127         });
128     }
129 
130     private void randomlyPerform(int chance, String timerName, Runnable perform)
131     {
132         final int dice = numGenerator.nextInt(100);
133         if (dice < chance)
134         {
135             try (Timer.Context ignored = overallConfig.metricRegistry.timer(timerName).time())
136             {
137                 perform.run();
138             }
139         }
140     }
141 
142     void stopIt()
143     {
144         stopNow = true;
145     }
146 }