1 package com.atlassian.vcache.internal.memcached;
2
3 import com.atlassian.marshalling.api.MarshallingPair;
4 import com.atlassian.vcache.ExternalCacheException;
5 import com.atlassian.vcache.ExternalCacheSettings;
6 import com.atlassian.vcache.PutPolicy;
7 import com.atlassian.vcache.internal.MetricLabel;
8 import com.atlassian.vcache.internal.RequestContext;
9 import com.atlassian.vcache.internal.core.ExternalCacheKeyGenerator;
10 import com.atlassian.vcache.internal.core.VCacheCoreUtils;
11 import com.atlassian.vcache.internal.core.metrics.CacheType;
12 import com.atlassian.vcache.internal.core.metrics.MetricsRecorder;
13 import com.atlassian.vcache.internal.core.service.AbstractStableReadExternalCache;
14 import com.atlassian.vcache.internal.core.service.VersionedExternalCacheRequestContext;
15 import net.spy.memcached.MemcachedClientIF;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.Set;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.function.Supplier;
25 import java.util.stream.Collectors;
26 import java.util.stream.StreamSupport;
27
28 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.isEmpty;
29 import static com.atlassian.vcache.internal.core.VCacheCoreUtils.unmarshall;
30 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.expiryTime;
31 import static com.atlassian.vcache.internal.memcached.MemcachedUtils.putOperationForPolicy;
32 import static java.util.Objects.requireNonNull;
33
34
35
36
37
38
39
40 class MemcachedStableReadExternalCache<V>
41 extends AbstractStableReadExternalCache<V> {
42 private static final Logger log = LoggerFactory.getLogger(MemcachedStableReadExternalCache.class);
43
44 private final Supplier<MemcachedClientIF> clientSupplier;
45 private final Supplier<RequestContext> contextSupplier;
46 private final ExternalCacheKeyGenerator keyGenerator;
47 private final MarshallingPair<V> valueMarshalling;
48 private final int ttlSeconds;
49
50 MemcachedStableReadExternalCache(
51 MemcachedVCacheServiceSettings serviceSettings,
52 Supplier<RequestContext> contextSupplier,
53 ExternalCacheKeyGenerator keyGenerator,
54 String name,
55 MarshallingPair<V> valueMarshalling,
56 ExternalCacheSettings settings,
57 MetricsRecorder metricsRecorder) {
58 super(name, metricsRecorder, serviceSettings.getLockTimeout(), serviceSettings.getExternalCacheExceptionListener());
59 this.clientSupplier = requireNonNull(serviceSettings.getClientSupplier());
60 this.contextSupplier = requireNonNull(contextSupplier);
61 this.keyGenerator = requireNonNull(keyGenerator);
62 this.valueMarshalling = requireNonNull(valueMarshalling);
63 this.ttlSeconds = VCacheCoreUtils.roundUpToSeconds(settings.getDefaultTtl().get());
64 }
65
66 @Override
67 public boolean internalPut(String internalKey, V value, PutPolicy policy) {
68 final String externalKey = ensureCacheContext().externalEntryKeyFor(internalKey);
69 final byte[] valueBytes = valueMarshalling.getMarshaller().marshallToBytes(value);
70
71 try {
72 final Future<Boolean> putOp = putOperationForPolicy(
73 policy, clientSupplier.get(), externalKey, expiryTime(ttlSeconds), valueBytes);
74 return putOp.get();
75 } catch (InterruptedException | ExecutionException e) {
76 throw new ExternalCacheException(ExternalCacheException.Reason.UNCLASSIFIED_FAILURE, e);
77 }
78 }
79
80 @Override
81 protected void internalRemove(Iterable<String> internalKeys) {
82
83 if (isEmpty(internalKeys)) {
84 return;
85 }
86
87
88 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
89 final Map<String, Future<Boolean>> deleteOps =
90 StreamSupport.stream(internalKeys.spliterator(), false)
91 .distinct()
92 .collect(Collectors.toMap(
93 k -> k,
94 k -> clientSupplier.get().delete(cacheContext.externalEntryKeyFor(k))
95 ));
96
97
98
99 Exception failureException = null;
100 for (Map.Entry<String, Future<Boolean>> delOp : deleteOps.entrySet()) {
101 try {
102
103
104 delOp.getValue().get();
105 cacheContext.recordValue(delOp.getKey(), Optional.empty());
106 } catch (ExecutionException | InterruptedException ex) {
107 log.info("Cache {}: unable to remove key {}", name, delOp.getKey(), ex);
108 failureException = ex;
109 }
110 }
111
112
113 if (failureException != null) {
114 throw new ExternalCacheException(ExternalCacheException.Reason.NETWORK_FAILURE, failureException);
115 }
116 }
117
118 @Override
119 protected void internalRemoveAll() {
120 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
121 cacheContext.updateCacheVersion(MemcachedUtils.cacheVersionIncrementer(clientSupplier));
122 }
123
124 @Override
125 protected Logger getLogger() {
126 return log;
127 }
128
129 @Override
130 protected VersionedExternalCacheRequestContext<V> ensureCacheContext() {
131 final RequestContext requestContext = contextSupplier.get();
132
133 return requestContext.computeIfAbsent(this, () -> {
134
135
136 log.trace("Cache {}: Setting up a new context", name);
137 return new VersionedExternalCacheRequestContext<>(
138 keyGenerator, name, requestContext::partitionIdentifier,
139 MemcachedUtils.cacheVersionSupplier(clientSupplier),
140 lockTimeout);
141 });
142 }
143
144 @Override
145 protected V handleCreation(String internalKey, V candidateValue)
146 throws ExecutionException, InterruptedException {
147 final VersionedExternalCacheRequestContext<V> cacheContext = ensureCacheContext();
148 final byte[] candidateValueBytes = valueMarshalling.getMarshaller().marshallToBytes(candidateValue);
149 final String externalKey = cacheContext.externalEntryKeyFor(internalKey);
150
151
152 for (; ; ) {
153 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
154 final Future<Boolean> addOp = clientSupplier.get().add(externalKey, expiryTime(ttlSeconds), candidateValueBytes);
155 if (addOp.get()) {
156
157
158 break;
159 }
160
161 getLogger().info("Cache {}, unable to add candidate for key {}, retrieve what was added", name, internalKey);
162 metricsRecorder.record(name, CacheType.EXTERNAL, MetricLabel.NUMBER_OF_REMOTE_GET, 1);
163 final Optional<V> otherAddedValue = unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
164 if (otherAddedValue.isPresent()) {
165 return otherAddedValue.get();
166 }
167
168 getLogger().info("Cache {}, unable to retrieve recently added candidate for key {}, looping", name, internalKey);
169 }
170
171 return candidateValue;
172 }
173
174 @Override
175 protected final ExternalCacheException mapException(Exception ex) {
176 return MemcachedUtils.mapException(ex);
177 }
178
179 @Override
180 protected final Optional<V> directGet(String externalKey) {
181 return unmarshall((byte[]) clientSupplier.get().get(externalKey), valueMarshalling);
182 }
183
184 @Override
185 protected final Map<String, Optional<V>> directGetBulk(Set<String> externalKeys) {
186 return MemcachedUtils.directGetBulk(externalKeys, clientSupplier, valueMarshalling);
187 }
188 }