View Javadoc

1   package com.atlassian.vcache.internal.redis;
2   
3   import com.atlassian.annotations.tenancy.Tenantless;
4   import com.atlassian.fugue.Pair;
5   import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
6   import org.slf4j.Logger;
7   import org.slf4j.LoggerFactory;
8   import redis.clients.jedis.Jedis;
9   import redis.clients.jedis.JedisPool;
10  
11  import java.net.URI;
12  import java.util.concurrent.atomic.AtomicInteger;
13  import java.util.function.Consumer;
14  
15  import static com.atlassian.fugue.Pair.pair;
16  
17  /**
18   * A specialised {@link JedisPool} which binds the currently in-use {@link Jedis} instance to the requesting thread,
19   * so that repeated calls to {@link #getResource()} without a corresponding {@link #returnResource(Jedis)} will
20   * return the same instance. A reference count is kept for each thread-bound instance, and when an instance is returned
21   * and the reference count reaches zero, the instance is returned to the pool for real.
22   *
23   * @since 1.10.1
24   */
25  public class ThreadAffinityJedisPool extends JedisPool {
26      private static final Logger log = LoggerFactory.getLogger(ThreadAffinityJedisPool.class);
27  
28      @Tenantless(reason = "Thread-local data is used only to coordinate the sharing of a Jedis client over the short-term life of a VCache operation.")
29      private final ThreadLocal<Pair<Jedis, AtomicInteger>> threadBoundResources = new ThreadLocal<>();
30  
31      public ThreadAffinityJedisPool() {
32          super();
33      }
34  
35      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host) {
36          super(poolConfig, host);
37      }
38  
39      public ThreadAffinityJedisPool(String host, int port) {
40          super(host, port);
41      }
42  
43      public ThreadAffinityJedisPool(String host) {
44          super(host);
45      }
46  
47      public ThreadAffinityJedisPool(URI uri) {
48          super(uri);
49      }
50  
51      public ThreadAffinityJedisPool(URI uri, int timeout) {
52          super(uri, timeout);
53      }
54  
55      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout, String password) {
56          super(poolConfig, host, port, timeout, password);
57      }
58  
59      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port) {
60          super(poolConfig, host, port);
61      }
62  
63      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout) {
64          super(poolConfig, host, port, timeout);
65      }
66  
67      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout, String password, int database) {
68          super(poolConfig, host, port, timeout, password, database);
69      }
70  
71      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int timeout, String password, int database, String clientName) {
72          super(poolConfig, host, port, timeout, password, database, clientName);
73      }
74  
75      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, String host, int port, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
76          super(poolConfig, host, port, connectionTimeout, soTimeout, password, database, clientName);
77      }
78  
79      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, URI uri) {
80          super(poolConfig, uri);
81      }
82  
83      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, URI uri, int timeout) {
84          super(poolConfig, uri, timeout);
85      }
86  
87      public ThreadAffinityJedisPool(GenericObjectPoolConfig poolConfig, URI uri, int connectionTimeout, int soTimeout) {
88          super(poolConfig, uri, connectionTimeout, soTimeout);
89      }
90  
91      @Override
92      public Jedis getResource() {
93          final Jedis jedis = getOrCreateThreadBound().left();
94          final int newCount = getOrCreateThreadBound().right().incrementAndGet();
95          log.debug("Using thread-bound {}, reference count is now {}", jedis, newCount);
96          return jedis;
97      }
98  
99      private Pair<Jedis, AtomicInteger> getOrCreateThreadBound() {
100         final Pair<Jedis, AtomicInteger> threadBoundResource = threadBoundResources.get();
101         if (threadBoundResource == null) {
102             log.debug("No thread-bound Jedis instance found, fetching one from the pool");
103             final Jedis jedis = super.getResource();
104             threadBoundResources.set(pair(jedis, new AtomicInteger(0)));
105             log.debug("Bound {} to thread with reference count 0", jedis);
106         }
107         return threadBoundResources.get();
108     }
109 
110     @Override
111     public void returnBrokenResource(Jedis jedis) {
112         returnInternal(jedis, super::returnBrokenResource);
113     }
114 
115     @Override
116     public void returnResource(Jedis jedis) {
117         returnInternal(jedis, super::returnResource);
118     }
119 
120     private void returnInternal(Jedis jedis, Consumer<Jedis> returnAction) {
121         log.debug("Returning {}", jedis);
122         if (threadBoundResources.get() == null) {
123             throw new IllegalStateException("No thread-bound resource present");
124         }
125         if (jedis != threadBoundResources.get().left()) {
126             throw new IllegalStateException("Thread-bound Jedis object is not the same as the object being returned");
127         }
128         final int newCount = threadBoundResources.get().right().decrementAndGet();
129         if (newCount == 0) {
130             log.debug("Reference count of thread-bound {} is now 0, returning to the pool", jedis);
131             threadBoundResources.remove();
132             returnAction.accept(jedis);
133         } else {
134             log.debug("Reference count of thread-bound {} is now {}, not returning to pool", jedis, newCount);
135         }
136     }
137 }