1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.PutPolicy;
7 import com.atlassian.vcache.internal.MetricLabel;
8 import com.atlassian.vcache.internal.RequestContext;
9 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.CacheType;
12 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13 import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15 import net.spy.memcached.MemcachedClientIF;
16 import net.spy.memcached.OperationTimeoutException;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import java.time.Duration;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.Future;
26 import java.util.function.Supplier;
27 import java.util.stream.Collectors;
28 import java.util.stream.StreamSupport;
29
30 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
31 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
32 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
33 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
34 import static java.util.Objects.requireNonNull;
35
36
37
38
39
40
41
42 class MemcachedStableReadExternalCache<V>
43 extends AbstractStableReadExternalCache<V> {
44 private static final Logger log = LoggerFactory.getLogger(MemcachedStableReadExternalCache.class);
45
46 private final Supplier<MemcachedClientIF> clientSupplier;
47 private final Supplier<RequestContext> contextSupplier;
48 private final ExternalCacheKeyGenerator keyGenerator;
49 private final MarshallingPair<V> valueMarshalling;
50 private final int ttlSeconds;
51
52 MemcachedStableReadExternalCache(
53 Supplier<MemcachedClientIF> clientSupplier,
54 Supplier<RequestContext> contextSupplier,
55 ExternalCacheKeyGenerator keyGenerator,
56 String name,
57 MarshallingPair<V> valueMarshalling,
58 ExternalCacheSettings settings,
59 MetricsRecorder metricsRecorder,
60 Duration lockTimeout) {
61 super(name, metricsRecorder, lockTimeout);
62 this.clientSupplier = requireNonNull(clientSupplier);
63 this.contextSupplier = requireNonNull(contextSupplier);
64 this.keyGenerator = requireNonNull(keyGenerator);
65 this.valueMarshalling = requireNonNull(valueMarshalling);
66 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
67 }
68
69 @Override
70 public boolean internalPut(String internalKey, V value, PutPolicy policy) {
71 final String externalKey = ensureCacheContext().externalEntryKeyFor(internalKey);
72 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
73
74 try {
75 final Future<Boolean> putOp = putOperationForPolicy(
76 policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
77 return putOp.get();
78 } catch (InterruptedException | ExecutionException e) {
79 throw new ExternalCacheException(ExternalCacheException.Reason.UNCLASSIFIED_FAILURE, e);
80 }
81 }
82
83 @Override
84 protected void internalRemove(Iterable<String> internalKeys) {
85
86 if (isEmpty(internalKeys)) {
87 return;
88 }
89
90
91 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
92 final Map<String, Future<Boolean>> deleteOps =
93 StreamSupport.stream(internalKeys.spliterator(), false)
94 .distinct()
95 .collect(Collectors.toMap(
96 k -> k,
97 k -> clientSupplier.get().delete(cacheContext.externalEntryKeyFor(k))
98 ));
99
100
101
102 Exception failureException = null;
103 for (Map.Entry<String, Future<Boolean>> delOp : deleteOps.entrySet()) {
104 try {
105
106
107 delOp.getValue().get();
108 cacheContext.recordValue(delOp.getKey(), Optional.empty());
109 } catch (ExecutionException | InterruptedException ex) {
110 log.info("Cache {}: unable to remove key {}", name, delOp.getKey(), ex);
111 failureException = ex;
112 }
113 }
114
115
116 if (failureException != null) {
117 throw new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, failureException);
118 }
119 }
120
121 @Override
122 protected void internalRemoveAll() {
123 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
124 cacheContext.updateCacheVersion(
125 MemcachedUtils.incrementCacheVersion(clientSupplier, cacheContext.externalCacheVersionKey()));
126 }
127
128 @Override
129 protected Logger getLogger() {
130 return log;
131 }
132
133 @Override
134 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() throws OperationTimeoutException {
135 final RequestContext requestContext = contextSupplier.get();
136
137 return requestContext.computeIfAbsent(this, () -> {
138
139
140 log.trace("Cache {}: Setting up a new context", name);
141 final VersionedExternalCacheRequestContext<V> newCacheContext = new VersionedExternalCacheRequestContext<>(
142 keyGenerator, name, requestContext::partitionIdentifier, lockTimeout);
143 newCacheContext.updateCacheVersion(
144 MemcachedUtils.obtainCacheVersion(clientSupplier, newCacheContext.externalCacheVersionKey()));
145 return newCacheContext;
146 });
147 }
148
149 @Override
150 protected V handleCreation(String internalKey, V candidateValue)
151 throws ExecutionException, InterruptedException {
152 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
153 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
154 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
155
156
157 for (; ; ) {
158 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
159 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
160 if (addOp.get()) {
161
162
163 break;
164 }
165
166 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
167 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
168 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
169 if (otherAddedValue.isPresent()) {
170 return otherAddedValue.get();
171 }
172
173 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
174 }
175
176 return candidateValue;
177 }
178
179 @Override
180 protected final ExternalCacheException mapException(Exception ex) {
181 return MemcachedUtils.mapException(ex);
182 }
183
184 @Override
185 protected final Optional<V> directGet(String externalKey) {
186 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
187 }
188
189 @Override
190 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
191 return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
192 }
193 }