1 package com.atlassian.vcache.internal.redis;
2
3 import com.atlassian.annotations.tenancy.Tenantless;
4 import com.atlassian.fugue.Pair;
5 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import redis.clients.jedis.Jedis;
9 import redis.clients.jedis.JedisPool;
10
11 import java.net.URI;
12 import java.util.concurrent.atomic.AtomicInteger;
13 import java.util.function.Consumer;
14
15 import static com.atlassian.fugue.Pair.pair;
16
17
18
19
20
21
22
23
24
25 public class ThreadAffinityJedisPool extends JedisPool {
26 private static final Logger log = LoggerFactory.getLogger(ThreadAffinityJedisPool.class);
27
28 @Tenantless(reason = "Thread-local data is used only to coordinate the sharing of a Jedis client over the short-term life of a VCache operation.")
29 private final ThreadLocal<Pair<Jedis, AtomicInteger>> threadBoundResources = new ThreadLocal<>();
30
31 public ThreadAffinityJedisPool() {
32 super();
33 }
34
35 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host) {
36 super(poolConfig, host);
37 }
38
39 public ThreadAffinityJedisPool(String host, int port) {
40 super(host, port);
41 }
42
43 public ThreadAffinityJedisPool(String host) {
44 super(host);
45 }
46
47 public ThreadAffinityJedisPool(URI uri) {
48 super(uri);
49 }
50
51 public ThreadAffinityJedisPool(URI uri, int timeout) {
52 super(uri, timeout);
53 }
54
55 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout, String password) {
56 super(poolConfig, host, port, timeout, password);
57 }
58
59 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port) {
60 super(poolConfig, host, port);
61 }
62
63 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout) {
64 super(poolConfig, host, port, timeout);
65 }
66
67 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout, String password, int database) {
68 super(poolConfig, host, port, timeout, password, database);
69 }
70
71 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout, String password, int database, String clientName) {
72 super(poolConfig, host, port, timeout, password, database, clientName);
73 }
74
75 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
76 super(poolConfig, host, port, connectionTimeout, soTimeout, password, database, clientName);
77 }
78
79 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, URI uri) {
80 super(poolConfig, uri);
81 }
82
83 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, URI uri, int timeout) {
84 super(poolConfig, uri, timeout);
85 }
86
87 public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, URI uri, int connectionTimeout, int soTimeout) {
88 super(poolConfig, uri, connectionTimeout, soTimeout);
89 }
90
91 @Override
92 public Jedis getResource() {
93 final Jedis jedis = getOrCreateThreadBound().left();
94 final int newCount = getOrCreateThreadBound().right().incrementAndGet();
95 log.debug("Using thread-bound {}, reference count is now {}", jedis, newCount);
96 return jedis;
97 }
98
99 private Pair<Jedis, AtomicInteger> getOrCreateThreadBound() {
100 final Pair<Jedis, AtomicInteger> threadBoundResource = threadBoundResources.get();
101 if (threadBoundResource == null) {
102 log.debug("No thread-bound Jedis instance found, fetching one from the pool");
103 final Jedis jedis = super.getResource();
104 threadBoundResources.set(pair(jedis, new AtomicInteger(0)));
105 log.debug("Bound {} to thread with reference count 0", jedis);
106 }
107 return threadBoundResources.get();
108 }
109
110 @Override
111 public void returnBrokenResource(Jedis jedis) {
112 returnInternal(jedis, super::returnBrokenResource);
113 }
114
115 @Override
116 public void returnResource(Jedis jedis) {
117 returnInternal(jedis, super::returnResource);
118 }
119
120 private void returnInternal(Jedis jedis, Consumer<Jedis> returnAction) {
121 log.debug("Returning {}", jedis);
122 if (threadBoundResources.get() == null) {
123 throw new IllegalStateException("No thread-bound resource present");
124 }
125 if (jedis != threadBoundResources.get().left()) {
126 throw new IllegalStateException("Thread-bound Jedis object is not the same as the object being returned");
127 }
128 final int newCount = threadBoundResources.get().right().decrementAndGet();
129 if (newCount == 0) {
130 log.debug("Reference count of thread-bound {} is now 0, returning to the pool", jedis);
131 threadBoundResources.remove();
132 returnAction.accept(jedis);
133 } else {
134 log.debug("Reference count of thread-bound {} is now {}, not returning to pool", jedis, newCount);
135 }
136 }
137 }