Skip to content

Commit

Permalink
Changed isValid to isAvailable for write operations. Added trace so w…
Browse files Browse the repository at this point in the history
…e can track which call path causes timeout
  • Loading branch information
smadappa committed Jun 14, 2016
1 parent 0265fe3 commit b3a6c49
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.tag.BasicTag;
import com.netflix.servo.tag.Tag;

import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
Expand Down Expand Up @@ -118,15 +120,16 @@ public ConnectionFactory getConnectionFactory() {
this.evcacheMemcachedClient.addObserver(connectionObserver);
}

private Collection<String> validateReadQueueSize(Collection<String> canonicalKeys) {
private Collection<String> validateReadQueueSize(Collection<String> canonicalKeys) throws EVCacheException {
if (evcacheMemcachedClient.getNodeLocator() == null) return canonicalKeys;
final Collection<String> retKeys = new ArrayList<>(canonicalKeys.size());
for (String key : canonicalKeys) {
final MemcachedNode node = evcacheMemcachedClient.getNodeLocator().getPrimary(key);
if (!node.isActive()) continue;

if (node instanceof EVCacheNodeImpl) {
final int size = ((EVCacheNodeImpl) node).getReadQueueSize();
final EVCacheNodeImpl evcNode = (EVCacheNodeImpl) node;
if (!evcNode.isAvailable()) continue;

final int size = evcNode.getReadQueueSize();
final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2);
// if (log.isDebugEnabled()) log.debug("Bulk Current Read Queue
// Size - " + size + " for app " + appName + " & zone " + zone +
Expand All @@ -144,43 +147,54 @@ private Collection<String> validateReadQueueSize(Collection<String> canonicalKey
return retKeys;
}

private void ensureWriteQueueSize(MemcachedNode node, String key) throws Exception {
private boolean ensureWriteQueueSize(MemcachedNode node, String key) throws EVCacheException {
if (node instanceof EVCacheNodeImpl) {
final EVCacheNodeImpl evcNode = (EVCacheNodeImpl) node;
if (!evcNode.isAvailable()) {
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE");
pool.refreshAsync(evcNode);
}

long startTime = operationTimeout.get();
while (true) {
// if (!evcNode.isActive()) {
// EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE");
// if (log.isDebugEnabled()) log.debug("Node : " + evcNode + " for app : " + appName + "; zone : "
// + zone
// + " is not active. Will Fail Fast and the write will be dropped for key : " + key);
// break;
// }
final int size = evcNode.getWriteQueueSize();
final boolean canAddToOpQueue = size < maxWriteQueueSize;
if (log.isDebugEnabled()) log.debug("App : " + appName + "; zone : " + zone + "; key : " + key
+ "; WriteQSize : " + size);
if (canAddToOpQueue) break;
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-WRITE_BLOCK");
Thread.sleep(100);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new EVCacheException("Thread was Interrupted", e);
}
if(startTime > 0) {
startTime -= 100;
} else {
Tag tag = new BasicTag("HOST", evcNode.getHostName());
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), "EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE", tag).increment();
if (log.isDebugEnabled()) log.debug("Node : " + evcNode + " for app : " + appName + "; zone : "
+ zone + " is not active. Will Fail Fast and the write will be dropped for key : " + key);
return false;
}
}
}
return true;
}

private boolean validateNode(String key, boolean _throwException) throws EVCacheException {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
// First check if the node is active
if (!node.isActive()) {
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE");
if (log.isDebugEnabled()) log.debug("Node : " + node + " for app : " + appName + "; zone : " + zone
+ " is not active. Will Fail Fast so that we can fallback to Other Zone if available.");
if (_throwException) throw new EVCacheException("Connection for Node : " + node + " for app : " + appName
+ "; zone : " + zone + " is not active");
return false;
if (node instanceof EVCacheNodeImpl) {
final EVCacheNodeImpl evcNode = (EVCacheNodeImpl) node;
if (!evcNode.isAvailable()) {
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE");
if (log.isDebugEnabled()) log.debug("Node : " + node + " for app : " + appName + "; zone : " + zone
+ " is not active. Will Fail Fast so that we can fallback to Other Zone if available.");
if (_throwException) throw new EVCacheException("Connection for Node : " + node + " for app : " + appName
+ "; zone : " + zone + " is not active");
return false;
}
}

// now check to see if the read queue is full.
Expand Down Expand Up @@ -832,8 +846,8 @@ public <T> Map<String, T> getBulk(Collection<String> _canonicalKeys, Transcoder<

public <T> Single<Map<String, T>> getBulk(Collection<String> _canonicalKeys, Transcoder<T> tc, boolean _throwException,
boolean hasZF, Scheduler scheduler) {
final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys);
try {
final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys);
if (tc == null) tc = (Transcoder<T>) getTranscoder();
if (enableChunking.get()) {
return assembleChunks(_canonicalKeys, tc, hasZF, scheduler);
Expand All @@ -850,7 +864,7 @@ public <T> Future<Boolean> append(String key, T value) throws Exception {
if (enableChunking.get()) throw new EVCacheException(
"This operation is not supported as chunking is enabled on this EVCacheClient.");
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) return getDefaultFuture();
if (!ensureWriteQueueSize(node, key)) return getDefaultFuture();
return evcacheMemcachedClient.append(key, value);
}

Expand All @@ -860,7 +874,7 @@ public <T> Future<Boolean> set(String key, T value, int timeToLive) throws Excep

public <T> Future<Boolean> set(String key, T value, int timeToLive, EVCacheLatch evcacheLatch) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) {
if (!ensureWriteQueueSize(node, key)) {
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the write event.");
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch)
Expand All @@ -869,7 +883,6 @@ public <T> Future<Boolean> set(String key, T value, int timeToLive, EVCacheLatch
}

try {
ensureWriteQueueSize(node, key);
int dataSize = Integer.MAX_VALUE;
if (value instanceof CachedData) {
dataSize = ((CachedData) value).getData().length;
Expand Down Expand Up @@ -905,15 +918,14 @@ public <T> Future<Boolean> set(String key, T value, int timeToLive, EVCacheLatch

public <T> Future<Boolean> appendOrAdd(String key, CachedData value, int timeToLive, EVCacheLatch evcacheLatch) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) {
if (!ensureWriteQueueSize(node, key)) {
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the write event.");
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch).addFuture(defaultFuture);
return defaultFuture;
}

try {
ensureWriteQueueSize(node, key);
return evcacheMemcachedClient.asyncAppendOrAdd(key, timeToLive, value, evcacheLatch);
} catch (Exception e) {
log.error(e.getMessage(), e);
Expand All @@ -924,7 +936,7 @@ public <T> Future<Boolean> appendOrAdd(String key, CachedData value, int timeToL
public <T> Future<Boolean> replace(String key, T value, int timeToLive, EVCacheLatch evcacheLatch)
throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) {
if (!ensureWriteQueueSize(node, key)) {
if (log.isInfoEnabled()) log.info("Node : " + node
+ " is not active. Failing fast and dropping the replace event.");
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
Expand All @@ -934,7 +946,6 @@ public <T> Future<Boolean> replace(String key, T value, int timeToLive, EVCacheL
}

try {
ensureWriteQueueSize(node, key);
int dataSize = Integer.MAX_VALUE;
if (value instanceof CachedData) {
dataSize = ((CachedData) value).getData().length;
Expand Down Expand Up @@ -989,9 +1000,8 @@ public <T> Future<Boolean> add(String key, int exp, T value) throws Exception {
if (addCounter == null) addCounter = EVCacheMetricsFactory.getCounter(serverGroup.getName() + "-AddCall");

final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
// if (!node.isActive()) return getDefaultFuture();
if (!ensureWriteQueueSize(node, key)) return getDefaultFuture();

ensureWriteQueueSize(node, key);
addCounter.increment();
return evcacheMemcachedClient.add(key, exp, value, null);
}
Expand All @@ -1001,17 +1011,16 @@ public <T> Future<Boolean> add(String key, int exp, T value, Transcoder<T> tc) t
if (addCounter == null) addCounter = EVCacheMetricsFactory.getCounter(serverGroup.getName() + "-AddCall");

final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
// if (!node.isActive()) return getDefaultFuture();
if (!ensureWriteQueueSize(node, key)) return getDefaultFuture();

ensureWriteQueueSize(node, key);
addCounter.increment();
return evcacheMemcachedClient.add(key, exp, value, tc);
}


public <T> Future<Boolean> touch(String key, int timeToLive) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) return getDefaultFuture();
if (!ensureWriteQueueSize(node, key)) return getDefaultFuture();
if (enableChunking.get()) {
final ChunkDetails<?> cd = getChunkDetails(key);
if (cd.isChunked()) {
Expand Down Expand Up @@ -1047,13 +1056,12 @@ public Future<Boolean> delete(String key) throws Exception {

public Future<Boolean> delete(String key, EVCacheLatch latch) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) {
if (!ensureWriteQueueSize(node, key)) {
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
if (latch != null && latch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) latch).addFuture(defaultFuture);
return defaultFuture;
}

ensureWriteQueueSize(node, key);
if (enableChunking.get()) {
final ChunkDetails<?> cd = getChunkDetails(key);
if (cd == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void connectionEstablished(SocketAddress sa, int reconnectCount) {
if (log.isDebugEnabled()) log.debug(appName + ":CONNECTION ESTABLISHED : From " + instanceInfo.getHostName()
+ " to " + address + " was established after " + reconnectCount + " retries");
}
if(log.isTraceEnabled()) log.trace("Stack", new Exception());
connect.increment();
connectCount++;
}
Expand All @@ -87,6 +88,7 @@ public void connectionLost(SocketAddress sa) {
if (log.isDebugEnabled()) log.debug(appName + ":CONNECTION LOST : From " + instanceInfo.getHostName()
+ " to " + address);
}
if(log.isTraceEnabled()) log.trace("Stack", new Exception());
lost.increment();
lostCount++;
}
Expand Down

0 comments on commit b3a6c49

Please sign in to comment.