Skip to content

Commit

Permalink
Merge branch 'router_read_changes' of github.com:Netflix/EVCache into…
Browse files Browse the repository at this point in the history
… router_read_changes
  • Loading branch information
srrangarajan committed Jul 13, 2022
2 parents b5d238d + e117ebe commit cedf50e
Showing 1 changed file with 41 additions and 16 deletions.
57 changes: 41 additions & 16 deletions evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
private final EVCacheTranscoder evcacheValueTranscoder;
private final Property<Integer> maxReadDuration, maxWriteDuration;
private final Property<Boolean> clientReadRetry;
private final Property<Boolean> clientWriteToAllReplicas;

protected final EVCacheClientPoolManager _poolManager;
private final Map<String, Timer> timerMap = new ConcurrentHashMap<String, Timer>();
Expand Down Expand Up @@ -171,6 +172,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
});

this.clientReadRetry = propertyRepository.get(appName + ".router.client.read.shouldRetry", Boolean.class).orElse(true);
this.clientWriteToAllReplicas = propertyRepository.get(appName + ".router.client.write.shouldWriteToAllReplicas", Boolean.class).orElse(true);

_pool.pingServers();

Expand Down Expand Up @@ -1377,7 +1379,7 @@ public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws
checkTTL(timeToLive, Call.TOUCH);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.TOUCH);
if (throwExc) throw new EVCacheException("Could not find a client to set the data");
Expand Down Expand Up @@ -1405,7 +1407,7 @@ public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws
String status = EVCacheMetricsFactory.SUCCESS;
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
try {
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName);
touchData(evcKey, timeToLive, clients, latch);

if (event != null) {
Expand Down Expand Up @@ -1436,7 +1438,7 @@ public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws
}

private void touchData(EVCacheKey evcKey, int timeToLive) throws Exception {
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
touchData(evcKey, timeToLive, clients);
}

Expand Down Expand Up @@ -1925,8 +1927,8 @@ public <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, EVCacheLatch.
}

public <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy) throws EVCacheException {
EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
return this.set(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length);
final EVCacheClient[] clients = getClientsForWrite();
return this.set(key, value, tc, timeToLive, policy, clients, getLatchCount(clients));
}

protected <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException {
Expand Down Expand Up @@ -2023,7 +2025,7 @@ public <T> EVCacheFuture[] append(String key, T value, Transcoder<T> tc, int tim
checkTTL(timeToLive, Call.APPEND);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND);
if (throwExc) throw new EVCacheException("Could not find a client to set the data");
Expand Down Expand Up @@ -2143,7 +2145,7 @@ protected <T> EVCacheLatch deleteInternal(String key, Policy policy, boolean isO
if (key == null) throw new IllegalArgumentException("Key cannot be null");

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DELETE);
if (throwExc) throw new EVCacheException("Could not find a client to delete the keyAPP " + _appName
Expand Down Expand Up @@ -2171,7 +2173,7 @@ protected <T> EVCacheLatch deleteInternal(String key, Policy policy, boolean isO

String status = EVCacheMetricsFactory.SUCCESS;
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName);
try {
for (int i = 0; i < clients.length; i++) {
Future<Boolean> future = clients[i].delete(isOriginalKeyHashed ? evcKey.getKey() : evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxDigestBytes(), clients[i].getMaxHashLength(), clients[i].getBaseEncoder()), latch);
Expand Down Expand Up @@ -2210,12 +2212,27 @@ public int getDefaultTTL() {
return _timeToLive;
}

private EVCacheClient[] getClientsForWrite() {
EVCacheClient[] clients;
if (clientWriteToAllReplicas.get()) {
clients = _pool.getEVCacheClientForWrite();
} else {
EVCacheClient client = _pool.getEVCacheClientForRead();
if (client == null) {
clients = new EVCacheClient[]{};
} else {
clients = new EVCacheClient[]{client};
}
}
return clients;
}

public long incr(String key, long by, long defaultVal, int timeToLive) throws EVCacheException {
if ((null == key) || by < 0 || defaultVal < 0 || timeToLive < 0) throw new IllegalArgumentException();
checkTTL(timeToLive, Call.INCR);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.INCR);
if (log.isDebugEnabled() && shouldLog()) log.debug("INCR : " + _metricPrefix + ":NULL_CLIENT");
Expand Down Expand Up @@ -2299,7 +2316,7 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV
checkTTL(timeToLive, Call.DECR);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DECR);
if (log.isDebugEnabled() && shouldLog()) log.debug("DECR : " + _metricPrefix + ":NULL_CLIENT");
Expand Down Expand Up @@ -2394,14 +2411,22 @@ public <T> EVCacheLatch replace(String key, T value, int timeToLive, Policy pol
return replace(key, value, (Transcoder<T>)_transcoder, timeToLive, policy);
}

private int getLatchCount(EVCacheClient[] clients) {
if (clientWriteToAllReplicas.get()) {
return clients.length - _pool.getWriteOnlyEVCacheClients().length;
} else {
return clients.length;
}
}

@Override
public <T> EVCacheLatch replace(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy)
throws EVCacheException {
if ((null == key) || (null == value)) throw new IllegalArgumentException();
checkTTL(timeToLive, Call.REPLACE);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.REPLACE);
if (throwExc) throw new EVCacheException("Could not find a client to set the data");
Expand All @@ -2428,7 +2453,7 @@ public <T> EVCacheLatch replace(String key, T value, Transcoder<T> tc, int timeT

final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
String status = EVCacheMetricsFactory.SUCCESS;
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,getLatchCount(clients) , _appName);
try {
final EVCacheFuture[] futures = new EVCacheFuture[clients.length];
CachedData cd = null;
Expand Down Expand Up @@ -2494,7 +2519,7 @@ public <T> EVCacheLatch appendOrAdd(String key, T value, Transcoder<T> tc, int t
checkTTL(timeToLive, Call.APPEND_OR_ADD);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND_OR_ADD);
if (throwExc) throw new EVCacheException("Could not find a client to appendOrAdd the data");
Expand All @@ -2519,7 +2544,7 @@ public <T> EVCacheLatch appendOrAdd(String key, T value, Transcoder<T> tc, int t
startEvent(event);
}
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName);
String status = EVCacheMetricsFactory.SUCCESS;
try {
CachedData cd = null;
Expand Down Expand Up @@ -2598,8 +2623,8 @@ public <T> boolean add(String key, T value, Transcoder<T> tc, int timeToLive) th

@Override
public <T> EVCacheLatch add(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy) throws EVCacheException {
EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
return this.add(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length);
final EVCacheClient[] clients = getClientsForWrite();
return this.add(key, value, tc, timeToLive, policy, clients, getLatchCount(clients));
}

protected <T> EVCacheLatch add(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException {
Expand Down

0 comments on commit cedf50e

Please sign in to comment.