1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.CasIdentifier;
5 import com.atlassian.vcache.ExternalCacheException;
6 import com.atlassian.vcache.IdentifiedValue;
7 import com.atlassian.vcache.PutPolicy;
8 import com.atlassian.vcache.internal.core.DefaultIdentifiedValue;
9 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
10 import net.spy.memcached.CASValue;
11 import net.spy.memcached.MemcachedClientIF;
12 import net.spy.memcached.OperationTimeoutException;
13 import net.spy.memcached.internal.CheckedOperationTimeoutException;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Future;
22 import java.util.function.Function;
23 import java.util.function.Supplier;
24 import java.util.stream.Collectors;
25
26 import static com.atlassian.vcache.ExternalCacheException.Reason.TIMEOUT;
27 import static com.atlassian.vcache.ExternalCacheException.Reason.UNCLASSIFIED_FAILURE;
28 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
29
30
31
32
33
34
35 class MemcachedUtils {
36 private static final Logger log = LoggerFactory.getLogger(MemcachedUtils.class);
37
38
39
40
41 private static final int MAX_SECONDS_OFFSET = 60 * 60 * 24 * 30;
42
43 static long safeExtractId(CasIdentifier casId) {
44 if (casId instanceof MemcachedCasIdentifier) {
45 return ((MemcachedCasIdentifier) casId).getId();
46 }
47
48 log.warn("Passed an unknown CasIdentifier instance of class {}.", casId.getClass().getName());
49 throw new ExternalCacheException(UNCLASSIFIED_FAILURE);
50 }
51
52 static <V> Optional<IdentifiedValue<V>> identifiedValueFrom(Future<CASValue<Object>> op, MarshallingPair<V> valueMarshalling) {
53 final CASValue<Object> casValue;
54 try {
55 casValue = op.get();
56 } catch (ExecutionException | InterruptedException ex) {
57 throw new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
58 }
59
60 if (casValue == null) {
61 return Optional.empty();
62 }
63
64 final CasIdentifier identifier = new MemcachedCasIdentifier(casValue.getCas());
65 final IdentifiedValue<V> iv = new DefaultIdentifiedValue<>(
66 identifier, VCacheCoreUtils.unmarshall((byte[]) casValue.getValue(), valueMarshalling).get());
67 return Optional.of(iv);
68 }
69
70 static Future<Boolean> putOperationForPolicy(
71 PutPolicy policy, MemcachedClientIF client, String externalKey, int defaultTtl, byte[] valueBytes) {
72 final Future<Boolean> putOp;
73 if (policy == PutPolicy.ADD_ONLY) {
74 putOp = client.add(externalKey, defaultTtl, valueBytes);
75 } else if (policy == PutPolicy.PUT_ALWAYS) {
76 putOp = client.set(externalKey, defaultTtl, valueBytes);
77 } else if (policy == PutPolicy.REPLACE_ONLY) {
78 putOp = client.replace(externalKey, defaultTtl, valueBytes);
79 } else {
80 throw new IllegalArgumentException("Unknown put policy: " + policy);
81 }
82
83 return putOp;
84 }
85
86 static ExternalCacheException mapException(Exception ex) {
87 if (ex instanceof OperationTimeoutException || ex instanceof CheckedOperationTimeoutException) {
88 return new ExternalCacheException(TIMEOUT, ex);
89 } else {
90 return new ExternalCacheException(UNCLASSIFIED_FAILURE, ex);
91 }
92 }
93
94 static <V> Map<String, Optional<V>> directGetBulk(
95 Set<String> externalKeys, Supplier<MemcachedClientIF> clientSupplier, MarshallingPair<V> valueMarshalling) {
96
97 final Map<String, Object> haveValues = clientSupplier.get().getBulk(externalKeys);
98
99 return externalKeys.stream().collect(Collectors.toMap(
100 k -> k,
101 k -> unmarshall((byte[]) haveValues.get(k), valueMarshalling)));
102 }
103
104 private static long obtainCacheVersion(Supplier<MemcachedClientIF> clientSupplier, String externalCacheVersionKey) {
105
106 return clientSupplier.get().incr(externalCacheVersionKey, 0, 1);
107 }
108
109 private static long incrementCacheVersion(Supplier<MemcachedClientIF> clientSupplier, String externalCacheVersionKey) {
110 return clientSupplier.get().incr(externalCacheVersionKey, 1, 1);
111 }
112
113 static Function<String, Long> cacheVersionIncrementer(Supplier<MemcachedClientIF> clientSupplier) {
114 return externalCacheVersionKey -> incrementCacheVersion(clientSupplier, externalCacheVersionKey);
115 }
116
117 static Function<String, Long> cacheVersionSupplier(Supplier<MemcachedClientIF> clientSupplier) {
118 return externalCacheVersionKey -> obtainCacheVersion(clientSupplier, externalCacheVersionKey);
119 }
120
121
122
123
124
125
126
127
128 static int expiryTime(int seconds) {
129 if (seconds < MAX_SECONDS_OFFSET) {
130 return seconds;
131 }
132
133 final int currentTime = (int) (System.currentTimeMillis() / 1_000);
134 return currentTime + seconds;
135 }
136 }