Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router client changes for read retries #128

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
router write changes. No writing to all replicas (#129)
Co-authored-by: Sriram Rangarajan <srrangarajan@netflix.com>
  • Loading branch information
sriram-rangarajan and srrangarajan authored Jul 13, 2022
commit e117ebe35dc7c35d476e0d4fb2e1a5a756e34700
57 changes: 41 additions & 16 deletions evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java
Original file line number Diff line number Diff line change
@@ -96,6 +96,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
private final EVCacheTranscoder evcacheValueTranscoder;
private final Property<Integer> maxReadDuration, maxWriteDuration;
private final Property<Boolean> clientReadRetry;
private final Property<Boolean> clientWriteToAllReplicas;

protected final EVCacheClientPoolManager _poolManager;
private final Map<String, Timer> timerMap = new ConcurrentHashMap<String, Timer>();
@@ -171,6 +172,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
});

this.clientReadRetry = propertyRepository.get(appName + ".router.client.read.shouldRetry", Boolean.class).orElse(true);
this.clientWriteToAllReplicas = propertyRepository.get(appName + ".router.client.write.shouldWriteToAllReplicas", Boolean.class).orElse(true);

_pool.pingServers();

@@ -1377,7 +1379,7 @@ public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws
checkTTL(timeToLive, Call.TOUCH);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.TOUCH);
if (throwExc) throw new EVCacheException("Could not find a client to set the data");
@@ -1405,7 +1407,7 @@ public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws
String status = EVCacheMetricsFactory.SUCCESS;
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
try {
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName);
touchData(evcKey, timeToLive, clients, latch);

if (event != null) {
@@ -1436,7 +1438,7 @@ public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws
}

private void touchData(EVCacheKey evcKey, int timeToLive) throws Exception {
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
touchData(evcKey, timeToLive, clients);
}

@@ -1925,8 +1927,8 @@ public <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, EVCacheLatch.
}

public <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy) throws EVCacheException {
EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
return this.set(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length);
final EVCacheClient[] clients = getClientsForWrite();
return this.set(key, value, tc, timeToLive, policy, clients, getLatchCount(clients));
}

protected <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException {
@@ -2023,7 +2025,7 @@ public <T> EVCacheFuture[] append(String key, T value, Transcoder<T> tc, int tim
checkTTL(timeToLive, Call.APPEND);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND);
if (throwExc) throw new EVCacheException("Could not find a client to set the data");
@@ -2143,7 +2145,7 @@ protected <T> EVCacheLatch deleteInternal(String key, Policy policy, boolean isO
if (key == null) throw new IllegalArgumentException("Key cannot be null");

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DELETE);
if (throwExc) throw new EVCacheException("Could not find a client to delete the keyAPP " + _appName
@@ -2171,7 +2173,7 @@ protected <T> EVCacheLatch deleteInternal(String key, Policy policy, boolean isO

String status = EVCacheMetricsFactory.SUCCESS;
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName);
try {
for (int i = 0; i < clients.length; i++) {
Future<Boolean> future = clients[i].delete(isOriginalKeyHashed ? evcKey.getKey() : evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxDigestBytes(), clients[i].getMaxHashLength(), clients[i].getBaseEncoder()), latch);
@@ -2210,12 +2212,27 @@ public int getDefaultTTL() {
return _timeToLive;
}

private EVCacheClient[] getClientsForWrite() {
EVCacheClient[] clients;
if (clientWriteToAllReplicas.get()) {
clients = _pool.getEVCacheClientForWrite();
} else {
EVCacheClient client = _pool.getEVCacheClientForRead();
if (client == null) {
clients = new EVCacheClient[]{};
} else {
clients = new EVCacheClient[]{client};
}
}
return clients;
}

public long incr(String key, long by, long defaultVal, int timeToLive) throws EVCacheException {
if ((null == key) || by < 0 || defaultVal < 0 || timeToLive < 0) throw new IllegalArgumentException();
checkTTL(timeToLive, Call.INCR);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.INCR);
if (log.isDebugEnabled() && shouldLog()) log.debug("INCR : " + _metricPrefix + ":NULL_CLIENT");
@@ -2299,7 +2316,7 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV
checkTTL(timeToLive, Call.DECR);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DECR);
if (log.isDebugEnabled() && shouldLog()) log.debug("DECR : " + _metricPrefix + ":NULL_CLIENT");
@@ -2394,14 +2411,22 @@ public <T> EVCacheLatch replace(String key, T value, int timeToLive, Policy pol
return replace(key, value, (Transcoder<T>)_transcoder, timeToLive, policy);
}

private int getLatchCount(EVCacheClient[] clients) {
if (clientWriteToAllReplicas.get()) {
return clients.length - _pool.getWriteOnlyEVCacheClients().length;
} else {
return clients.length;
}
}

@Override
public <T> EVCacheLatch replace(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy)
throws EVCacheException {
if ((null == key) || (null == value)) throw new IllegalArgumentException();
checkTTL(timeToLive, Call.REPLACE);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.REPLACE);
if (throwExc) throw new EVCacheException("Could not find a client to set the data");
@@ -2428,7 +2453,7 @@ public <T> EVCacheLatch replace(String key, T value, Transcoder<T> tc, int timeT

final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
String status = EVCacheMetricsFactory.SUCCESS;
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,getLatchCount(clients) , _appName);
try {
final EVCacheFuture[] futures = new EVCacheFuture[clients.length];
CachedData cd = null;
@@ -2494,7 +2519,7 @@ public <T> EVCacheLatch appendOrAdd(String key, T value, Transcoder<T> tc, int t
checkTTL(timeToLive, Call.APPEND_OR_ADD);

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
final EVCacheClient[] clients = getClientsForWrite();
if (clients.length == 0) {
incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND_OR_ADD);
if (throwExc) throw new EVCacheException("Could not find a client to appendOrAdd the data");
@@ -2519,7 +2544,7 @@ public <T> EVCacheLatch appendOrAdd(String key, T value, Transcoder<T> tc, int t
startEvent(event);
}
final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName);
String status = EVCacheMetricsFactory.SUCCESS;
try {
CachedData cd = null;
@@ -2598,8 +2623,8 @@ public <T> boolean add(String key, T value, Transcoder<T> tc, int timeToLive) th

@Override
public <T> EVCacheLatch add(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy) throws EVCacheException {
EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
return this.add(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length);
final EVCacheClient[] clients = getClientsForWrite();
return this.add(key, value, tc, timeToLive, policy, clients, getLatchCount(clients));
}

protected <T> EVCacheLatch add(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException {