diff --git a/.github/workflows/nebula-publish.yml b/.github/workflows/nebula-publish.yml index 64b1d642..68aa5858 100644 --- a/.github/workflows/nebula-publish.yml +++ b/.github/workflows/nebula-publish.yml @@ -35,6 +35,8 @@ jobs: if: contains(github.ref, '-rc.') run: ./gradlew --info --stacktrace -Prelease.useLastTag=true -PnetflixossPublishCandidatesToMavenCentral=true candidate -x test env: + NETFLIX_OSS_SONATYPE_USERNAME: ${{ secrets.ORG_SONATYPE_USERNAME }} + NETFLIX_OSS_SONATYPE_PASSWORD: ${{ secrets.ORG_SONATYPE_PASSWORD }} NETFLIX_OSS_SIGNING_KEY: ${{ secrets.ORG_SIGNING_KEY }} NETFLIX_OSS_SIGNING_PASSWORD: ${{ secrets.ORG_SIGNING_PASSWORD }} NETFLIX_OSS_REPO_USERNAME: ${{ secrets.ORG_NETFLIXOSS_USERNAME }} diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index 40c684e2..90ae8f65 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -44,7 +44,6 @@ import com.netflix.evcache.pool.EVCacheClientUtil; import com.netflix.evcache.pool.EVCacheValue; import com.netflix.evcache.pool.ServerGroup; -import com.netflix.evcache.util.EVCacheConfig; import com.netflix.evcache.util.KeyHasher; import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; @@ -96,6 +95,8 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean { private final Property maxHashLength; private final EVCacheTranscoder evcacheValueTranscoder; private final Property maxReadDuration, maxWriteDuration; + private final Property clientReadRetry; + private final Property clientWriteToAllReplicas; protected final EVCacheClientPoolManager _poolManager; private final Map timerMap = new ConcurrentHashMap(); @@ -170,6 +171,9 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean { this._pool = poolManager.getEVCacheClientPool(_appName); }); + this.clientReadRetry = propertyRepository.get(appName + ".router.client.read.shouldRetry", Boolean.class).orElse(true); + this.clientWriteToAllReplicas = propertyRepository.get(appName + ".router.client.write.shouldWriteToAllReplicas", Boolean.class).orElse(true); + _pool.pingServers(); setupMonitoring(); @@ -454,31 +458,37 @@ T doGet(EVCacheKey evcKey , Transcoder tc) throws EVCacheException { final boolean hasZF = hasZoneFallback(); boolean throwEx = hasZF ? false : throwExc; T data = getData(client, evcKey, tc, throwEx, hasZF); - if (data == null && hasZF) { - final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); - if (fbClients != null && !fbClients.isEmpty()) { - for (int i = 0; i < fbClients.size(); i++) { - final EVCacheClient fbClient = fbClients.get(i); - if(i >= fbClients.size() - 1) throwEx = throwExc; - if (event != null) { - try { - if (shouldThrottle(event)) { + // In case of router, we don't need to retry from client (router.shouldRetry = false) + // Retries will be handled by the router + if (clientReadRetry.get()) { + if (data == null && hasZF) { + final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); + if (fbClients != null && !fbClients.isEmpty()) { + for (int i = 0; i < fbClients.size(); i++) { + final EVCacheClient fbClient = fbClients.get(i); + if (i >= fbClients.size() - 1) throwEx = throwExc; + if (event != null) { + try { + if (shouldThrottle(event)) { + status = EVCacheMetricsFactory.THROTTLED; + if (throwExc) + throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKey); + return null; + } + } catch (EVCacheException ex) { + if (throwExc) throw ex; status = EVCacheMetricsFactory.THROTTLED; - if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKey); return null; } - } catch(EVCacheException ex) { - if(throwExc) throw ex; - status = EVCacheMetricsFactory.THROTTLED; - return null; } - } - tries++; - data = getData(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false); - if (log.isDebugEnabled() && shouldLog()) log.debug("Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); - if (data != null) { - client = fbClient; - break; + tries++; + data = getData(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false); + if (log.isDebugEnabled() && shouldLog()) + log.debug("Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); + if (data != null) { + client = fbClient; + break; + } } } } @@ -662,31 +672,37 @@ protected EVCacheItem metaGetInternal(String key, Transcoder tc, boole final boolean hasZF = hasZoneFallback(); boolean throwEx = hasZF ? false : throwExc; EVCacheItem data = getEVCacheItem(client, evcKey, tc, throwEx, hasZF, isOriginalKeyHashed, true); - if (data == null && hasZF) { - final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); - if (fbClients != null && !fbClients.isEmpty()) { - for (int i = 0; i < fbClients.size(); i++) { - final EVCacheClient fbClient = fbClients.get(i); - if(i >= fbClients.size() - 1) throwEx = throwExc; - if (event != null) { - try { - if (shouldThrottle(event)) { + // In case of router, we don't need to retry from client (router.shouldRetry = false) + // Retries will be handled by the router + if (clientReadRetry.get()) { + if (data == null && hasZF) { + final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); + if (fbClients != null && !fbClients.isEmpty()) { + for (int i = 0; i < fbClients.size(); i++) { + final EVCacheClient fbClient = fbClients.get(i); + if (i >= fbClients.size() - 1) throwEx = throwExc; + if (event != null) { + try { + if (shouldThrottle(event)) { + status = EVCacheMetricsFactory.THROTTLED; + if (throwExc) + throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKey); + return null; + } + } catch (EVCacheException ex) { + if (throwExc) throw ex; status = EVCacheMetricsFactory.THROTTLED; - if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKey); return null; } - } catch(EVCacheException ex) { - if(throwExc) throw ex; - status = EVCacheMetricsFactory.THROTTLED; - return null; } - } - tries++; - data = getEVCacheItem(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false, isOriginalKeyHashed, true); - if (log.isDebugEnabled() && shouldLog()) log.debug("Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); - if (data != null) { - client = fbClient; - break; + tries++; + data = getEVCacheItem(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false, isOriginalKeyHashed, true); + if (log.isDebugEnabled() && shouldLog()) + log.debug("Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); + if (data != null) { + client = fbClient; + break; + } } } } @@ -756,9 +772,15 @@ else if (count <= 2) public T get(String key, Transcoder tc, Policy policy) throws EVCacheException { if (null == key) throw new IllegalArgumentException(); - + // In case of router, client works as proxy and sends the request to the router. + // Currently, there is no way of passing the policy parameter to the router. So if this property is set + // then we just call the get API. + if (!clientReadRetry.get()) { + return get(key, tc); + } final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + + EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.GET); if (throwExc) throw new EVCacheException("Could not find a client to asynchronously get the data"); @@ -866,8 +888,10 @@ public Single get(String key, Transcoder tc, Scheduler scheduler) { final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); final boolean hasZF = hasZoneFallback(); final boolean throwEx = hasZF ? false : throwExc; + // In case of router, we don't need to retry from client (router.shouldRetry = false) + // Retries will be handled by the router return getData(client, evcKey, tc, throwEx, hasZF, scheduler).flatMap(data -> { - if (data == null && hasZF) { + if (data == null && hasZF && clientReadRetry.get()) { final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); if (fbClients != null && !fbClients.isEmpty()) { return Observable.concat(Observable.from(fbClients).map( @@ -1123,7 +1147,7 @@ public Single getAndTouch(String key, int timeToLive, Transcoder tc, S final boolean throwEx = hasZF ? false : throwExc; //anyway we have to touch all copies so let's just reuse getData instead of getAndTouch return getData(client, evcKey, tc, throwEx, hasZF, scheduler).flatMap(data -> { - if (data == null && hasZF) { + if (data == null && hasZF && clientReadRetry.get()) { final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); if (fbClients != null && !fbClients.isEmpty()) { return Observable.concat(Observable.from(fbClients).map( @@ -1255,30 +1279,36 @@ T doGetAndTouch(EVCacheKey evcKey, int timeToLive, Transcoder tc) throws final boolean hasZF = hasZoneFallback(); boolean throwEx = hasZF ? false : throwExc; T data = getData(client, evcKey, tc, throwEx, hasZF); - if (data == null && hasZF) { - final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); - for (int i = 0; i < fbClients.size(); i++) { - final EVCacheClient fbClient = fbClients.get(i); - if(i >= fbClients.size() - 1) throwEx = throwExc; - if (event != null) { - try { - if (shouldThrottle(event)) { + // In case of router, we don't need to retry from client (router.shouldRetry = false) + // Retries will be handled by the router + if (clientReadRetry.get()) { + if (data == null && hasZF) { + final List fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); + for (int i = 0; i < fbClients.size(); i++) { + final EVCacheClient fbClient = fbClients.get(i); + if (i >= fbClients.size() - 1) throwEx = throwExc; + if (event != null) { + try { + if (shouldThrottle(event)) { + status = EVCacheMetricsFactory.THROTTLED; + if (throwExc) + throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKey); + return null; + } + } catch (EVCacheException ex) { + if (throwExc) throw ex; status = EVCacheMetricsFactory.THROTTLED; - if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKey); return null; } - } catch(EVCacheException ex) { - if(throwExc) throw ex; - status = EVCacheMetricsFactory.THROTTLED; - return null; } - } - tries++; - data = getData(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false); - if (log.isDebugEnabled() && shouldLog()) log.debug("GetAndTouch Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); - if (data != null) { - client = fbClient; - break; + tries++; + data = getData(fbClient, evcKey, tc, throwEx, (i < fbClients.size() - 1) ? true : false); + if (log.isDebugEnabled() && shouldLog()) + log.debug("GetAndTouch Retry for APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + fbClient.getServerGroup()); + if (data != null) { + client = fbClient; + break; + } } } } @@ -1349,7 +1379,7 @@ public EVCacheLatch touch(String key, int timeToLive, Policy policy) throws checkTTL(timeToLive, Call.TOUCH); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.TOUCH); if (throwExc) throw new EVCacheException("Could not find a client to set the data"); @@ -1377,7 +1407,7 @@ public EVCacheLatch touch(String key, int timeToLive, Policy policy) throws String status = EVCacheMetricsFactory.SUCCESS; final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); try { - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName); touchData(evcKey, timeToLive, clients, latch); if (event != null) { @@ -1408,7 +1438,7 @@ public EVCacheLatch touch(String key, int timeToLive, Policy policy) throws } private void touchData(EVCacheKey evcKey, int timeToLive) throws Exception { - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); touchData(evcKey, timeToLive, clients); } @@ -1674,82 +1704,92 @@ private Map getBulk(final Collection keys, Transcoder final boolean hasZF = hasZoneFallbackForBulk(); boolean throwEx = hasZF ? false : throwExc; Map retMap = getBulkData(client, evcKeys, tc, throwEx, hasZF); - List fbClients = null; - if (hasZF) { - if (retMap == null || retMap.isEmpty()) { - fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); - if (fbClients != null && !fbClients.isEmpty()) { - for (int i = 0; i < fbClients.size(); i++) { - final EVCacheClient fbClient = fbClients.get(i); - if(i >= fbClients.size() - 1) throwEx = throwExc; - if (event != null) { - try { - if (shouldThrottle(event)) { + // In case of router, we don't need to retry from client (router.shouldRetry = false) + // Retries will be handled by the router + if (clientReadRetry.get()) { + List fbClients = null; + if (hasZF) { + if (retMap == null || retMap.isEmpty()) { + fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); + if (fbClients != null && !fbClients.isEmpty()) { + for (int i = 0; i < fbClients.size(); i++) { + final EVCacheClient fbClient = fbClients.get(i); + if (i >= fbClients.size() - 1) throwEx = throwExc; + if (event != null) { + try { + if (shouldThrottle(event)) { + status = EVCacheMetricsFactory.THROTTLED; + if (throwExc) + throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKeys); + return null; + } + } catch (EVCacheException ex) { + if (throwExc) throw ex; status = EVCacheMetricsFactory.THROTTLED; - if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKeys); return null; } - } catch(EVCacheException ex) { - if(throwExc) throw ex; - status = EVCacheMetricsFactory.THROTTLED; - return null; } + tries++; + retMap = getBulkData(fbClient, evcKeys, tc, throwEx, (i < fbClients.size() - 1) ? true : false); + if (log.isDebugEnabled() && shouldLog()) + log.debug("Fallback for APP " + _appName + ", key [" + evcKeys + (log.isTraceEnabled() ? "], Value [" + retMap : "") + "], zone : " + fbClient.getZone()); + if (retMap != null && !retMap.isEmpty()) break; } - tries++; - retMap = getBulkData(fbClient, evcKeys, tc, throwEx, (i < fbClients.size() - 1) ? true : false); - if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + evcKeys + (log.isTraceEnabled() ? "], Value [" + retMap : "") + "], zone : " + fbClient.getZone()); - if (retMap != null && !retMap.isEmpty()) break; + //increment("BULK-FULL_RETRY-" + ((retMap == null || retMap.isEmpty()) ? "MISS" : "HIT")); } - //increment("BULK-FULL_RETRY-" + ((retMap == null || retMap.isEmpty()) ? "MISS" : "HIT")); - } - } else if (retMap != null && keys.size() > retMap.size() && _bulkPartialZoneFallbackFP.get()) { - final int initRetrySize = keys.size() - retMap.size(); - List retryEVCacheKeys = new ArrayList(initRetrySize); - for (Iterator keysItr = evcKeys.iterator(); keysItr.hasNext();) { - final EVCacheKey key = keysItr.next(); - if (!retMap.containsKey(key)) { - retryEVCacheKeys.add(key); + } else if (retMap != null && keys.size() > retMap.size() && _bulkPartialZoneFallbackFP.get()) { + final int initRetrySize = keys.size() - retMap.size(); + List retryEVCacheKeys = new ArrayList(initRetrySize); + for (Iterator keysItr = evcKeys.iterator(); keysItr.hasNext(); ) { + final EVCacheKey key = keysItr.next(); + if (!retMap.containsKey(key)) { + retryEVCacheKeys.add(key); + } } - } - fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); - if (fbClients != null && !fbClients.isEmpty()) { - for (int ind = 0; ind < fbClients.size(); ind++) { - final EVCacheClient fbClient = fbClients.get(ind); - if (event != null) { - try { - if (shouldThrottle(event)) { + fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup()); + if (fbClients != null && !fbClients.isEmpty()) { + for (int ind = 0; ind < fbClients.size(); ind++) { + final EVCacheClient fbClient = fbClients.get(ind); + if (event != null) { + try { + if (shouldThrottle(event)) { + status = EVCacheMetricsFactory.THROTTLED; + if (throwExc) + throw new EVCacheException("Request Throttled for app " + _appName + " & keys " + retryEVCacheKeys); + return null; + } + } catch (EVCacheException ex) { status = EVCacheMetricsFactory.THROTTLED; - if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & keys " + retryEVCacheKeys); + if (throwExc) throw ex; return null; } - } catch(EVCacheException ex) { - status = EVCacheMetricsFactory.THROTTLED; - if(throwExc) throw ex; - return null; } - } - tries++; - - final Map fbRetMap = getBulkData(fbClient, retryEVCacheKeys, tc, false, hasZF); - if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + retryEVCacheKeys + "], Fallback Server Group : " + fbClient .getServerGroup().getName()); - for (Map.Entry i : fbRetMap.entrySet()) { - retMap.put(i.getKey(), i.getValue()); - if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + i.getKey() + (log.isTraceEnabled() ? "], Value [" + i.getValue(): "]")); - } - if (retryEVCacheKeys.size() == fbRetMap.size()) break; - if (ind < fbClients.size()) { - retryEVCacheKeys = new ArrayList(keys.size() - retMap.size()); - for (Iterator keysItr = evcKeys.iterator(); keysItr.hasNext();) { - final EVCacheKey key = keysItr.next(); - if (!retMap.containsKey(key)) { - retryEVCacheKeys.add(key); + tries++; + + final Map fbRetMap = getBulkData(fbClient, retryEVCacheKeys, tc, false, hasZF); + if (log.isDebugEnabled() && shouldLog()) + log.debug("Fallback for APP " + _appName + ", key [" + retryEVCacheKeys + "], Fallback Server Group : " + fbClient.getServerGroup().getName()); + for (Map.Entry i : fbRetMap.entrySet()) { + retMap.put(i.getKey(), i.getValue()); + if (log.isDebugEnabled() && shouldLog()) + log.debug("Fallback for APP " + _appName + ", key [" + i.getKey() + (log.isTraceEnabled() ? "], Value [" + i.getValue() : "]")); + } + if (retryEVCacheKeys.size() == fbRetMap.size()) break; + if (ind < fbClients.size()) { + retryEVCacheKeys = new ArrayList(keys.size() - retMap.size()); + for (Iterator keysItr = evcKeys.iterator(); keysItr.hasNext(); ) { + final EVCacheKey key = keysItr.next(); + if (!retMap.containsKey(key)) { + retryEVCacheKeys.add(key); + } } } } } + if (log.isDebugEnabled() && shouldLog() && retMap.size() == keys.size()) + log.debug("Fallback SUCCESS for APP " + _appName + ", retMap [" + retMap + "]"); } - if (log.isDebugEnabled() && shouldLog() && retMap.size() == keys.size()) log.debug("Fallback SUCCESS for APP " + _appName + ", retMap [" + retMap + "]"); } } @@ -1887,8 +1927,8 @@ public EVCacheLatch set(String key, T value, Transcoder tc, EVCacheLatch. } public EVCacheLatch set(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { - EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); - return this.set(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length); + final EVCacheClient[] clients = getClientsForWrite(); + return this.set(key, value, tc, timeToLive, policy, clients, getLatchCount(clients)); } protected EVCacheLatch set(String key, T value, Transcoder tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException { @@ -1985,7 +2025,7 @@ public EVCacheFuture[] append(String key, T value, Transcoder tc, int tim checkTTL(timeToLive, Call.APPEND); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND); if (throwExc) throw new EVCacheException("Could not find a client to set the data"); @@ -2105,7 +2145,7 @@ protected EVCacheLatch deleteInternal(String key, Policy policy, boolean isO if (key == null) throw new IllegalArgumentException("Key cannot be null"); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DELETE); if (throwExc) throw new EVCacheException("Could not find a client to delete the keyAPP " + _appName @@ -2133,7 +2173,7 @@ protected EVCacheLatch deleteInternal(String key, Policy policy, boolean isO String status = EVCacheMetricsFactory.SUCCESS; final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName); try { for (int i = 0; i < clients.length; i++) { Future future = clients[i].delete(isOriginalKeyHashed ? evcKey.getKey() : evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxDigestBytes(), clients[i].getMaxHashLength(), clients[i].getBaseEncoder()), latch); @@ -2172,12 +2212,27 @@ public int getDefaultTTL() { return _timeToLive; } + private EVCacheClient[] getClientsForWrite() { + EVCacheClient[] clients; + if (clientWriteToAllReplicas.get()) { + clients = _pool.getEVCacheClientForWrite(); + } else { + EVCacheClient client = _pool.getEVCacheClientForRead(); + if (client == null) { + clients = new EVCacheClient[]{}; + } else { + clients = new EVCacheClient[]{client}; + } + } + return clients; + } + public long incr(String key, long by, long defaultVal, int timeToLive) throws EVCacheException { if ((null == key) || by < 0 || defaultVal < 0 || timeToLive < 0) throw new IllegalArgumentException(); checkTTL(timeToLive, Call.INCR); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.INCR); if (log.isDebugEnabled() && shouldLog()) log.debug("INCR : " + _metricPrefix + ":NULL_CLIENT"); @@ -2261,7 +2316,7 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV checkTTL(timeToLive, Call.DECR); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DECR); if (log.isDebugEnabled() && shouldLog()) log.debug("DECR : " + _metricPrefix + ":NULL_CLIENT"); @@ -2356,6 +2411,14 @@ public EVCacheLatch replace(String key, T value, int timeToLive, Policy pol return replace(key, value, (Transcoder)_transcoder, timeToLive, policy); } + private int getLatchCount(EVCacheClient[] clients) { + if (clientWriteToAllReplicas.get()) { + return clients.length - _pool.getWriteOnlyEVCacheClients().length; + } else { + return clients.length; + } + } + @Override public EVCacheLatch replace(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { @@ -2363,7 +2426,7 @@ public EVCacheLatch replace(String key, T value, Transcoder tc, int timeT checkTTL(timeToLive, Call.REPLACE); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.REPLACE); if (throwExc) throw new EVCacheException("Could not find a client to set the data"); @@ -2390,7 +2453,7 @@ public EVCacheLatch replace(String key, T value, Transcoder tc, int timeT final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); String status = EVCacheMetricsFactory.SUCCESS; - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,getLatchCount(clients) , _appName); try { final EVCacheFuture[] futures = new EVCacheFuture[clients.length]; CachedData cd = null; @@ -2456,7 +2519,7 @@ public EVCacheLatch appendOrAdd(String key, T value, Transcoder tc, int t checkTTL(timeToLive, Call.APPEND_OR_ADD); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND_OR_ADD); if (throwExc) throw new EVCacheException("Could not find a client to appendOrAdd the data"); @@ -2481,7 +2544,7 @@ public EVCacheLatch appendOrAdd(String key, T value, Transcoder tc, int t startEvent(event); } final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName); String status = EVCacheMetricsFactory.SUCCESS; try { CachedData cd = null; @@ -2560,8 +2623,8 @@ public boolean add(String key, T value, Transcoder tc, int timeToLive) th @Override public EVCacheLatch add(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { - EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); - return this.add(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length); + final EVCacheClient[] clients = getClientsForWrite(); + return this.add(key, value, tc, timeToLive, policy, clients, getLatchCount(clients)); } protected EVCacheLatch add(String key, T value, Transcoder tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException {