View Javadoc

1   package com.atlassian.vcache.internal.memcached;
2   
3   import com.atlassian.marshalling.api.MarshallingPair;
4   import com.atlassian.vcache.CasIdentifier;
5   import com.atlassian.vcache.ExternalCacheException;
6   import com.atlassian.vcache.IdentifiedValue;
7   import com.atlassian.vcache.PutPolicy;
8   import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
9   import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10  import net.spy.memcached.CASValue;
11  import net.spy.memcached.MemcachedClientIF;
12  import net.spy.memcached.OperationTimeoutException;
13  import net.spy.memcached.internal.CheckedOperationTimeoutException;
14  import org.slf4j.Logger;
15  import org.slf4j.LoggerFactory;
16  
17  import java.util.Map;
18  import java.util.Optional;
19  import java.util.Set;
20  import java.util.concurrent.ExecutionException;
21  import java.util.concurrent.Future;
22  import java.util.function.Function;
23  import java.util.function.Supplier;
24  import java.util.stream.Collectors;
25  
26  import static com.atlassian.vcache.ExternalCacheException.Reason.TIMEOUT;
27  import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
28  import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
29  
30  /**
31   * Common utility methods that are specific for the Memcached implementation.
32   *
33   * @since 1.0.0
34   */
35  class MemcachedUtils {
36      private static final Logger log = LoggerFactory.getLogger(MemcachedUtils.class);
37  
38      /**
39       * The maximum number of seconds offset for a time to live, before TTL must be expressed as offset since 1970.
40       */
41      private static final int MAX_SECONDS_OFFSET = 60 * 60 * 24 * 30;
42  
43      static long safeExtractId(CasIdentifier casId) {
44          if (casId instanceof MemcachedCasIdentifier) {
45              return ((MemcachedCasIdentifier) casId).getId();
46          }
47  
48          log.warn("Passed an unknown CasIdentifier instance of class {}.", casId.getClass().getName());
49          throw new ExternalCacheException(UNCLASSIFIED_FAILURE);
50      }
51  
52      static <V> Optional<IdentifiedValue<V>> identifiedValueFrom(Future<CASValue<Object>> op, MarshallingPair<V> valueMarshalling) {
53          final CASValue<Object> casValue;
54          try {
55              casValue = op.get();
56          } catch (ExecutionException | InterruptedException ex) {
57              throw new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
58          }
59  
60          if (casValue == null) {
61              return Optional.empty();
62          }
63  
64          final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
65          final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
66                  identifier, VCacheCoreUtils.unmarshall((byte[]) casValue.getValue(), valueMarshalling).get());
67          return Optional.of(iv);
68      }
69  
70      static Future<Boolean> putOperationForPolicy(
71              PutPolicy policy, MemcachedClientIF client, String externalKey, int defaultTtl, byte[] valueBytes) {
72          final Future<Boolean> putOp;
73          if (policy == PutPolicy.ADD_ONLY) {
74              putOp = client.add(externalKey, defaultTtl, valueBytes);
75          } else if (policy == PutPolicy.PUT_ALWAYS) {
76              putOp = client.set(externalKey, defaultTtl, valueBytes);
77          } else if (policy == PutPolicy.REPLACE_ONLY) {
78              putOp = client.replace(externalKey, defaultTtl, valueBytes);
79          } else {
80              throw new IllegalArgumentException("Unknown put policy: " + policy);
81          }
82  
83          return putOp;
84      }
85  
86      static ExternalCacheException mapException(Exception ex) {
87          if (ex instanceof OperationTimeoutException || ex instanceof CheckedOperationTimeoutException) {
88              return new ExternalCacheException(TIMEOUT, ex);
89          } else {
90              return new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
91          }
92      }
93  
94      static <V> Map<String, Optional<V>> directGetBulk(
95              Set<String> externalKeys, Supplier<MemcachedClientIF> clientSupplier, MarshallingPair<V> valueMarshalling) {
96          // Returns map of keys that contain values, so need to handle the missing ones
97          final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
98  
99          return externalKeys.stream().collect(Collectors.toMap(
100                 k -> k,
101                 k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
102     }
103 
104     private static long obtainCacheVersion(Supplier<MemcachedClientIF> clientSupplier, String externalCacheVersionKey) {
105         // Incrementing by 0, to get the current value
106         return clientSupplier.get().incr(externalCacheVersionKey, 0, 1);
107     }
108 
109     private static long incrementCacheVersion(Supplier<MemcachedClientIF> clientSupplier, String externalCacheVersionKey) {
110         return clientSupplier.get().incr(externalCacheVersionKey, 1, 1);
111     }
112 
113     static Function<String, Long> cacheVersionIncrementer(Supplier<MemcachedClientIF> clientSupplier) {
114         return externalCacheVersionKey -> incrementCacheVersion(clientSupplier, externalCacheVersionKey);
115     }
116 
117     static Function<String, Long> cacheVersionSupplier(Supplier<MemcachedClientIF> clientSupplier) {
118         return externalCacheVersionKey -> obtainCacheVersion(clientSupplier, externalCacheVersionKey);
119     }
120 
121     /**
122      * Calculates the expiry time for a Memcached entry, taking into account the
123      * logic required to handle when the seconds are longer than 30 days.
124      *
125      * @param seconds the period to expiry, in seconds
126      * @return a Memcached compliant expiry time
127      */
128     static int expiryTime(int seconds) {
129         if (seconds < MAX_SECONDS_OFFSET) {
130             return seconds;
131         }
132 
133         final int currentTime = (int) (System.currentTimeMillis() / 1_000);
134         return currentTime + seconds;
135     }
136 }