1 package com.atlassian.vcache.internal.harness;
2
3 import com.atlassian.vcache.DirectExternalCache;
4 import com.atlassian.vcache.LocalCacheOperations;
5 import com.atlassian.vcache.PutPolicy;
6 import com.atlassian.vcache.internal.RequestMetrics;
7 import com.codahale.metrics.Timer;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Random;
14 import java.util.stream.IntStream;
15
16 import static java.util.Objects.requireNonNull;
17 import static java.util.stream.Collectors.joining;
18
19 public class Worker implements Runnable
20 {
21 private static final Logger log = LoggerFactory.getLogger(Worker.class);
22 private static final String BIG_STRING =
23 IntStream.range(0, 100).mapToObj(Integer::toString).collect(joining("-"));
24
25 private final int id;
26 private final OverallConfig overallConfig;
27 private final Random numGenerator;
28 private final List<String> cacheNames;
29
30 private volatile boolean stopNow;
31
32 public Worker(int id, OverallConfig overallConfig)
33 {
34 this.id = id;
35 this.overallConfig = requireNonNull(overallConfig);
36 this.numGenerator = new Random(id);
37 this.cacheNames = new ArrayList<>(overallConfig.cacheTestConfigMap.keySet());
38 }
39
40 @Override
41 public void run()
42 {
43
44
45 log.info("Thread {}: running", id);
46 while (!stopNow)
47 {
48 log.trace("Thread {}: starting loop", id);
49 overallConfig.requestContextSupplier.initThread("server-mode");
50
51 final int numCachesOps = numGenerator.nextInt(overallConfig.numOfCachesOps);
52 log.trace("Going to perform: {} ", numCachesOps);
53 IntStream.range(0, numCachesOps).forEach(i -> {
54
55 final int cacheIndex = numGenerator.nextInt(cacheNames.size());
56 final String cacheName = cacheNames.get(cacheIndex);
57 log.trace("Going to operate on {}", cacheName);
58 final CacheConfig cacheConfig = overallConfig.cacheTestConfigMap.get(cacheName);
59 if (cacheConfig.cache instanceof LocalCacheOperations)
60 {
61 handleLocalCacheOperations(cacheConfig);
62 }
63 else if (cacheConfig.cache instanceof DirectExternalCache)
64 {
65 handleDirectExternalCache(cacheConfig);
66 }
67 else
68 {
69 log.warn("Ignoring unknown cache type: {}", cacheConfig.cache.getClass().getName());
70 }
71 });
72
73 final RequestMetrics requestMetrics =
74 overallConfig.service.metrics(overallConfig.requestContextSupplier.get());
75 overallConfig.requestContextSupplier.clearThread();
76 log.trace("Thread {}: stopping loop", id);
77
78
79 try
80 {
81 Thread.sleep(numGenerator.nextInt(overallConfig.maxSleepBetweenLoops));
82 }
83 catch (InterruptedException e)
84 {
85 Thread.interrupted();
86 throw new RuntimeException(e);
87 }
88 }
89 log.info("Thread {}: stopping", id);
90 }
91
92 private void handleDirectExternalCache(CacheConfig cacheConfig)
93 {
94 final DirectExternalCache<String> directCache = (DirectExternalCache<String>) cacheConfig.cache;
95 final int numOperations = numGenerator.nextInt(cacheConfig.maxLoops);
96 final String cacheType = cacheConfig.cache.getClass().getSimpleName();
97
98 IntStream.range(0, numOperations).forEach(i -> {
99 final String key = "key-" + numGenerator.nextInt(cacheConfig.numOfKeys);
100 randomlyPerform(
101 cacheConfig.chanceOfPutOp,
102 cacheType + ".put",
103 () -> directCache.put(key, BIG_STRING, PutPolicy.PUT_ALWAYS));
104 randomlyPerform(
105 cacheConfig.chanceOfGetOp,
106 cacheType + ".get",
107 () -> directCache.get(key));
108 });
109 }
110
111 private void handleLocalCacheOperations(CacheConfig cacheConfig)
112 {
113 final LocalCacheOperations<String, String> localOps = (LocalCacheOperations<String, String>) cacheConfig.cache;
114 final int numOperations = numGenerator.nextInt(cacheConfig.maxLoops);
115 final String cacheType = cacheConfig.cache.getClass().getSimpleName();
116
117 IntStream.range(0, numOperations).forEach(i -> {
118 final String key = "key-" + numGenerator.nextInt(cacheConfig.numOfKeys);
119 randomlyPerform(
120 cacheConfig.chanceOfPutOp,
121 cacheType + ".put",
122 () -> localOps.put(key, BIG_STRING));
123 randomlyPerform(
124 cacheConfig.chanceOfGetOp,
125 cacheType + ".get",
126 () -> localOps.get(key));
127 });
128 }
129
130 private void randomlyPerform(int chance, String timerName, Runnable perform)
131 {
132 final int dice = numGenerator.nextInt(100);
133 if (dice < chance)
134 {
135 try (Timer.Context ignored = overallConfig.metricRegistry.timer(timerName).time())
136 {
137 perform.run();
138 }
139 }
140 }
141
142 void stopIt()
143 {
144 stopNow = true;
145 }
146 }