diff --git a/evcache-client/build.gradle b/evcache-client/build.gradle index 572966ad..950a71d8 100644 --- a/evcache-client/build.gradle +++ b/evcache-client/build.gradle @@ -17,7 +17,7 @@ configurations { dependencies { compile project(':evcache-core') compile group:"net.spy", name:"spymemcached", version:"2.11.4" - compile group:"com.netflix.evcache", name:"evcache-core", version:"5.+" + compile group:"com.google.code.findbugs", name:"annotations", version:"latest.release" compile group:"com.netflix.archaius", name:"archaius2-api", version:"latest.release" compile group:"com.netflix.archaius", name:"archaius2-core", version:"latest.release" diff --git a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java index d1b1a3cc..c247fc55 100644 --- a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java +++ b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java @@ -5,21 +5,32 @@ import java.util.Map; import java.util.Properties; -import java.util.concurrent.Future; +import java.util.*; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import com.netflix.evcache.*; +import com.netflix.evcache.operation.EVCacheItem; +import com.netflix.evcache.operation.EVCacheItemMetaData; +import com.netflix.evcache.pool.EVCacheClient; +import com.netflix.evcache.pool.ServerGroup; +import com.netflix.evcache.util.KeyHasher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.netflix.evcache.EVCache; -import com.netflix.evcache.EVCacheGetOperationListener; import com.netflix.evcache.operation.EVCacheOperationFuture; - import rx.schedulers.Schedulers; + +import static org.testng.Assert.*; + public class EVCacheTestDI extends DIBase implements EVCacheGetOperationListener { private static final Logger log = LoggerFactory.getLogger(EVCacheTestDI.class); private int loops = 1; + private Map propertiesToSet; + private String appName = "EVCACHE_TEST"; public static void main(String args[]) { try { @@ -31,31 +42,32 @@ public static void main(String args[]) { } public EVCacheTestDI() { + propertiesToSet = new HashMap<>(); + propertiesToSet.putIfAbsent(appName + ".us-east-1d.EVCacheClientPool.writeOnly", "false"); + propertiesToSet.putIfAbsent(appName + ".EVCacheClientPool.poolSize", "1"); + propertiesToSet.putIfAbsent(appName + ".ping.servers", "false"); + propertiesToSet.putIfAbsent(appName + ".cid.throw.exception", "true"); + propertiesToSet.putIfAbsent(appName + ".EVCacheClientPool.readTimeout", "500"); + propertiesToSet.putIfAbsent(appName + ".EVCacheClientPool.bulkReadTimeout", "500"); + propertiesToSet.putIfAbsent(appName + ".max.read.queue.length", "20"); + propertiesToSet.putIfAbsent("EVCacheClientPoolManager.log.apps", appName); + propertiesToSet.putIfAbsent(appName + ".fallback.zone", "true"); + propertiesToSet.putIfAbsent(appName + ".enable.throttling", "false"); + propertiesToSet.putIfAbsent(appName + ".throttle.time", "0"); + propertiesToSet.putIfAbsent(appName + ".throttle.percent", "0"); + propertiesToSet.putIfAbsent(appName + ".log.operation", "1000"); + propertiesToSet.putIfAbsent(appName + ".EVCacheClientPool.validate.input.queue", "true"); } protected Properties getProps() { Properties props = super.getProps(); - props.setProperty("EVCACHE_CCS.us-east-1d.EVCacheClientPool.writeOnly", "false"); - props.setProperty("EVCACHE_CCS.EVCacheClientPool.poolSize", "1"); - props.setProperty("EVCACHE_CCS.ping.servers", "false"); - props.setProperty("EVCACHE_CCS.cid.throw.exception", "true"); - props.setProperty("EVCACHE_CCS.EVCacheClientPool.readTimeout", "500"); - props.setProperty("EVCACHE_CCS.EVCacheClientPool.bulkReadTimeout", "500"); - props.setProperty("EVCACHE_CCS.max.read.queue.length", "20"); - props.setProperty("EVCacheClientPoolManager.log.apps", "EVCACHE_CCS"); - props.setProperty("EVCACHE_CCS.fallback.zone", "true"); - props.setProperty("EVCACHE_CCS.enable.throttling", "false"); - props.setProperty("EVCACHE_CCS.throttle.time", "0"); - props.setProperty("EVCACHE_CCS.throttle.percent", "0"); - props.setProperty("EVCACHE_CCS.log.operation", "1000"); - props.setProperty("EVCACHE_CCS.EVCacheClientPool.validate.input.queue", "true"); - + propertiesToSet.entrySet().forEach(entry -> props.setProperty(entry.getKey(), entry.getValue())); return props; } @Test public void testEVCache() { - this.evCache = getNewBuilder().setAppName("EVCACHE_CCS").setCachePrefix("cid").enableRetry().build(); + this.evCache = getNewBuilder().setAppName(appName).setCachePrefix("cid").enableRetry().build(); assertNotNull(evCache); } @@ -86,7 +98,7 @@ public void testKeySizeCheck() throws Exception { } assertTrue(exceptionThrown); } - + } @Test(dependsOnMethods = { "testKeySizeCheck" }) @@ -241,6 +253,179 @@ public void testAppendOrAdd() throws Exception { } } + private void refreshEVCache() { + setupEnv(); + testEVCache(); + } + + @Test(dependsOnMethods = {"testAppendOrAdd"}) + public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exception { + refreshEVCache(); + + // no hashing + assertFalse(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); + doFunctionalTests(false); + + // hashing at app level + propertiesToSet.put(appName + ".hash.key", "true"); + refreshEVCache(); + assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); + doFunctionalTests(true); + propertiesToSet.remove(appName + ".hash.key"); + + // hashing at app level due to auto hashing as a consequence of a large key + propertiesToSet.put(appName + ".auto.hash.keys", "true"); + refreshEVCache(); + assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".auto.hash.keys", Boolean.class).orElse(false).get()); + assertFalse(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); + testWithLargeKey(); + // negative scenario + propertiesToSet.remove(appName + ".auto.hash.keys"); + refreshEVCache(); + assertFalse(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".auto.hash.keys", Boolean.class).orElse(false).get()); + assertFalse(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); + assertThrows(IllegalArgumentException.class, () -> { + testWithLargeKey(); + }); + + // hashing at app level by choice AND different hashing at each asg + Map hashingAlgorithmsByServerGroup = new HashMap<>(); + propertiesToSet.put(appName + ".hash.key", "true"); + refreshEVCache(); + assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); + + // get server group names, to be used to configure the ASG level hashing properties + Map> clientsByServerGroup = manager.getEVCacheClientPool(appName).getAllInstancesByServerGroup(); + int i = 0; + for (ServerGroup serverGroup : clientsByServerGroup.keySet()) { + KeyHasher.HashingAlgorithm hashingAlgorithm = KeyHasher.HashingAlgorithm.values()[i++ % KeyHasher.HashingAlgorithm.values().length]; + hashingAlgorithmsByServerGroup.put(serverGroup.getName(), hashingAlgorithm); + propertiesToSet.put(serverGroup.getName() + ".hash.key", "true"); + propertiesToSet.put(serverGroup.getName() + ".hash.algo", hashingAlgorithm.name()); + } + refreshEVCache(); + clientsByServerGroup = manager.getEVCacheClientPool(appName).getAllInstancesByServerGroup(); + // validate hashing properties of asgs + for (ServerGroup serverGroup : clientsByServerGroup.keySet()) { + assertEquals(clientsByServerGroup.get(serverGroup).get(0).getHashingAlgorithm(), hashingAlgorithmsByServerGroup.get(serverGroup.getName())); + } + doFunctionalTests(true); + for (ServerGroup serverGroup : clientsByServerGroup.keySet()) { + propertiesToSet.remove(serverGroup.getName()); + } + } + + private void testWithLargeKey() throws Exception { + StringBuilder sb = new StringBuilder(); + for (int i= 0; i < 100; i++) { + sb.append(Long.toString(System.currentTimeMillis())); + } + String key = sb.toString(); + String value = UUID.randomUUID().toString(); + + // set + EVCacheLatch latch = evCache.set(key, value, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + + // get + assertEquals(evCache.get(key), value); + } + + private void doFunctionalTests(boolean isHashingEnabled) throws Exception { + String key1 = Long.toString(System.currentTimeMillis()); + String value1 = UUID.randomUUID().toString(); + + // set + EVCacheLatch latch = evCache.set(key1, value1, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + + // get + assertEquals(evCache.get(key1), value1); + + // replace + value1 = UUID.randomUUID().toString(); + latch = evCache.replace(key1, value1, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + // get + assertEquals(evCache.get(key1), value1); + + // add a key + String key2 = Long.toString(System.currentTimeMillis()); + String value2 = UUID.randomUUID().toString(); + latch = evCache.add(key2, value2, null, 1000, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + // get + assertEquals(evCache.get(key2), value2); + + // appendoradd - append case + String value3 = UUID.randomUUID().toString(); + if (isHashingEnabled) { + assertThrows(EVCacheException.class, () -> { + evCache.appendOrAdd(key2, value3, null, 1000, EVCacheLatch.Policy.ALL); + }); + } else { + latch = evCache.appendOrAdd(key2, value3, null, 1000, EVCacheLatch.Policy.ALL); + latch.await(3000, TimeUnit.MILLISECONDS); + assertEquals(evCache.get(key2), value2 + value3); + } + + // appendoradd - add case + String key3 = Long.toString(System.currentTimeMillis()); + String value4 = UUID.randomUUID().toString(); + if (isHashingEnabled) { + assertThrows(EVCacheException.class, () -> { + evCache.appendOrAdd(key3, value4, null, 1000, EVCacheLatch.Policy.ALL); + }); + } else { + latch = evCache.appendOrAdd(key3, value4, null, 1000, EVCacheLatch.Policy.ALL); + latch.await(3000, TimeUnit.MILLISECONDS); + // get + assertEquals(evCache.get(key3), value4); + } + + // append + String value5 = UUID.randomUUID().toString(); + if (isHashingEnabled) { + assertThrows(EVCacheException.class, () -> { + evCache.append(key3, value5, 1000); + }); + } else { + Future futures[] = evCache.append(key3, value5, 1000); + for (Future future : futures) { + assertTrue((Boolean) future.get()); + } + // get + assertEquals(evCache.get(key3), value4 + value5); + } + + String key4 = Long.toString(System.currentTimeMillis()); + assertEquals(evCache.incr(key4, 1, 10, 1000), 10); + assertEquals(evCache.incr(key4, 10, 10, 1000), 20); + + // decr + String key5 = Long.toString(System.currentTimeMillis()); + assertEquals(evCache.decr(key5, 1, 10, 1000), 10); + assertEquals(evCache.decr(key5, 20, 10, 1000), 0); + + // delete + latch = evCache.delete(key1, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + latch = evCache.delete(key2, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + latch = evCache.delete(key3, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + latch = evCache.delete(key4, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + latch = evCache.delete(key5, EVCacheLatch.Policy.ALL); + latch.await(1000, TimeUnit.MILLISECONDS); + + assertNull(evCache.get(key1)); + assertNull(evCache.get(key2)); + assertNull(evCache.get(key3)); + assertNull(evCache.get(key4)); + assertNull(evCache.get(key5)); + } + public void testAll() { try { setupEnv(); diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCache.java b/evcache-core/src/main/java/com/netflix/evcache/EVCache.java index bc6f77ea..3db85f40 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCache.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCache.java @@ -1392,6 +1392,10 @@ public Builder customizeWith(final Customizer customizer) { return this; } + protected EVCache newImpl(String appName, String cachePrefix, int ttl, Transcoder transcoder, boolean serverGroupRetry, boolean enableExceptionThrowing, EVCacheClientPoolManager poolManager) { + return new EVCacheImpl(appName, cachePrefix, ttl, transcoder, serverGroupRetry, enableExceptionThrowing, poolManager); + } + /** * Returns a newly created {@code EVCache} based on the contents of the * {@code Builder}. @@ -1417,8 +1421,7 @@ public EVCache build() { customize(); - return new EVCacheImpl( - _appName, _cachePrefix, _ttl, _transcoder, _serverGroupRetry, _enableExceptionThrowing, _poolManager); + return newImpl(_appName, _cachePrefix, _ttl, _transcoder, _serverGroupRetry, _enableExceptionThrowing, _poolManager); } } } diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index eaac4fea..55b8d67a 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -27,7 +27,6 @@ import com.netflix.archaius.api.Property; import com.netflix.archaius.api.PropertyRepository; -import com.netflix.evcache.EVCache.Call; import com.netflix.evcache.EVCacheInMemoryCache.DataNotFoundException; import com.netflix.evcache.EVCacheLatch.Policy; import com.netflix.evcache.event.EVCacheEvent; @@ -67,19 +66,19 @@ @SuppressWarnings("unchecked") @edu.umd.cs.findbugs.annotations.SuppressFBWarnings({ "PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", "WMI_WRONG_MAP_ITERATOR", "DB_DUPLICATE_BRANCHES", "REC_CATCH_EXCEPTION","RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" }) -final public class EVCacheImpl implements EVCache, EVCacheImplMBean { +public class EVCacheImpl implements EVCache, EVCacheImplMBean { private static final Logger log = LoggerFactory.getLogger(EVCacheImpl.class); private final String _appName; private final String _cacheName; private final String _metricPrefix; - private final Transcoder _transcoder; + protected final Transcoder _transcoder; private final boolean _zoneFallback; private final boolean _throwException; private final int _timeToLive; // defaults to 15 minutes - private EVCacheClientPool _pool; + protected EVCacheClientPool _pool; private final Property _throwExceptionFP, _zoneFallbackFP, _useInMemoryCache; private final Property _bulkZoneFallbackFP; @@ -90,10 +89,12 @@ final public class EVCacheImpl implements EVCache, EVCacheImplMBean { private final Property ignoreTouch; private final Property hashKey; private final Property hashingAlgo; + private final Property shouldEncodeHashKey; + private final Property maxHashingBytes; private final EVCacheTranscoder evcacheValueTranscoder; private final Property maxReadDuration, maxWriteDuration; - private final EVCacheClientPoolManager _poolManager; + protected final EVCacheClientPoolManager _poolManager; private final Map timerMap = new ConcurrentHashMap(); private final Map distributionSummaryMap = new ConcurrentHashMap(); private final Map counterMap = new ConcurrentHashMap(); @@ -147,6 +148,8 @@ final public class EVCacheImpl implements EVCache, EVCacheImplMBean { this.hashKey = propertyRepository.get(appName + ".hash.key", Boolean.class).orElse(false); this.hashingAlgo = propertyRepository.get(appName + ".hash.algo", String.class).orElse("siphash24"); + this.shouldEncodeHashKey = propertyRepository.get(appName + ".hash.encode", Boolean.class).orElse(true); + this.maxHashingBytes = propertyRepository.get(appName + ".hash.max.bytes", Integer.class).orElse(-1); this.autoHashKeys = propertyRepository.get(_appName + ".auto.hash.keys", Boolean.class).orElseGet("evcache.auto.hash.keys").orElse(false); this.evcacheValueTranscoder = new EVCacheTranscoder(); evcacheValueTranscoder.setCompressionThreshold(Integer.MAX_VALUE); @@ -162,10 +165,10 @@ final public class EVCacheImpl implements EVCache, EVCacheImplMBean { }); _pool.pingServers(); - + setupMonitoring(); } - + private void setupMonitoring() { try { final ObjectName mBeanName = ObjectName.getInstance("com.netflix.evcache:Group=" + _appName @@ -181,7 +184,7 @@ private void setupMonitoring() { } } - + EVCacheKey getEVCacheKey(final String key) { if(key == null || key.length() == 0) throw new NullPointerException("Key cannot be null or empty"); @@ -199,20 +202,12 @@ EVCacheKey getEVCacheKey(final String key) { canonicalKey = new StringBuilder(keyLength).append(_cacheName).append(':').append(key).toString(); } - final String hashedKey; - if(hashKey.get()) { - hashedKey = KeyHasher.getHashedKey(canonicalKey, hashingAlgo.get()); - } else if(autoHashKeys.get() && canonicalKey.length() > this.maxKeyLength.get()) { - hashedKey = KeyHasher.getHashedKey(canonicalKey, hashingAlgo.get()); - } else { - hashedKey = null; - } - - if (hashedKey == null && canonicalKey.length() > this.maxKeyLength.get()) { + if (canonicalKey.length() > this.maxKeyLength.get() && !hashKey.get() && !autoHashKeys.get()) { throw new IllegalArgumentException("Key is too long (maxlen = " + this.maxKeyLength.get() + ')'); } - final EVCacheKey evcKey = new EVCacheKey(_appName, key, canonicalKey, hashedKey, hashingAlgo); + boolean shouldHashKeyAtAppLevel = hashKey.get() || (canonicalKey.length() > this.maxKeyLength.get() && autoHashKeys.get()); + final EVCacheKey evcKey = new EVCacheKey(_appName, key, canonicalKey, shouldHashKeyAtAppLevel ? KeyHasher.getHashingAlgorithmFromString(hashingAlgo.get()) : null, this.shouldEncodeHashKey, this.maxHashingBytes); if (log.isDebugEnabled() && shouldLog()) log.debug("Key : " + key + "; EVCacheKey : " + evcKey); return evcKey; } @@ -518,6 +513,10 @@ T doGet(EVCacheKey evcKey , Transcoder tc) throws EVCacheException { } public EVCacheItemMetaData metaDebug(String key) throws EVCacheException { + return this.metaDebugInternal(key, false); + } + + protected EVCacheItemMetaData metaDebugInternal(String key, boolean isOriginalKeyHashed) throws EVCacheException { if (null == key) throw new IllegalArgumentException("Key cannot be null"); final EVCacheKey evcKey = getEVCacheKey(key); final boolean throwExc = doThrowException(); @@ -552,7 +551,7 @@ public EVCacheItemMetaData metaDebug(String key) throws EVCacheException { try { final boolean hasZF = hasZoneFallback(); boolean throwEx = hasZF ? false : throwExc; - EVCacheItemMetaData data = getEVCacheItemMetaData(client, evcKey, throwEx, hasZF); + EVCacheItemMetaData data = getEVCacheItemMetaData(client, evcKey, throwEx, hasZF, isOriginalKeyHashed); if (data == null && hasZF) { final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); if (fbClients != null && !fbClients.isEmpty()) { @@ -573,7 +572,7 @@ public EVCacheItemMetaData metaDebug(String key) throws EVCacheException { } } tries++; - data = getEVCacheItemMetaData(fbClient, evcKey, throwEx, (i < fbClients.size() - 1) ? true : false); + data = getEVCacheItemMetaData(fbClient, evcKey, throwEx, (i < fbClients.size() - 1) ? true : false, isOriginalKeyHashed); if (log.isDebugEnabled() && shouldLog()) log.debug("Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); if (data != null) { client = fbClient; @@ -618,6 +617,10 @@ public EVCacheItemMetaData metaDebug(String key) throws EVCacheException { } public EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheException { + return this.metaGetInternal(key, tc, false); + } + + protected EVCacheItem metaGetInternal(String key, Transcoder tc, boolean isOriginalKeyHashed) throws EVCacheException { if (null == key) throw new IllegalArgumentException("Key cannot be null"); final EVCacheKey evcKey = getEVCacheKey(key); final boolean throwExc = doThrowException(); @@ -652,7 +655,7 @@ public EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheEx try { final boolean hasZF = hasZoneFallback(); boolean throwEx = hasZF ? false : throwExc; - EVCacheItem data = getEVCacheItem(client, evcKey, tc, throwEx, hasZF); + EVCacheItem data = getEVCacheItem(client, evcKey, tc, throwEx, hasZF, isOriginalKeyHashed); if (data == null && hasZF) { final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); if (fbClients != null && !fbClients.isEmpty()) { @@ -673,7 +676,7 @@ public EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheEx } } tries++; - data = getEVCacheItem(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false); + data = getEVCacheItem(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false, isOriginalKeyHashed); if (log.isDebugEnabled() && shouldLog()) log.debug("Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); if (data != null) { client = fbClient; @@ -718,7 +721,7 @@ public EVCacheItem metaGet(String key, Transcoder tc) throws EVCacheEx } - + private int policyToCount(Policy policy, int count) { if (policy == null) return 0; switch (policy) { @@ -912,7 +915,7 @@ private T getData(EVCacheClient client, EVCacheKey evcKey, Transcoder tc, if (client == null) return null; final Transcoder transcoder = (tc == null) ? ((_transcoder == null) ? (Transcoder) client.getTranscoder() : (Transcoder) _transcoder) : tc; try { - String hashKey = evcKey.getHashKey(client.isDuetClient()); + String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()); String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient()); if(hashKey != null) { @@ -950,10 +953,10 @@ private T getData(EVCacheClient client, EVCacheKey evcKey, Transcoder tc, } } - private EVCacheItemMetaData getEVCacheItemMetaData(EVCacheClient client, EVCacheKey evcKey, boolean throwException, boolean hasZF) throws Exception { + protected EVCacheItemMetaData getEVCacheItemMetaData(EVCacheClient client, EVCacheKey evcKey, boolean throwException, boolean hasZF, boolean isOriginalKeyHashed) throws Exception { if (client == null) return null; try { - return client.metaDebug(evcKey.getDerivedKey(client.isDuetClient())); + return client.metaDebug(isOriginalKeyHashed ? evcKey.getKey() : evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes())); } catch (EVCacheConnectException ex) { if (log.isDebugEnabled() && shouldLog()) log.debug("EVCacheConnectException while getting with metadata for APP " + _appName + ", key : " + evcKey + "; hasZF : " + hasZF, ex); if (!throwException || hasZF) return null; @@ -973,11 +976,37 @@ private EVCacheItemMetaData getEVCacheItemMetaData(EVCacheClient client, EVCache } } - private EVCacheItem getEVCacheItem(EVCacheClient client, EVCacheKey evcKey, Transcoder tc, boolean throwException, boolean hasZF) throws Exception { + protected EVCacheItem getEVCacheItem(EVCacheClient client, EVCacheKey evcKey, Transcoder tc, boolean throwException, boolean hasZF, boolean isOriginalKeyHashed) throws Exception { if (client == null) return null; final Transcoder transcoder = (tc == null) ? ((_transcoder == null) ? (Transcoder) client.getTranscoder() : (Transcoder) _transcoder) : tc; try { - return client.metaGet(evcKey.getDerivedKey(client.isDuetClient()), transcoder, throwException, hasZF); + String hashKey = isOriginalKeyHashed ? evcKey.getKey() : evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()); + String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient()); + if (hashKey != null) { + final EVCacheItem obj = client.metaGet(hashKey, evcacheValueTranscoder, throwException, hasZF); + if (null == obj) return null; + if (obj.getData() instanceof EVCacheValue) { + final EVCacheValue val = (EVCacheValue) obj.getData(); + if (null == val) { + return null; + } + + // compare the key embedded in the value to the original key only if the original key is not passed hashed + if (!isOriginalKeyHashed && !(val.getKey().equals(canonicalKey))) { + incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.META_GET.name(), EVCacheMetricsFactory.META_GET_OPERATION); + return null; + } + final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); + T t = transcoder.decode(cd); + obj.setData(t); + obj.setFlag(val.getFlags()); + return (EVCacheItem) obj; + } else { + return null; + } + } else { + return client.metaGet(canonicalKey, transcoder, throwException, hasZF); + } } catch (EVCacheConnectException ex) { if (log.isDebugEnabled() && shouldLog()) log.debug("EVCacheConnectException while getting with meta data for APP " + _appName + ", key : " + evcKey + "; hasZF : " + hasZF, ex); if (!throwException || hasZF) return null; @@ -1004,7 +1033,7 @@ private Single getData(int index, int size, EVCacheClient client, EVCache private Single getData(EVCacheClient client, EVCacheKey evcKey, Transcoder tc, boolean throwException, boolean hasZF, Scheduler scheduler) { if (client == null) return Single.error(new IllegalArgumentException("Client cannot be null")); - if(hashKey.get()) { + if(evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()) != null) { return Single.error(new IllegalArgumentException("Not supported")); } else { final Transcoder transcoder = (tc == null) ? ((_transcoder == null) ? (Transcoder) client.getTranscoder() : (Transcoder) _transcoder) : tc; @@ -1043,7 +1072,6 @@ public T getAndTouch(String key, int timeToLive) throws EVCacheException { return this.getAndTouch(key, timeToLive, (Transcoder) _transcoder); } - public Single getAndTouch(String key, int timeToLive, Scheduler scheduler) { return this.getAndTouch(key, timeToLive, (Transcoder) _transcoder, scheduler); } @@ -1376,11 +1404,10 @@ private void touchData(EVCacheKey evcKey, int timeToLive, EVCacheClient[] client touchData(evcKey, timeToLive, clients, null); } - private void touchData(EVCacheKey evcKey, int timeToLive, EVCacheClient[] clients, EVCacheLatch latch ) throws Exception { checkTTL(timeToLive, Call.TOUCH); for (EVCacheClient client : clients) { - client.touch(evcKey.getDerivedKey(client.isDuetClient()), timeToLive, latch); + client.touch(evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()), timeToLive, latch); } } @@ -1425,7 +1452,7 @@ private Future getGetFuture(final EVCacheClient client, final String key, final Future r; final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); try { - String hashKey = evcKey.getHashKey(client.isDuetClient()); + String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()); String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient()); if(hashKey != null) { final Future objFuture = client.asyncGet(hashKey, evcacheValueTranscoder, throwExc, false); @@ -1501,7 +1528,7 @@ private Map getBulkData(EVCacheClient client, Collection keyMap = new HashMap(evcacheKeys.size() * 2); for(EVCacheKey evcKey : evcacheKeys) { String key = evcKey.getCanonicalKey(client.isDuetClient()); - String hashKey = evcKey.getHashKey(client.isDuetClient()); + String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()); if(hashKey != null) { if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]"); key = hashKey; @@ -1528,7 +1555,6 @@ private Map getBulkData(EVCacheClient client, Collection Map getBulk(final Collection keys, Transcoder if (retMap == null || retMap.isEmpty()) { if (log.isInfoEnabled() && shouldLog()) log.info("BULK : APP " + _appName + " ; Full cache miss for keys : " + keys); if (event != null) event.setAttribute("status", "BMISS_ALL"); - final Map returnMap = new HashMap(); if (retMap != null && retMap.isEmpty()) { for (String k : keys) { @@ -1765,7 +1790,7 @@ private Map getBulk(final Collection keys, Transcoder } } - if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", BULK : Data [" + decanonicalR + "]"); + if (log.isDebugEnabled() && shouldLog()) log.debug("BulkGet; APP " + _appName + ", keys : " + keys + (log.isTraceEnabled() ? "; value : " + decanonicalR : "")); if (event != null) endEvent(event); return decanonicalR; } catch (net.spy.memcached.internal.CheckedOperationTimeoutException ex) { @@ -1850,11 +1875,15 @@ public EVCacheLatch set(String key, T value, Transcoder tc, EVCacheLatch. } public EVCacheLatch set(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { + EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + return this.set(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length); + } + + protected EVCacheLatch set(String key, T value, Transcoder tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException { if ((null == key) || (null == value)) throw new IllegalArgumentException(); checkTTL(timeToLive, Call.SET); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.SET); if (throwExc) throw new EVCacheException("Could not find a client to set the data"); @@ -1882,27 +1911,26 @@ public EVCacheLatch set(String key, T value, Transcoder tc, int timeToLiv final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); String status = EVCacheMetricsFactory.SUCCESS; - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, latchCount, _appName); try { CachedData cd = null; for (EVCacheClient client : clients) { String canonicalKey = evcKey.getCanonicalKey(client.isDuetClient()); - String hashKey = evcKey.getHashKey(client.isDuetClient()); - if (cd == null) { - if (tc != null) { - cd = tc.encode(value); - } else if ( _transcoder != null) { - cd = ((Transcoder)_transcoder).encode(value); - } else { - cd = client.getTranscoder().encode(value); - } - if(hashKey != null) { - final EVCacheValue val = new EVCacheValue(canonicalKey, cd.getData(), cd.getFlags(), timeToLive, System.currentTimeMillis()); - cd = evcacheValueTranscoder.encode(val); - } + String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()); + if (tc != null) { + cd = tc.encode(value); + } else if (_transcoder != null) { + cd = ((Transcoder) _transcoder).encode(value); + } else { + cd = client.getTranscoder().encode(value); + } + if (hashKey != null) { + final EVCacheValue val = new EVCacheValue(canonicalKey, cd.getData(), cd.getFlags(), timeToLive, System.currentTimeMillis()); + cd = evcacheValueTranscoder.encode(val); } final Future future = client.set(hashKey == null ? canonicalKey : hashKey, cd, timeToLive, latch); - if (log.isDebugEnabled() && shouldLog()) log.debug("SET : APP " + _appName + ", Future " + future + " for key : " + evcKey); + if (log.isDebugEnabled() && shouldLog()) + log.debug("SET : APP " + _appName + ", Future " + future + " for key : " + evcKey); } if (event != null) { event.setTTL(timeToLive); @@ -1970,6 +1998,11 @@ public EVCacheFuture[] append(String key, T value, Transcoder tc, int tim CachedData cd = null; int index = 0; for (EVCacheClient client : clients) { + // ensure key hashing is not enabled + if (evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()) != null) { + throw new IllegalArgumentException("append is not supported when key hashing is enabled."); + } + if (cd == null) { if (tc != null) { cd = tc.encode(value); @@ -1980,7 +2013,7 @@ public EVCacheFuture[] append(String key, T value, Transcoder tc, int tim } //if (cd != null) EVCacheMetricsFactory.getInstance().getDistributionSummary(_appName + "-AppendData-Size", tags).record(cd.getData().length); } - final Future future = client.append(evcKey.getDerivedKey(client.isDuetClient()), cd); + final Future future = client.append(evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()), cd); futures[index++] = new EVCacheFuture(future, key, _appName, client.getServerGroup()); } if (event != null) { @@ -2020,7 +2053,11 @@ public EVCacheFuture[] set(String key, T value) throws EVCacheException { } public EVCacheFuture[] delete(String key) throws EVCacheException { - final EVCacheLatch latch = this.delete(key, null); + return this.deleteInternal(key, false); + } + + protected EVCacheFuture[] deleteInternal(String key, boolean isOriginalKeyHashed) throws EVCacheException { + final EVCacheLatch latch = this.deleteInternal(key, null, isOriginalKeyHashed); if (latch == null) return new EVCacheFuture[0]; final List> futures = latch.getAllFutures(); if (futures == null || futures.isEmpty()) return new EVCacheFuture[0]; @@ -2042,6 +2079,10 @@ public EVCacheFuture[] delete(String key) throws EVCacheException { @Override public EVCacheLatch delete(String key, Policy policy) throws EVCacheException { + return this.deleteInternal(key, policy, false); + } + + protected EVCacheLatch deleteInternal(String key, Policy policy, boolean isOriginalKeyHashed) throws EVCacheException { if (key == null) throw new IllegalArgumentException("Key cannot be null"); final boolean throwExc = doThrowException(); @@ -2076,7 +2117,7 @@ public EVCacheLatch delete(String key, Policy policy) throws EVCacheExceptio final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); try { for (int i = 0; i < clients.length; i++) { - Future future = clients[i].delete(evcKey.getDerivedKey(clients[i].isDuetClient()), latch); + Future future = clients[i].delete(isOriginalKeyHashed ? evcKey.getKey() : evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxHashingBytes()), latch); if (log.isDebugEnabled() && shouldLog()) log.debug("DELETE : APP " + _appName + ", Future " + future + " for key : " + evcKey); } @@ -2151,7 +2192,7 @@ public long incr(String key, long by, long defaultVal, int timeToLive) throws EV final long[] vals = new long[clients.length]; int index = 0; for (EVCacheClient client : clients) { - vals[index] = client.incr(evcKey.getDerivedKey(client.isDuetClient()), by, defaultVal, timeToLive); + vals[index] = client.incr(evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()), by, defaultVal, timeToLive); if (vals[index] != -1 && currentValue < vals[index]) { currentValue = vals[index]; if (log.isDebugEnabled()) log.debug("INCR : APP " + _appName + " current value = " + currentValue + " for key : " + key + " from client : " + client); @@ -2166,12 +2207,12 @@ public long incr(String key, long by, long defaultVal, int timeToLive) throws EV if (vals[i] == -1 && currentValue > -1) { if (log.isDebugEnabled()) log.debug("INCR : APP " + _appName + "; Zone " + clients[i].getZone() + " had a value = -1 so setting it to current value = " + currentValue + " for key : " + key); - clients[i].incr(evcKey.getDerivedKey(clients[i].isDuetClient()), 0, currentValue, timeToLive); + clients[i].incr(evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxHashingBytes()), 0, currentValue, timeToLive); } else if (vals[i] != currentValue) { if(cd == null) cd = clients[i].getTranscoder().encode(String.valueOf(currentValue)); if (log.isDebugEnabled()) log.debug("INCR : APP " + _appName + "; Zone " + clients[i].getZone() + " had a value of " + vals[i] + " so setting it to current value = " + currentValue + " for key : " + key); - clients[i].set(evcKey.getDerivedKey(clients[i].isDuetClient()), cd, timeToLive); + clients[i].set(evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxHashingBytes()), cd, timeToLive); } } } @@ -2233,7 +2274,7 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV final long[] vals = new long[clients.length]; int index = 0; for (EVCacheClient client : clients) { - vals[index] = client.decr(evcKey.getDerivedKey(client.isDuetClient()), by, defaultVal, timeToLive); + vals[index] = client.decr(evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()), by, defaultVal, timeToLive); if (vals[index] != -1 && currentValue < vals[index]) { currentValue = vals[index]; if (log.isDebugEnabled()) log.debug("DECR : APP " + _appName + " current value = " + currentValue + " for key : " + key + " from client : " + client); @@ -2250,13 +2291,13 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV if (log.isDebugEnabled()) log.debug("DECR : APP " + _appName + "; Zone " + clients[i].getZone() + " had a value = -1 so setting it to current value = " + currentValue + " for key : " + key); - clients[i].decr(evcKey.getDerivedKey(clients[i].isDuetClient()), 0, currentValue, timeToLive); + clients[i].decr(evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxHashingBytes()), 0, currentValue, timeToLive); } else if (vals[i] != currentValue) { if(cd == null) cd = clients[i].getTranscoder().encode(currentValue); if (log.isDebugEnabled()) log.debug("DECR : APP " + _appName + "; Zone " + clients[i].getZone() + " had a value of " + vals[i] + " so setting it to current value = " + currentValue + " for key : " + key); - clients[i].set(evcKey.getDerivedKey(clients[i].isDuetClient()), cd, timeToLive); + clients[i].set(evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxHashingBytes()), cd, timeToLive); } } } @@ -2334,21 +2375,19 @@ public EVCacheLatch replace(String key, T value, Transcoder tc, int timeT CachedData cd = null; int index = 0; for (EVCacheClient client : clients) { - if (cd == null) { - if (tc != null) { - cd = tc.encode(value); - } else if ( _transcoder != null) { - cd = ((Transcoder)_transcoder).encode(value); - } else { - cd = client.getTranscoder().encode(value); - } + if (tc != null) { + cd = tc.encode(value); + } else if (_transcoder != null) { + cd = ((Transcoder) _transcoder).encode(value); + } else { + cd = client.getTranscoder().encode(value); + } - if(hashKey.get()) { - final EVCacheValue val = new EVCacheValue(evcKey.getCanonicalKey(client.isDuetClient()), cd.getData(), cd.getFlags(), timeToLive, System.currentTimeMillis()); - cd = evcacheValueTranscoder.encode(val); - } + if (evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()) != null) { + final EVCacheValue val = new EVCacheValue(evcKey.getCanonicalKey(client.isDuetClient()), cd.getData(), cd.getFlags(), timeToLive, System.currentTimeMillis()); + cd = evcacheValueTranscoder.encode(val); } - final Future future = client.replace(evcKey.getDerivedKey(client.isDuetClient()), cd, timeToLive, latch); + final Future future = client.replace(evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()), cd, timeToLive, latch); futures[index++] = new EVCacheFuture(future, key, _appName, client.getServerGroup()); } if (event != null) { @@ -2426,6 +2465,11 @@ public EVCacheLatch appendOrAdd(String key, T value, Transcoder tc, int t try { CachedData cd = null; for (EVCacheClient client : clients) { + // ensure key hashing is not enabled + if (evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()) != null) { + throw new IllegalArgumentException("appendOrAdd is not supported when key hashing is enabled."); + } + if (cd == null) { if (tc != null) { cd = tc.encode(value); @@ -2435,7 +2479,7 @@ public EVCacheLatch appendOrAdd(String key, T value, Transcoder tc, int t cd = client.getTranscoder().encode(value); } } - final Future future = client.appendOrAdd(evcKey.getDerivedKey(client.isDuetClient()), cd, timeToLive, latch); + final Future future = client.appendOrAdd(evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()), cd, timeToLive, latch); if (log.isDebugEnabled() && shouldLog()) log.debug("APPEND_OR_ADD : APP " + _appName + ", Future " + future + " for key : " + evcKey); } if (event != null) { @@ -2471,7 +2515,6 @@ public Future[] appendOrAdd(String key, T value, Transcoder tc, return new EVCacheFuture[0]; } - public boolean add(String key, T value, Transcoder tc, int timeToLive) throws EVCacheException { final EVCacheLatch latch = add(key, value, tc, timeToLive, Policy.NONE); try { @@ -2496,11 +2539,15 @@ public boolean add(String key, T value, Transcoder tc, int timeToLive) th @Override public EVCacheLatch add(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { + EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + return this.add(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length); + } + + protected EVCacheLatch add(String key, T value, Transcoder tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException { if ((null == key) || (null == value)) throw new IllegalArgumentException(); checkTTL(timeToLive, Call.ADD); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.ADD); if (throwExc) throw new EVCacheException("Could not find a client to Add the data"); @@ -2530,24 +2577,22 @@ public EVCacheLatch add(String key, T value, Transcoder tc, int timeToLiv EVCacheLatch latch = null; try { CachedData cd = null; - if (cd == null) { - if (tc != null) { - cd = tc.encode(value); - } else if ( _transcoder != null) { - cd = ((Transcoder)_transcoder).encode(value); - } else { - cd = _pool.getEVCacheClientForRead().getTranscoder().encode(value); - } + if (tc != null) { + cd = tc.encode(value); + } else if (_transcoder != null) { + cd = ((Transcoder) _transcoder).encode(value); + } else { + cd = _pool.getEVCacheClientForRead().getTranscoder().encode(value); } - if(clientUtil == null) clientUtil = new EVCacheClientUtil(_pool); - latch = clientUtil.add(evcKey, cd, hashKey.get(), evcacheValueTranscoder, timeToLive, policy); + if (clientUtil == null) clientUtil = new EVCacheClientUtil(_appName, _pool.getOperationTimeout().get()); + latch = clientUtil.add(evcKey, cd, evcacheValueTranscoder, timeToLive, policy, clients, latchCount); if (event != null) { event.setTTL(timeToLive); event.setCachedData(cd); - if(_eventsUsingLatchFP.get()) { + if (_eventsUsingLatchFP.get()) { latch.setEVCacheEvent(event); - if(latch instanceof EVCacheLatchImpl) - ((EVCacheLatchImpl)latch).scheduledFutureValidation(); + if (latch instanceof EVCacheLatchImpl) + ((EVCacheLatchImpl) latch).scheduledFutureValidation(); } else { endEvent(event); } diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheInternal.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheInternal.java new file mode 100644 index 00000000..3ae757c9 --- /dev/null +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheInternal.java @@ -0,0 +1,77 @@ +package com.netflix.evcache; + +import com.netflix.evcache.operation.EVCacheItem; +import com.netflix.evcache.operation.EVCacheItemMetaData; +import com.netflix.evcache.pool.EVCacheClientPoolManager; +import net.spy.memcached.CachedData; +import net.spy.memcached.MemcachedNode; +import net.spy.memcached.transcoders.Transcoder; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public interface EVCacheInternal extends EVCache { + EVCacheItem metaGet(String key, Transcoder tc, boolean isOriginalKeyHashed) throws EVCacheException; + + Map metaGetPerClient(String key, Transcoder tc, boolean isOriginalKeyHashed) throws EVCacheException; + + EVCacheItemMetaData metaDebug(String key, boolean isOriginalKeyHashed) throws EVCacheException; + + Map metaDebugPerClient(String key, boolean isOriginalKeyHashed) throws EVCacheException; + + Future[] delete(String key, boolean isOriginalKeyHashed) throws EVCacheException; + + EVCacheLatch addOrSetToWriteOnly(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy) throws EVCacheException; + + EVCacheLatch addOrSet(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy, List serverGroups) throws EVCacheException; + + EVCacheLatch addOrSet(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy, String serverGroup) throws EVCacheException; + + EVCacheLatch addOrSet(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy, String serverGroupName, List destinationIps) throws EVCacheException; + + KeyHashedState isKeyHashed(String appName, String serverGroup); + + public enum KeyHashedState { + YES, + NO, + MAYBE + } + + public static class CachedValues { + private final String key; + private final CachedData data; + private EVCacheItemMetaData itemMetaData; + + public CachedValues(String key, CachedData data, EVCacheItemMetaData itemMetaData) { + this.key = key; + this.data = data; + this.itemMetaData = itemMetaData; + } + + public String getKey() { + return key; + } + + public CachedData getData() { + return data; + } + + public EVCacheItemMetaData getEVCacheItemMetaData() { + return itemMetaData; + } + + + } + + public class Builder extends EVCache.Builder { + public Builder() { + super(); + } + + @Override + protected EVCache newImpl(String appName, String cachePrefix, int ttl, Transcoder transcoder, boolean serverGroupRetry, boolean enableExceptionThrowing, EVCacheClientPoolManager poolManager) { + return new EVCacheInternalImpl(appName, cachePrefix, ttl, transcoder, serverGroupRetry, enableExceptionThrowing, poolManager); + } + } +} \ No newline at end of file diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheInternalImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheInternalImpl.java new file mode 100644 index 00000000..64221401 --- /dev/null +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheInternalImpl.java @@ -0,0 +1,145 @@ +package com.netflix.evcache; + +import com.netflix.archaius.api.PropertyRepository; +import com.netflix.evcache.operation.EVCacheItem; +import com.netflix.evcache.operation.EVCacheItemMetaData; +import com.netflix.evcache.pool.EVCacheClient; +import com.netflix.evcache.pool.EVCacheClientPoolManager; +import com.netflix.evcache.pool.ServerGroup; +import net.spy.memcached.CachedData; +import net.spy.memcached.MemcachedNode; +import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Cache; + +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +/** + * This class is for internal-use only by EVCache components, and is not recommended to be used for any other purpose. EVCache and EVCacheImpl are recommended instead. + */ +class EVCacheInternalImpl extends EVCacheImpl implements EVCacheInternal { + private static final Logger log = LoggerFactory.getLogger(EVCacheInternalImpl.class); + + public EVCacheItem metaGet(String key, Transcoder tc, boolean isOriginalKeyHashed) throws EVCacheException { + return this.metaGetInternal(key, tc, isOriginalKeyHashed); + } + + public Map metaGetPerClient(String key, Transcoder tc, boolean isOriginalKeyHashed) throws EVCacheException { + Map map = new HashMap<>(); + final Map> instancesByZone = _pool.getAllInstancesByZone(); + final EVCacheKey evcKey = getEVCacheKey(key); + for (ServerGroup sGroup : instancesByZone.keySet()) { + try { + for (EVCacheClient client : instancesByZone.get(sGroup)) { + EVCacheItem item = getEVCacheItem(client, evcKey, tc, true, false, isOriginalKeyHashed); + map.put(client.getNodeLocator().getPrimary(key), null == item ? null : new CachedValues(key, item.getData(), item.getItemMetaData())); + } + } catch (Exception e) { + log.error("Error getting meta data", e); + } + } + + return map; + } + + public EVCacheItemMetaData metaDebug(String key, boolean isOriginalKeyHashed) throws EVCacheException { + return this.metaDebugInternal(key, isOriginalKeyHashed); + } + + public Map metaDebugPerClient(String key, boolean isOriginalKeyHashed) throws EVCacheException { + Map map = new HashMap<>(); + final Map> instancesByZone = _pool.getAllInstancesByZone(); + final EVCacheKey evcKey = getEVCacheKey(key); + for (ServerGroup sGroup : instancesByZone.keySet()) { + try { + for (EVCacheClient client : instancesByZone.get(sGroup)) { + EVCacheItemMetaData itemMetaData = getEVCacheItemMetaData(client, evcKey, true, false, isOriginalKeyHashed); + map.put(client.getNodeLocator().getPrimary(key), itemMetaData); + } + } catch (Exception e) { + log.error("Error getting meta data", e); + } + } + + return map; + } + + public Future[] delete(String key, boolean isOriginalKeyHashed) throws EVCacheException { + return this.deleteInternal(key, isOriginalKeyHashed); + } + + public EVCacheInternalImpl(String appName, String cacheName, int timeToLive, Transcoder transcoder, boolean enableZoneFallback, + boolean throwException, EVCacheClientPoolManager poolManager) { + super(appName, cacheName, timeToLive, transcoder, enableZoneFallback, throwException, poolManager); + } + + public EVCacheLatch addOrSetToWriteOnly(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy) throws EVCacheException { + EVCacheClient[] clients = _pool.getWriteOnlyEVCacheClients(); + if (replaceItem) + return set(key, value, null, timeToLive, policy, clients, clients.length); + else + return add(key, value, null, timeToLive, policy, clients, clients.length); + } + + public EVCacheLatch addOrSet(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy, List serverGroups) throws EVCacheException { + return addOrSet(replaceItem, key, value, timeToLive, policy, serverGroups, null); + } + + public EVCacheLatch addOrSet(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy, String serverGroupName) throws EVCacheException { + return addOrSet(replaceItem, key, value, timeToLive, policy, serverGroupName, null); + } + + public EVCacheLatch addOrSet(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy, String serverGroupName, List destinationIps) throws EVCacheException { + List serverGroups = new ArrayList<>(); + serverGroups.add(serverGroupName); + + return addOrSet(replaceItem, key, value, timeToLive, policy, serverGroups, destinationIps); + } + + private EVCacheLatch addOrSet(boolean replaceItem, String key, CachedData value, int timeToLive, EVCacheLatch.Policy policy, List serverGroups, List destinationIps) throws EVCacheException { + Map> clientsByServerGroup = _pool.getAllInstancesByZone(); + + List evCacheClients = clientsByServerGroup.entrySet().stream() + .filter(entry -> serverGroups.contains(entry.getKey().getName())) + .map(Map.Entry::getValue) + .flatMap(List::stream) + .collect(Collectors.toList()); + + if (null != destinationIps && !destinationIps.isEmpty()) { + // identify that evcache client whose primary node is the destination ip for the key being processed + evCacheClients = evCacheClients.stream().filter(client -> + destinationIps.contains(((InetSocketAddress) client.getNodeLocator() + .getPrimary(getEVCacheKey(key).getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes())) + .getSocketAddress()).getAddress().getHostAddress()) + ).collect(Collectors.toList()); + } + + EVCacheClient[] evCacheClientsArray = new EVCacheClient[evCacheClients.size()]; + evCacheClients.toArray(evCacheClientsArray); + + if (replaceItem) + return this.set(key, value, null, timeToLive, policy, evCacheClientsArray, evCacheClientsArray.length); + else + return this.add(key, value, null, timeToLive, policy, evCacheClientsArray, evCacheClientsArray.length); + } + + public KeyHashedState isKeyHashed(String appName, String serverGroup) { + PropertyRepository propertyRepository = _poolManager.getEVCacheConfig().getPropertyRepository(); + boolean isKeyHashedAtAppOrAsg = propertyRepository.get(serverGroup + ".hash.key", Boolean.class).orElseGet(appName + ".hash.key").orElse(false).get(); + if (isKeyHashedAtAppOrAsg) { + return KeyHashedState.YES; + } + + if (propertyRepository.get(appName + ".auto.hash.keys", Boolean.class).orElseGet("evcache.auto.hash.keys").orElse(false).get()) { + return KeyHashedState.MAYBE; + } + + return KeyHashedState.NO; + } + + +} diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheKey.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheKey.java index d0474868..863fd2b9 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheKey.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheKey.java @@ -2,23 +2,31 @@ import com.netflix.archaius.api.Property; import com.netflix.evcache.util.KeyHasher; +import com.netflix.evcache.util.KeyHasher.HashingAlgorithm; +import java.util.HashMap; +import java.util.Map; public class EVCacheKey { private final String appName; - private final Property hashingAlgo; + private final HashingAlgorithm hashingAlgorithmAtAppLevel; + private final Property shouldEncodeHashKeyAtAppLevel; + private final Property maxHashingBytesAtAppLevel; private final String key; private final String canonicalKey; private String canonicalKeyForDuet; - private final String hashKey; - private String hashKeyForDuet; + private final Map hashedKeysByAlgorithm; + private final Map hashedKeysByAlgorithmForDuet; - public EVCacheKey(String appName, String key, String canonicalKey, String hashKey, Property hashingAlgo) { + public EVCacheKey(String appName, String key, String canonicalKey, HashingAlgorithm hashingAlgorithmAtAppLevel, Property shouldEncodeHashKeyAtAppLevel, Property maxHashingBytesAtAppLevel) { super(); this.appName = appName; this.key = key; this.canonicalKey = canonicalKey; - this.hashKey = hashKey; - this.hashingAlgo = hashingAlgo; + this.hashingAlgorithmAtAppLevel = hashingAlgorithmAtAppLevel; + this.shouldEncodeHashKeyAtAppLevel = shouldEncodeHashKeyAtAppLevel; + this.maxHashingBytesAtAppLevel = maxHashingBytesAtAppLevel; + hashedKeysByAlgorithm = new HashMap<>(); + hashedKeysByAlgorithmForDuet = new HashMap<>(); } public String getKey() { @@ -45,27 +53,56 @@ private String getCanonicalKeyForDuet() { @Deprecated public String getHashKey() { - return hashKey; + return getHashKey(hashingAlgorithmAtAppLevel, null == shouldEncodeHashKeyAtAppLevel ? null : shouldEncodeHashKeyAtAppLevel.get(), null == maxHashingBytesAtAppLevel ? null : maxHashingBytesAtAppLevel.get()); } - public String getHashKey(boolean isDuet) { - return isDuet ? getHashKeyForDuet() : hashKey; + // overlays app level hashing and client level hashing + public String getHashKey(boolean isDuet, HashingAlgorithm hashingAlgorithm, Boolean shouldEncodeHashKey, Integer maxHashingBytes) { + if (hashingAlgorithm == HashingAlgorithm.NO_HASHING) { + return null; + } + + if (null == hashingAlgorithm) { + hashingAlgorithm = hashingAlgorithmAtAppLevel; + } + + if (null == shouldEncodeHashKey) { + shouldEncodeHashKey = this.shouldEncodeHashKeyAtAppLevel.get(); + } + + if (null == maxHashingBytes) { + maxHashingBytes = this.maxHashingBytesAtAppLevel.get(); + } + + return isDuet ? getHashKeyForDuet(hashingAlgorithm, shouldEncodeHashKey, maxHashingBytes) : getHashKey(hashingAlgorithm, shouldEncodeHashKey, maxHashingBytes); + } + + // overlays app level hashing algorithm and client level hashing algorithm + public String getDerivedKey(boolean isDuet, HashingAlgorithm hashingAlgorithm, Boolean shouldEncodeHashKey, Integer maxHashingBytes) { + // this overlay of hashingAlgorithm helps determine if there at all needs to be hashing performed, otherwise, will return canonical key + if (null == hashingAlgorithm) { + hashingAlgorithm = hashingAlgorithmAtAppLevel; + } + + return null == hashingAlgorithm || hashingAlgorithm == HashingAlgorithm.NO_HASHING ? getCanonicalKey(isDuet) : getHashKey(isDuet, hashingAlgorithm, shouldEncodeHashKey, maxHashingBytes); } - private String getHashKeyForDuet() { - if (null == hashKeyForDuet && null != hashKey) { - hashKeyForDuet = KeyHasher.getHashedKey(getCanonicalKeyForDuet(), hashingAlgo.get()); + private String getHashKey(HashingAlgorithm hashingAlgorithm, Boolean shouldEncodeHashKey, Integer maxHashingBytes) { + if (null == hashingAlgorithm) { + return null; } - return hashKeyForDuet; + // TODO: Once the issue around passing hashedKey in bytes[] is figured, we will start using (nullable) shouldEncodeHashKey, and call KeyHasher.getHashedKeyInBytes() accordingly + return hashedKeysByAlgorithm.computeIfAbsent(hashingAlgorithm, ha -> KeyHasher.getHashedKeyEncoded(canonicalKey, ha, maxHashingBytes)); } - - public String getDerivedKey(boolean isDuet) - { - if (isDuet) - return null == getHashKeyForDuet() ? getCanonicalKeyForDuet() : getHashKeyForDuet(); - return null == hashKey ? canonicalKey : hashKey; + private String getHashKeyForDuet(HashingAlgorithm hashingAlgorithm, Boolean shouldEncodeHashKey, Integer maxHashingBytes) { + if (null == hashingAlgorithm) { + return null; + } + + // TODO: Once the issue around passing hashedKey in bytes[] is figured, we will start using (nullable) shouldEncodeHashKey, and call KeyHasher.getHashedKeyInBytes() accordingly + return hashedKeysByAlgorithmForDuet.computeIfAbsent(hashingAlgorithm, ha -> KeyHasher.getHashedKeyEncoded(getCanonicalKeyForDuet(), ha, maxHashingBytes)); } @Override @@ -74,8 +111,6 @@ public int hashCode() { int result = 1; result = prime * result + ((canonicalKey == null) ? 0 : canonicalKey.hashCode()); result = prime * result + ((canonicalKeyForDuet == null) ? 0 : canonicalKeyForDuet.hashCode()); - result = prime * result + ((hashKey == null) ? 0 : hashKey.hashCode()); - result = prime * result + ((hashKeyForDuet == null) ? 0 : hashKeyForDuet.hashCode()); result = prime * result + ((key == null) ? 0 : key.hashCode()); return result; } @@ -99,16 +134,6 @@ public boolean equals(Object obj) { return false; } else if (!canonicalKeyForDuet.equals(other.canonicalKeyForDuet)) return false; - if (hashKey == null) { - if (other.hashKey != null) - return false; - } else if (!hashKey.equals(other.hashKey)) - return false; - if (hashKeyForDuet == null) { - if (other.hashKeyForDuet != null) - return false; - } else if (!hashKeyForDuet.equals(other.hashKeyForDuet)) - return false; if (key == null) { if (other.key != null) return false; @@ -119,7 +144,7 @@ public boolean equals(Object obj) { @Override public String toString() { - return "EVCacheKey [key=" + key + ", canonicalKey=" + canonicalKey + ", canonicalKeyForDuet=" + canonicalKeyForDuet + (hashKey != null ? ", hashKey=" + hashKey : "") + (hashKeyForDuet != null ? ", hashKeyForDuet=" + hashKeyForDuet + "]" : "]"); + return "EVCacheKey [key=" + key + ", canonicalKey=" + canonicalKey + ", canonicalKeyForDuet=" + canonicalKeyForDuet + (hashedKeysByAlgorithm.size() > 0 ? ", hashedKeysByAlgorithm=" + hashedKeysByAlgorithm.toString() : "") + (hashedKeysByAlgorithmForDuet.size() > 0 ? ", hashedKeysByAlgorithmForDuet=" + hashedKeysByAlgorithmForDuet.toString() + "]" : "]"); } } \ No newline at end of file diff --git a/evcache-core/src/main/java/com/netflix/evcache/event/EVCacheEvent.java b/evcache-core/src/main/java/com/netflix/evcache/event/EVCacheEvent.java index 68f9fb27..57a7a34a 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/event/EVCacheEvent.java +++ b/evcache-core/src/main/java/com/netflix/evcache/event/EVCacheEvent.java @@ -170,7 +170,7 @@ public Collection getCanonicalKeys() { public Collection getMemcachedNode(EVCacheKey evckey) { final Collection nodeList = new ArrayList(clients.size()); for(EVCacheClient client : clients) { - String key = evckey.getDerivedKey(client.isDuetClient()); + String key = evckey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()); nodeList.add(client.getNodeLocator().getPrimary(key)); } return nodeList; diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index c4d8e073..4871bb73 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -35,7 +35,6 @@ import com.netflix.evcache.EVCacheException; import com.netflix.evcache.EVCacheLatch; import com.netflix.evcache.EVCacheReadQueueException; -import com.netflix.evcache.EVCacheTranscoder; import com.netflix.evcache.metrics.EVCacheMetricsFactory; import com.netflix.evcache.operation.EVCacheFutures; import com.netflix.evcache.operation.EVCacheItem; @@ -44,6 +43,7 @@ import com.netflix.evcache.pool.observer.EVCacheConnectionObserver; import com.netflix.evcache.util.EVCacheConfig; import com.netflix.evcache.util.KeyHasher; +import com.netflix.evcache.util.KeyHasher.HashingAlgorithm; import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.Tag; @@ -88,11 +88,11 @@ public class EVCacheClient { private final Property maxReadQueueSize; private final Property ignoreInactiveNodes; private final Property enableChunking; - private final Property hashKeyByApp; private final Property hashKeyByServerGroup; + private final Property shouldEncodeHashKey; + private final Property maxHashingBytes; private final Property chunkSize, writeBlock; private final ChunkTranscoder chunkingTranscoder; - private final EVCacheTranscoder evcacheValueTranscoder; private final SerializingTranscoder decodingTranscoder; private static final int SPECIAL_BYTEARRAY = (8 << 8); private final EVCacheClientPool pool; @@ -148,12 +148,10 @@ public class EVCacheClient { this.decodingTranscoder = new SerializingTranscoder(Integer.MAX_VALUE); decodingTranscoder.setCompressionThreshold(Integer.MAX_VALUE); - this.evcacheValueTranscoder = new EVCacheTranscoder(); - evcacheValueTranscoder.setCompressionThreshold(Integer.MAX_VALUE); - - this.hashKeyByApp = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElseGet(appName + ".auto.hash.keys").orElseGet("evcache.auto.hash.keys").orElse(false); - this.hashKeyByServerGroup = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".hash.key", Boolean.class).orElse(false); + this.hashKeyByServerGroup = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".hash.key", Boolean.class).orElse(null); this.hashingAlgo = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".hash.algo", String.class).orElseGet(appName + ".hash.algo").orElse("siphash24"); + this.shouldEncodeHashKey = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".hash.encode", Boolean.class).orElse(null); + this.maxHashingBytes = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".hash.max.bytes", Integer.class).orElse(null); ping(); } @@ -172,6 +170,14 @@ public boolean isDuetClient() { return isDuetClient; } + public Boolean shouldEncodeHashKey() { + return this.shouldEncodeHashKey.get(); + } + + public Integer getMaxHashingBytes() { + return this.maxHashingBytes.get(); + } + private Collection validateReadQueueSize(Collection canonicalKeys, EVCache.Call call) throws EVCacheException { if (evcacheMemcachedClient.getNodeLocator() == null) return canonicalKeys; final Collection retKeys = new ArrayList<>(canonicalKeys.size()); @@ -846,24 +852,6 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV public T get(String key, Transcoder tc, boolean _throwException, boolean hasZF, boolean chunked) throws Exception { if (chunked) { return assembleChunks(key, false, 0, tc, hasZF); - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - final Object obj = evcacheMemcachedClient.asyncGet(hKey, evcacheValueTranscoder, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - if(obj instanceof EVCacheValue) { - final EVCacheValue val = (EVCacheValue)obj; - if(val == null || !(val.getKey().equals(key))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.GET); - return null; - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if(tc == null) { - return (T)evcacheMemcachedClient.getTranscoder().decode(cd); - } else { - return tc.decode(cd); - } - } else { - return null; - } } else { return evcacheMemcachedClient.asyncGet(key, tc, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); @@ -885,24 +873,6 @@ public T get(String key, Transcoder tc, boolean _throwException, boolean public Single get(String key, Transcoder tc, boolean _throwException, boolean hasZF, boolean chunked, Scheduler scheduler) throws Exception { if (chunked) { return assembleChunks(key, _throwException, 0, tc, hasZF, scheduler); - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - final Object obj = evcacheMemcachedClient.asyncGet(hKey, evcacheValueTranscoder, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - if(obj instanceof EVCacheValue) { - final EVCacheValue val = (EVCacheValue)obj; - if(val == null || !(val.getKey().equals(key))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.GET); - return null; - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if(tc == null) { - return Single.just((T)evcacheMemcachedClient.getTranscoder().decode(cd)); - } else { - return Single.just(tc.decode(cd)); - } - } else { - return null; - } } else { return evcacheMemcachedClient.asyncGet(key, tc, null) .get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); @@ -940,30 +910,6 @@ public T getAndTouch(String key, Transcoder tc, int timeToLive, boolean _ final T returnVal; if (enableChunking.get()) { return assembleChunks(key, false, 0, tc, hasZF); - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - final Object obj; - if(ignoreTouch.get()) { - obj = _client.asyncGet(hKey, evcacheValueTranscoder, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - } else { - final CASValue value = _client.asyncGetAndTouch(key, timeToLive, evcacheValueTranscoder).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - obj = (value == null) ? null : value.getValue(); - } - if(obj != null && obj instanceof EVCacheValue) { - final EVCacheValue val = (EVCacheValue)obj; - if(val == null || !(val.getKey().equals(key))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.GET_AND_TOUCH); - return null; - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if(tc == null) { - return (T)_client.getTranscoder().decode(cd); - } else { - return tc.decode(cd); - } - } else { - return null; - } } else { if(ignoreTouch.get()) { returnVal = _client.asyncGet(key, tc, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); @@ -991,45 +937,6 @@ public Single getAndTouch(String key, Transcoder transcoder, int timeT final Transcoder tc = (transcoder == null) ? (Transcoder) getTranscoder(): transcoder; if (enableChunking.get()) { return assembleChunks(key, false, 0, tc, hasZF, scheduler); - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - if(ignoreTouch.get()) { - final Single value = _client.asyncGet(hKey, evcacheValueTranscoder, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); - return value.flatMap(r -> { - final CASValue rObj = (CASValue)r; - final EVCacheValue val = (EVCacheValue)rObj.getValue(); - if(val == null || !(val.getKey().equals(key))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.GET_AND_TOUCH); - return null; - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if(tc == null) { - return Single.just((T)_client.getTranscoder().decode(cd)); - } else { - return Single.just(tc.decode(cd)); - } - }); - } else { - final Single> value = _client.asyncGetAndTouch(hKey, timeToLive, evcacheValueTranscoder).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); - if(value != null ) { - return value.flatMap(r -> { - final CASValue rObj = (CASValue)r; - final EVCacheValue val = (EVCacheValue)rObj.getValue(); - if(val == null || !(val.getKey().equals(key))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.GET_AND_TOUCH); - return null; - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if(tc == null) { - return Single.just((T)_client.getTranscoder().decode(cd)); - } else { - return Single.just(tc.decode(cd)); - } - }); - } else { - return null; - } - } } else { return _client.asyncGetAndTouch(key, timeToLive, tc) .get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler) @@ -1048,36 +955,6 @@ public Map getBulk(Collection _canonicalKeys, Transcoder< if (tc == null) tc = (Transcoder) getTranscoder(); if (enableChunking.get()) { returnVal = assembleChunks(_canonicalKeys, tc, hasZF); - } else if(shouldHashKey()) { - final Collection hashKeys = new ArrayList(canonicalKeys.size()); - returnVal = new HashMap(canonicalKeys.size()); - for(String cKey : canonicalKeys) { - final String hKey = getHashedKey(cKey); - hashKeys.add(hKey); - returnVal.put(cKey, null); - } - final Map vals = evcacheMemcachedClient.asyncGetBulk(hashKeys, evcacheValueTranscoder, null).getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - if(vals != null && !vals.isEmpty()) { - for(Entry entry : vals.entrySet()) { - final Object obj = entry.getValue(); - if(obj instanceof EVCacheValue) { - final EVCacheValue val = (EVCacheValue)obj; - if(val == null || !(returnVal.containsKey(val.getKey()))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.BULK); - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if(tc == null) { - returnVal.put(val.getKey(), (T)evcacheMemcachedClient.getTranscoder().decode(cd)); - } else { - returnVal.put(val.getKey(), tc.decode(cd)); - } - } else { - if (log.isDebugEnabled()) log.debug("Value for key : " + entry.getKey() + " is not EVCacheValue. val : " + obj); - } - } - } else { - return Collections. emptyMap(); - } } else { returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); @@ -1096,39 +973,6 @@ public Single> getBulk(Collection _canonicalKeys, fin final Transcoder tc = (transcoder == null) ? (Transcoder) getTranscoder() : transcoder; if (enableChunking.get()) { return assembleChunks(_canonicalKeys, tc, hasZF, scheduler); - } else if(shouldHashKey()) { - final Collection hashKeys = new ArrayList(canonicalKeys.size()); - final HashMap returnVal = new HashMap(); - for(String cKey : canonicalKeys) { - final String hKey = getHashedKey(cKey); - hashKeys.add(hKey); - returnVal.compute(cKey, null); - } - final Single> vals = evcacheMemcachedClient.asyncGetBulk(hashKeys, evcacheValueTranscoder, null).getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); - if(vals != null ) { - return vals.flatMap(r -> { - for(Entry entry : r.entrySet()) { - final Object obj = entry.getValue(); - if(obj instanceof EVCacheValue) { - final EVCacheValue val = (EVCacheValue)obj; - if(val == null || !(returnVal.containsKey(val.getKey()))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.BULK); - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if(tc == null) { - returnVal.put(val.getKey(), (T)evcacheMemcachedClient.getTranscoder().decode(cd)); - } else { - returnVal.put(val.getKey(), tc.decode(cd)); - } - } else { - if (log.isDebugEnabled()) log.debug("Value for key : " + entry.getKey() + " is not EVCacheValue. val : " + obj); - } - } - return Single.just(returnVal); - }); - } else { - return Single.just(Collections. emptyMap()); - } } else { return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); @@ -1143,12 +987,7 @@ public Future append(String key, T value) throws Exception { "This operation is not supported as chunking is enabled on this EVCacheClient."); final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key); if (!ensureWriteQueueSize(node, key, Call.APPEND)) return getDefaultFuture(); - if(shouldHashKey()) { - final String hKey = getHashedKey(key); - return evcacheMemcachedClient.append(hKey, value); - } else { - return evcacheMemcachedClient.append(key, value); - } + return evcacheMemcachedClient.append(key, value); } public Future set(String key, CachedData value, int timeToLive) throws Exception { @@ -1206,10 +1045,6 @@ private Future _set(String key, CachedData value, int timeToLive, EVCac delete(key); return evcacheMemcachedClient.set(key, timeToLive, value, null, evcacheLatch); } - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - final CachedData cVal = getEVCacheValue(key, value, timeToLive); - return evcacheMemcachedClient.set(hKey, timeToLive, cVal, null, evcacheLatch); } else { return evcacheMemcachedClient.set(key, timeToLive, value, null, evcacheLatch); } @@ -1219,17 +1054,18 @@ private Future _set(String key, CachedData value, int timeToLive, EVCac } } - protected CachedData getEVCacheValue(String key, CachedData cData, int timeToLive) { - final EVCacheValue val = new EVCacheValue(key, cData.getData(), cData.getFlags(), timeToLive, System.currentTimeMillis()); - return evcacheValueTranscoder.encode(val); + private Boolean shouldHashKey() { + return hashKeyByServerGroup.get(); } - protected boolean shouldHashKey() { - return (!hashKeyByApp.get() && hashKeyByServerGroup.get()); - } + public HashingAlgorithm getHashingAlgorithm() { + if (null == shouldHashKey()) { + // hash key property is not set at the client level + return null; + } - protected String getHashedKey(String key) { - return KeyHasher.getHashedKey(key, hashingAlgo.get()); + // return NO_HASHING if hashing is explicitly disabled at client level + return shouldHashKey() ? KeyHasher.getHashingAlgorithmFromString(hashingAlgo.get()) : HashingAlgorithm.NO_HASHING; } public Future appendOrAdd(String key, CachedData value, int timeToLive, EVCacheLatch evcacheLatch) throws Exception { @@ -1284,10 +1120,6 @@ private Future _replace(String key, CachedData value, int timeToLive, E futures[i] = evcacheMemcachedClient.replace(key + "_" + prefix + i, timeToLive, cd[i], null, null); } return new EVCacheFutures(futures, key, appName, serverGroup, evcacheLatch); - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - final CachedData cVal = getEVCacheValue(key, value, timeToLive); - return evcacheMemcachedClient.replace(hKey, timeToLive, cVal, null, evcacheLatch); } else { return evcacheMemcachedClient.replace(key, timeToLive, value, null, evcacheLatch); } @@ -1302,13 +1134,7 @@ private Future _add(String key, int exp, CachedData value, EVCacheLatch final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key); if (!ensureWriteQueueSize(node, key, Call.ADD)) return getDefaultFuture(); - if(shouldHashKey()) { - final String hKey = getHashedKey(key); - final CachedData cVal = getEVCacheValue(key, value, exp); - return evcacheMemcachedClient.add(hKey, exp, cVal, null, latch); - } else { - return evcacheMemcachedClient.add(key, exp, value, null, latch); - } + return evcacheMemcachedClient.add(key, exp, value, null, latch); } @Deprecated @@ -1388,9 +1214,6 @@ public Future touch(String key, int timeToLive, EVCacheLatch latch) } else { return evcacheMemcachedClient.touch(key, timeToLive, latch); } - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - return evcacheMemcachedClient.touch(hKey, timeToLive, latch); } else { return evcacheMemcachedClient.touch(key, timeToLive, latch); } @@ -1402,12 +1225,7 @@ public Future asyncGet(String key, Transcoder tc, boolean _throwExcept "This operation is not supported as chunking is enabled on this EVCacheClient."); if (!validateNode(key, _throwException, Call.ASYNC_GET)) return null; if (tc == null) tc = (Transcoder) getTranscoder(); - if(shouldHashKey()) { - final String hKey = getHashedKey(key); - return evcacheMemcachedClient.asyncGet(hKey, tc, null); - } else { - return evcacheMemcachedClient.asyncGet(key, tc, null); - } + return evcacheMemcachedClient.asyncGet(key, tc, null); } public Future delete(String key) throws Exception { @@ -1439,9 +1257,6 @@ public Future delete(String key, EVCacheLatch latch) throws Exception { } return new EVCacheFutures(futures, key, appName, serverGroup, latch); } - } else if(shouldHashKey()) { - final String hKey = getHashedKey(key); - return evcacheMemcachedClient.delete(hKey, latch); } else { return evcacheMemcachedClient.delete(key, latch); } @@ -1926,41 +1741,15 @@ private boolean isDataAvailableForRead(BufferedInputStream bufferedReader, long public EVCacheItemMetaData metaDebug(String key) throws Exception { - if(shouldHashKey()) { - key = getHashedKey(key); - } final EVCacheItemMetaData obj = evcacheMemcachedClient.metaDebug(key); if(log.isDebugEnabled()) log.debug("EVCacheItemMetaData : " + obj); return obj; } public EVCacheItem metaGet(String key, Transcoder tc, boolean _throwException, boolean hasZF) throws Exception { - if(shouldHashKey()) { - final String hKey = getHashedKey(key); - final EVCacheItem obj = evcacheMemcachedClient.asyncMetaGet(hKey, evcacheValueTranscoder, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - if(obj.getData() instanceof EVCacheValue) { - final EVCacheValue val = (EVCacheValue)obj.getData(); - if(val == null || !(val.getKey().equals(key))) { - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.GET); - return null; - } - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - T t = null; - if(tc == null) { - t = (T)evcacheMemcachedClient.getTranscoder().decode(cd); - } else { - t = tc.decode(cd); - } - obj.setData(t); - obj.setFlag(val.getFlags()); - return (EVCacheItem) obj; - } else - return null; - } else { - final EVCacheItem obj = evcacheMemcachedClient.asyncMetaGet(key, tc, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); - if(log.isDebugEnabled()) log.debug("EVCacheItem : " + obj); - return obj; - } + final EVCacheItem obj = evcacheMemcachedClient.asyncMetaGet(key, tc, null).get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); + if (log.isDebugEnabled()) log.debug("EVCacheItem : " + obj); + return obj; } diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java index 358e4586..1a72680f 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java @@ -70,6 +70,8 @@ public class EVCacheClientPool implements Runnable, EVCacheClientPoolMBean { // name of the duet EVCache application, if applicable. private final Property duet; + // indicates if duet needs to be primary + private final Property duetPrimary; // evCacheClientPool of the duet EVCache application, if applicable. Supports daisy chaining. private EVCacheClientPool duetClientPool; @@ -174,6 +176,8 @@ public Property get(Object _serverGroup) { setupDuet(); }); + this.duetPrimary = config.getPropertyRepository().get(appName + ".duet.primary", Boolean.class).orElseGet("evcache.duet.primary").orElse(false); + tagList = new ArrayList(2); EVCacheMetricsFactory.getInstance().addAppNameTags(tagList, _appName); @@ -251,11 +255,20 @@ private EVCacheClient getEVCacheClientForReadInternal() { public EVCacheClient getEVCacheClientForRead() { EVCacheClient evCacheClient = getEVCacheClientForReadInternal(); - if (evCacheClient != null) { + // most common production scenario + if (null == duetClientPool) { return evCacheClient; } - return duetClientPool != null ? duetClientPool.getEVCacheClientForRead() : null; + // return duet if current client is not available or if duet is primary + if (null == evCacheClient || duetPrimary.get()) { + EVCacheClient duetClient = duetClientPool.getEVCacheClientForRead(); + + // if duetClient is not present, fallback to evCacheClient + return null == duetClient ? evCacheClient : duetClient; + } + + return evCacheClient; } private List getAllEVCacheClientForReadInternal() { @@ -288,17 +301,26 @@ private List getAllEVCacheClientForReadInternal() { public List getAllEVCacheClientForRead() { List evCacheClients = getAllEVCacheClientForReadInternal(); - if (duetClientPool != null) { - List duetEVCacheClients = duetClientPool.getAllEVCacheClientForRead(); - if (null == evCacheClients) - return duetEVCacheClients; - if (null == duetEVCacheClients) - return evCacheClients; + // most common production scenario + if (null == duetClientPool) { + return evCacheClients; + } - evCacheClients.addAll(duetClientPool.getAllEVCacheClientForRead()); + List duetEVCacheClients = duetClientPool.getAllEVCacheClientForRead(); + if (null == evCacheClients) + return duetEVCacheClients; + + if (null == duetEVCacheClients) + return evCacheClients; + + if (duetPrimary.get()) { + duetEVCacheClients.addAll(evCacheClients); + return duetEVCacheClients; + } else { + evCacheClients.addAll(duetEVCacheClients); + return evCacheClients; } - return evCacheClients; } private EVCacheClient selectClient(List clients) { @@ -340,11 +362,20 @@ private EVCacheClient getEVCacheClientForReadExcludeInternal(ServerGroup rsetUse public EVCacheClient getEVCacheClientForReadExclude(ServerGroup rsetUsed) { EVCacheClient evCacheClient = getEVCacheClientForReadExcludeInternal(rsetUsed); - if (evCacheClient != null) { + // most common production scenario + if (null == duetClientPool) { return evCacheClient; } - return duetClientPool != null ? duetClientPool.getEVCacheClientForReadExclude(rsetUsed) : null; + // return duet if current client is not available or if duet is primary + if (null == evCacheClient || duetPrimary.get()) { + EVCacheClient duetClient = duetClientPool.getEVCacheClientForReadExclude(rsetUsed); + + // if duetClient is not present, fallback to evCacheClient + return null == duetClient ? evCacheClient : duetClient; + } + + return evCacheClient; } private EVCacheClient getEVCacheClientInternal(ServerGroup serverGroup) { @@ -374,11 +405,20 @@ private EVCacheClient getEVCacheClientInternal(ServerGroup serverGroup) { public EVCacheClient getEVCacheClient(ServerGroup serverGroup) { EVCacheClient evCacheClient = getEVCacheClientInternal(serverGroup); - if (evCacheClient != null) { + // most common production scenario + if (null == duetClientPool) { return evCacheClient; } - return duetClientPool != null ? duetClientPool.getEVCacheClient(serverGroup) : null; + // return duet if current client is not available or if duet is primary + if (null == evCacheClient || duetPrimary.get()) { + EVCacheClient duetClient = duetClientPool.getEVCacheClient(serverGroup); + + // if duetClient is not present, fallback to evCacheClient + return null == duetClient ? evCacheClient : duetClient; + } + + return evCacheClient; } private List getEVCacheClientsForReadExcludingInternal(ServerGroup serverGroupToExclude) { @@ -431,17 +471,26 @@ private List getEVCacheClientsForReadExcludingInternal(ServerGrou public List getEVCacheClientsForReadExcluding(ServerGroup serverGroupToExclude) { List evCacheClients = getEVCacheClientsForReadExcludingInternal(serverGroupToExclude); - if (duetClientPool != null) { - List duetEVCacheClients = duetClientPool.getEVCacheClientsForReadExcluding(serverGroupToExclude); - if (null == evCacheClients) - return duetEVCacheClients; - if (null == duetEVCacheClients) - return evCacheClients; + // most common production scenario + if (null == duetClientPool) { + return evCacheClients; + } + + List duetEVCacheClients = duetClientPool.getEVCacheClientsForReadExcluding(serverGroupToExclude); + if (null == evCacheClients) + return duetEVCacheClients; + + if (null == duetEVCacheClients) + return evCacheClients; + if (duetPrimary.get()) { + duetEVCacheClients.addAll(evCacheClients); + return duetEVCacheClients; + } else { evCacheClients.addAll(duetEVCacheClients); + return evCacheClients; } - return evCacheClients; } public boolean isInWriteOnly(ServerGroup serverGroup) { @@ -503,27 +552,39 @@ private EVCacheClient[] getWriteOnlyEVCacheClientsInternal() { } public EVCacheClient[] getWriteOnlyEVCacheClients() { - EVCacheClient[] evCacheClients = null; - try { - evCacheClients = getWriteOnlyEVCacheClientsInternal(); - if (duetClientPool != null) { - EVCacheClient[] duetEVCacheClients = duetClientPool.getWriteOnlyEVCacheClients(); - - // common scenario for duet usage - if (null == evCacheClients || evCacheClients.length == 0) { - return duetEVCacheClients; - } - - if (null != duetEVCacheClients && duetEVCacheClients.length > 0) { - EVCacheClient[] allEVCacheClients = Arrays.copyOf(evCacheClients, evCacheClients.length + duetEVCacheClients.length); - System.arraycopy(duetEVCacheClients, 0, allEVCacheClients, evCacheClients.length, duetEVCacheClients.length); - return allEVCacheClients; - } - } - return evCacheClients; - } finally { - if(evCacheClients == null) return new EVCacheClient[0]; - } + EVCacheClient[] evCacheClients = getWriteOnlyEVCacheClientsInternal(); + + // most common production scenario + if (null == duetClientPool) { + return evCacheClients; + } + + EVCacheClient[] duetEVCacheClients = duetClientPool.getWriteOnlyEVCacheClients(); + if (null == evCacheClients || evCacheClients.length == 0) { + return duetEVCacheClients; + } + + if (null == duetEVCacheClients || duetEVCacheClients.length == 0) { + return evCacheClients; + } + + if (duetPrimary.get()) { + // return write-only of duet app and all writers of original app to which duet is attached + // get all writers of original app + evCacheClients = getEVCacheClientForWriteInternal(); + + EVCacheClient[] allEVCacheClients = Arrays.copyOf(duetEVCacheClients, duetEVCacheClients.length + evCacheClients.length); + System.arraycopy(evCacheClients, 0, allEVCacheClients, duetEVCacheClients.length, evCacheClients.length); + return allEVCacheClients; + } else { + // return write-only of original app and all writers of duet app + // get all writers of duet app + duetEVCacheClients = duetClientPool.getEVCacheClientForWrite(); + + EVCacheClient[] allEVCacheClients = Arrays.copyOf(evCacheClients, evCacheClients.length + duetEVCacheClients.length); + System.arraycopy(duetEVCacheClients, 0, allEVCacheClients, evCacheClients.length, duetEVCacheClients.length); + return allEVCacheClients; + } } EVCacheClient[] getAllWriteClients() { @@ -594,27 +655,31 @@ private EVCacheClient[] getEVCacheClientForWriteInternal() { } public EVCacheClient[] getEVCacheClientForWrite() { - EVCacheClient[] evCacheClients = null; - try { - evCacheClients = getEVCacheClientForWriteInternal(); - if (duetClientPool != null) { - EVCacheClient[] duetEVCacheClients = duetClientPool.getEVCacheClientForWrite(); - - // common scenario for duet usage - if (null == evCacheClients || evCacheClients.length == 0) { - return duetEVCacheClients; - } - - if (null != duetEVCacheClients && duetEVCacheClients.length > 0) { - EVCacheClient[] allEVCacheClients = Arrays.copyOf(evCacheClients, evCacheClients.length + duetEVCacheClients.length); - System.arraycopy(duetEVCacheClients, 0, allEVCacheClients, evCacheClients.length, duetEVCacheClients.length); - return allEVCacheClients; - } - } - return evCacheClients; - } finally { - if(evCacheClients == null) return new EVCacheClient[0]; - } + EVCacheClient[] evCacheClients = getEVCacheClientForWriteInternal(); + + // most common production scenario + if (null == duetClientPool) { + return evCacheClients; + } + + EVCacheClient[] duetEVCacheClients = duetClientPool.getEVCacheClientForWrite(); + if (null == evCacheClients || evCacheClients.length == 0) { + return duetEVCacheClients; + } + + if (null == duetEVCacheClients || duetEVCacheClients.length == 0) { + return evCacheClients; + } + + if (duetPrimary.get()) { + EVCacheClient[] allEVCacheClients = Arrays.copyOf(duetEVCacheClients, duetEVCacheClients.length + evCacheClients.length); + System.arraycopy(evCacheClients, 0, allEVCacheClients, duetEVCacheClients.length, evCacheClients.length); + return allEVCacheClients; + } else { + EVCacheClient[] allEVCacheClients = Arrays.copyOf(evCacheClients, evCacheClients.length + duetEVCacheClients.length); + System.arraycopy(duetEVCacheClients, 0, allEVCacheClients, evCacheClients.length, duetEVCacheClients.length); + return allEVCacheClients; + } } private void refresh() throws IOException { @@ -1432,7 +1497,7 @@ public String getFallbackServerGroup() { } public boolean supportsFallback() { - return memcachedFallbackReadInstances.getSize() > 1 || (duetClientPool != null && duetClientPool.supportsFallback()); + return memcachedFallbackReadInstances.getSize() > 1 || (duetClientPool != null && duetPrimary.get() && duetClientPool.supportsFallback()); } public boolean isLogEventEnabled() { @@ -1486,6 +1551,9 @@ public Property getOpQueueMaxBlockTime() { } public Property getOperationTimeout() { + if (duetClientPool !=null && duetPrimary.get()) { + return duetClientPool.getOperationTimeout(); + } return _operationTimeout; } @@ -1528,6 +1596,9 @@ public Map> getWriteOnlyFastPropertyMap() { } public Property getReadTimeout() { + if (duetClientPool != null && duetPrimary.get()) { + return duetClientPool.getReadTimeout(); + } return _readTimeout; } diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientUtil.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientUtil.java index a29abb60..e4438de9 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientUtil.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientUtil.java @@ -18,34 +18,32 @@ public class EVCacheClientUtil { private static final Logger log = LoggerFactory.getLogger(EVCacheClientUtil.class); private final ChunkTranscoder ct = new ChunkTranscoder(); private final String _appName; - private final EVCacheClientPool _pool; + private final long _operationTimeout; - public EVCacheClientUtil(EVCacheClientPool pool) { - this._pool = pool; - this._appName = pool.getAppName(); + public EVCacheClientUtil(String appName, long operationTimeout) { + this._appName = appName; + this._operationTimeout = operationTimeout; } + //TODO: Remove this todo. This method has been made hashing agnostic. /** - * TODO : once metaget is available we need to get the remaining ttl from an existing entry and use it + * TODO : once metaget is available we need to get the remaining ttl from an existing entry and use it */ - public EVCacheLatch add(EVCacheKey evcKey, final CachedData cd, boolean shouldHashKey, Transcoder evcacheValueTranscoder, int timeToLive, Policy policy) throws Exception { - if (cd == null) return null; - - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + public EVCacheLatch add(EVCacheKey evcKey, final CachedData cd, Transcoder evcacheValueTranscoder, int timeToLive, Policy policy, final EVCacheClient[] clients, int latchCount) throws Exception { + if (cd == null) return null; + + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, latchCount, _appName); - CachedData cd1 = null; Boolean firstStatus = null; for (EVCacheClient client : clients) { - String key = evcKey.getDerivedKey(client.isDuetClient()); - if (shouldHashKey) { - if(cd1 == null) { - final EVCacheValue val = new EVCacheValue(evcKey.getCanonicalKey(client.isDuetClient()), cd.getData(), cd.getFlags(), timeToLive, System.currentTimeMillis()); - cd1 = evcacheValueTranscoder.encode(val); - } + CachedData cd1 = null; + if (evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()) != null) { + final EVCacheValue val = new EVCacheValue(evcKey.getCanonicalKey(client.isDuetClient()), cd.getData(), cd.getFlags(), timeToLive, System.currentTimeMillis()); + cd1 = evcacheValueTranscoder.encode(val); } else { cd1 = cd; } + String key = evcKey.getDerivedKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxHashingBytes()); final Future f = client.add(key, timeToLive, cd1, latch); if (log.isDebugEnabled()) log.debug("ADD : Op Submitted : APP " + _appName + ", key " + key + "; future : " + f + "; client : " + client); boolean status = f.get().booleanValue(); @@ -67,15 +65,15 @@ public EVCacheLatch add(EVCacheKey evcKey, final CachedData cd, boolean shouldHa private EVCacheLatch fixup(EVCacheClient sourceClient, EVCacheClient[] destClients, EVCacheKey evcKey, int timeToLive, Policy policy) { final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, destClients.length, _appName); try { - final CachedData readData = sourceClient.get(evcKey.getDerivedKey(sourceClient.isDuetClient()), ct, false, false); + final CachedData readData = sourceClient.get(evcKey.getDerivedKey(sourceClient.isDuetClient(), sourceClient.getHashingAlgorithm(), sourceClient.shouldEncodeHashKey(), sourceClient.getMaxHashingBytes()), ct, false, false); if(readData != null) { - sourceClient.touch(evcKey.getDerivedKey(sourceClient.isDuetClient()), timeToLive); + sourceClient.touch(evcKey.getDerivedKey(sourceClient.isDuetClient(), sourceClient.getHashingAlgorithm(), sourceClient.shouldEncodeHashKey(), sourceClient.getMaxHashingBytes()), timeToLive); for(EVCacheClient destClient : destClients) { - destClient.set(evcKey.getDerivedKey(destClient.isDuetClient()), readData, timeToLive, latch); + destClient.set(evcKey.getDerivedKey(destClient.isDuetClient(), destClient.getHashingAlgorithm(), destClient.shouldEncodeHashKey(), destClient.getMaxHashingBytes()), readData, timeToLive, latch); } } - latch.await(_pool.getOperationTimeout().get(), TimeUnit.MILLISECONDS); + latch.await(_operationTimeout, TimeUnit.MILLISECONDS); } catch (Exception e) { log.error("Error reading the data", e); } diff --git a/evcache-core/src/main/java/com/netflix/evcache/util/KeyHasher.java b/evcache-core/src/main/java/com/netflix/evcache/util/KeyHasher.java index e4c8e5e5..4fc41b34 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/util/KeyHasher.java +++ b/evcache-core/src/main/java/com/netflix/evcache/util/KeyHasher.java @@ -1,5 +1,6 @@ package com.netflix.evcache.util; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Base64.Encoder; @@ -13,7 +14,6 @@ import com.google.common.hash.Hashing; public class KeyHasher { - /** * meta data size * 40 + key + 'item_hdr' size @@ -24,9 +24,31 @@ public class KeyHasher { And if CAS and client flags are present: 40 + keysize + 4 bytes(for flags) + 8(for CAS) + 12 */ - - - + + public enum HashingAlgorithm { + murmur3, + adler32, + crc32, + sha1, + sha256, + siphash24, + goodfasthash, + md5, + NO_HASHING // useful for disabling hashing at client level, while Hashing is enabled at App level + } + + public static HashingAlgorithm getHashingAlgorithmFromString(String algorithmStr) { + try { + if (null == algorithmStr || algorithmStr.isEmpty()) { + return null; + } + return HashingAlgorithm.valueOf(algorithmStr.toLowerCase()); + } catch (IllegalArgumentException ex) { + // default to md5 incase of unsupported algorithm + return HashingAlgorithm.md5; + } + } + private static final Logger log = LoggerFactory.getLogger(KeyHasher.class); private static final Encoder encoder= Base64.getEncoder().withoutPadding(); @@ -45,49 +67,66 @@ public class KeyHasher { // } // } - public static String getHashedKey(String key, String hashingAlgorithm) { + public static String getHashedKeyEncoded(String key, HashingAlgorithm hashingAlgorithm, Integer maxHashingBytes) { final long start = System.nanoTime(); - HashFunction hf = null; - switch(hashingAlgorithm.toLowerCase()) { - case "murmur3" : + byte[] digest = getHashedKey(key, hashingAlgorithm, maxHashingBytes); + if(log.isDebugEnabled()) log.debug("Key : " + key +"; digest length : " + digest.length + "; byte Array contents : " + Arrays.toString(digest) ); + final String hKey = encoder.encodeToString(digest); + if(log.isDebugEnabled()) log.debug("Key : " + key +"; Hashed & encoded key : " + hKey + "; Took " + (System.nanoTime() - start) + " nanos"); + return hKey; + } + + public static byte[] getHashedKeyInBytes(String key, HashingAlgorithm hashingAlgorithm, Integer maxHashingBytes) { + final long start = System.nanoTime(); + byte[] digest = getHashedKey(key, hashingAlgorithm, maxHashingBytes); + if(log.isDebugEnabled()) log.debug("Key : " + key +"; digest length : " + digest.length + "; byte Array contents : " + Arrays.toString(digest) + "; Took " + (System.nanoTime() - start) + " nanos"); + return digest; + } + + private static byte[] getHashedKey(String key, HashingAlgorithm hashingAlgorithm, Integer maxHashingBytes) { + HashFunction hf = null; + switch (hashingAlgorithm) { + case murmur3: hf = Hashing.murmur3_128(); break; - - case "adler32" : + + case adler32: hf = Hashing.adler32(); break; - - case "crc32" : + + case crc32: hf = Hashing.crc32(); break; - - case "sha1" : + + case sha1: hf = Hashing.sha1(); break; - - case "sha256" : + + case sha256: hf = Hashing.sha256(); break; - - case "siphash24" : + + case siphash24: hf = Hashing.sipHash24(); break; - case "goodfasthash" : + case goodfasthash: hf = Hashing.goodFastHash(128); break; - - case "md5" : - default : + + case md5: + default: hf = Hashing.md5(); break; } final HashCode hc = hf.newHasher().putString(key, Charsets.UTF_8).hash(); final byte[] digest = hc.asBytes(); - if(log.isDebugEnabled()) log.debug("Key : " + key +"; digest length : " + digest.length + "; byte Array contents : " + Arrays.toString(digest) ); - final String hKey = encoder.encodeToString(digest); - if(log.isDebugEnabled()) log.debug("Key : " + key +"; Hashed & encoded key : " + hKey + "; Took " + (System.nanoTime() - start) + " nanos"); - return hKey; + + if (maxHashingBytes != null && maxHashingBytes > 0 && maxHashingBytes < digest.length) { + return Arrays.copyOfRange(digest, 0, maxHashingBytes); + } + + return digest; } } diff --git a/evcache-zipkin-tracing/src/test/java/com/netflix/evcache/EVCacheTracingEventListenerUnitTests.java b/evcache-zipkin-tracing/src/test/java/com/netflix/evcache/EVCacheTracingEventListenerUnitTests.java index f7456d04..ee24cdda 100644 --- a/evcache-zipkin-tracing/src/test/java/com/netflix/evcache/EVCacheTracingEventListenerUnitTests.java +++ b/evcache-zipkin-tracing/src/test/java/com/netflix/evcache/EVCacheTracingEventListenerUnitTests.java @@ -40,7 +40,7 @@ public void resetMocks() { when(mockEVCacheEvent.getAppName()).thenReturn("dummyAppName"); when(mockEVCacheEvent.getCacheName()).thenReturn("dummyCacheName"); when(mockEVCacheEvent.getEVCacheKeys()) - .thenReturn(Arrays.asList(new EVCacheKey("dummyAppName", "dummyKey", "dummyCanonicalKey", null, null))); + .thenReturn(Arrays.asList(new EVCacheKey("dummyAppName", "dummyKey", "dummyCanonicalKey", null, null, null))); when(mockEVCacheEvent.getStatus()).thenReturn("success"); when(mockEVCacheEvent.getDurationInMillis()).thenReturn(1L); when(mockEVCacheEvent.getTTL()).thenReturn(0);