Skip to content

Commit

Permalink
Removed StopWatch from EVCacheOperationFuture
Browse files Browse the repository at this point in the history
Removed reconnect call from EVCacheClient
Incrementing EVCacheClientPool-REFRESH_ON_QUEUE_FULL metric when we refresh the connection to node due to read queue full
Fixed getDynamicLongProperty signature
Updated EVCacheMemcachedClient to handle incr and decr operation
Added operations for all operations supported by EVCacheMemcachedClient
EVCacheNode tracks connect time and the number of times it reconnects
  • Loading branch information
smadappa committed Dec 2, 2016
1 parent e2dceeb commit 1fdd4cb
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1307,18 +1307,15 @@ public long incr(String key, long by, long defaultVal, int timeToLive) throws EV
}

if (currentValue != -1) {
if (log.isDebugEnabled()) log.debug("INCR : APP " + _appName + " current value = " + currentValue
+ " for key : " + key);
if (log.isDebugEnabled()) log.debug("INCR : APP " + _appName + " current value = " + currentValue + " for key : " + key);
for (int i = 0; i < vals.length; i++) {
if (vals[i] == -1 && currentValue > -1) {
if (log.isDebugEnabled()) log.debug("INCR : APP " + _appName + "; Zone " + clients[i].getZone()
+ " had a value = -1 so setting it to current value = "
+ currentValue + " for key : " + key);
+ " had a value = -1 so setting it to current value = " + currentValue + " for key : " + key);
clients[i].incr(canonicalKey, 0, currentValue, timeToLive);
} else if (vals[i] != currentValue) {
if (log.isDebugEnabled()) log.debug("INCR : APP " + _appName + "; Zone " + clients[i].getZone()
+ " had a value of " + vals[i]
+ " so setting it to current value = " + currentValue + " for key : " + key);
+ " had a value of " + vals[i] + " so setting it to current value = " + currentValue + " for key : " + key);
clients[i].set(canonicalKey, String.valueOf(currentValue), timeToLive);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.monitor.Stopwatch;
import com.sun.management.GcInfo;

import net.spy.memcached.MemcachedConnection;
Expand All @@ -28,6 +27,7 @@
import net.spy.memcached.ops.Operation;
import rx.Scheduler;
import rx.Single;
import rx.functions.Action0;

/**
* Managed future for operations.
Expand Down Expand Up @@ -56,11 +56,6 @@ public class EVCacheOperationFuture<T> extends OperationFuture<T> {
private final String appName;
private final ServerGroup serverGroup;
private final String key;
private final String metricName;

public EVCacheOperationFuture(String k, CountDownLatch l, AtomicReference<T> oref, long opTimeout, ExecutorService service, String appName, ServerGroup serverGroup) {
this(k, l, oref, opTimeout, service, appName, serverGroup, null);
}

public EVCacheOperationFuture(String k, CountDownLatch l, AtomicReference<T> oref, long opTimeout, ExecutorService service, String appName, ServerGroup serverGroup, String metricName) {
super(k, l, oref, opTimeout, service);
Expand All @@ -69,7 +64,6 @@ public EVCacheOperationFuture(String k, CountDownLatch l, AtomicReference<T> ore
this.appName = appName;
this.serverGroup = serverGroup;
this.key = k;
this.metricName = metricName;
}

public Operation getOperation() {
Expand Down Expand Up @@ -133,7 +127,6 @@ public EVCacheOperationFuture<T> removeListener(EVCacheGetOperationListener<T> l
*/
public T get(long duration, TimeUnit units, boolean throwException, boolean hasZF) throws InterruptedException, TimeoutException, ExecutionException {
final long startTime = System.currentTimeMillis();
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, (metricName == null) ? "GetOperation" : metricName).start();
boolean status = latch.await(duration, units);
if (!status) {
boolean gcPause = false;
Expand Down Expand Up @@ -209,7 +202,6 @@ public T get(long duration, TimeUnit units, boolean throwException, boolean hasZ
throw new ExecutionException(new CheckedOperationTimeoutException("Operation timed out.", op));
}
}
operationDuration.stop();
return objRef.get();
}

Expand All @@ -226,7 +218,6 @@ public Single<T> observe() {
}

public Single<T> get(long duration, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) {
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, (metricName == null) ? "LatencyGet" : metricName).start();;
return observe().timeout(duration, units, Single.create(subscriber -> {
// whenever timeout occurs, continuous timeout counter will increase by 1.
MemcachedConnection.opTimedOut(op);
Expand All @@ -240,8 +231,12 @@ public Single<T> get(long duration, TimeUnit units, boolean throwException, bool
}
subscriber.onSuccess(objRef.get());
}
}), scheduler).doAfterTerminate(() ->
operationDuration.stop()
}), scheduler).doAfterTerminate(new Action0() {
@Override
public void call() {

}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ private Collection<String> validateReadQueueSize(Collection<String> canonicalKey
if (node instanceof EVCacheNodeImpl) {
final EVCacheNodeImpl evcNode = (EVCacheNodeImpl) node;
if (!evcNode.isAvailable()) {
evcacheMemcachedClient.reconnect(evcNode);
continue;
}

Expand All @@ -144,7 +143,6 @@ private Collection<String> validateReadQueueSize(Collection<String> canonicalKey
EVCacheMetricsFactory.getCounter(appName + "-READ_QUEUE_FULL", evcNode.getBaseTags()).increment();
if (log.isDebugEnabled()) log.debug("Read Queue Full on Bulk Operation for app : " + appName
+ "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get() * 2);
evcacheMemcachedClient.reconnect(evcNode);
} else {
retKeys.add(key);
}
Expand All @@ -159,7 +157,6 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key) throws EVCa
if (!evcNode.isAvailable()) {
EVCacheMetricsFactory.getCounter("EVCacheClient-" + appName + "-INACTIVE_NODE", evcNode.getBaseTags()).increment();
pool.refreshAsync(evcNode);
evcacheMemcachedClient.reconnect(evcNode);
}

int i = 0;
Expand All @@ -181,7 +178,6 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key) throws EVCa
if (log.isDebugEnabled()) log.debug("Node : " + evcNode + " for app : " + appName + "; zone : "
+ zone + " is not active. Will Fail Fast and the write will be dropped for key : " + key);
evcNode.shutdown();
evcacheMemcachedClient.reconnect(evcNode);
return false;
}
}
Expand All @@ -200,7 +196,6 @@ private boolean validateNode(String key, boolean _throwException) throws EVCache
+ " is not active. Will Fail Fast so that we can fallback to Other Zone if available.");
if (_throwException) throw new EVCacheException("Connection for Node : " + node + " for app : " + appName
+ "; zone : " + zone + " is not active");
evcacheMemcachedClient.reconnect(evcNode);
return false;
}

Expand All @@ -214,7 +209,6 @@ private boolean validateNode(String key, boolean _throwException) throws EVCache
+ "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get());
if (_throwException) throw new EVCacheReadQueueException("Read Queue Full for Node : " + node + "; app : "
+ appName + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get());
evcacheMemcachedClient.reconnect(evcNode);
return false;
}
}
Expand Down Expand Up @@ -809,14 +803,13 @@ public <T> T getAndTouch(String key, Transcoder<T> tc, int timeToLive, boolean _
if (enableChunking.get()) {
return assembleChunks(key, false, 0, tc, hasZF);
} else {
if(ignoreTouch.get()) {
returnVal = evcacheMemcachedClient.get(key, tc);
} else {
final CASValue<T> value = evcacheMemcachedClient.asyncGetAndTouch(key, timeToLive, tc)
.get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF);
returnVal = (value == null) ? null : value.getValue();
}

if(ignoreTouch.get()) {
returnVal = evcacheMemcachedClient.get(key, tc);
} else {
final CASValue<T> value = evcacheMemcachedClient.asyncGetAndTouch(key, timeToLive, tc)
.get(readTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF);
returnVal = (value == null) ? null : value.getValue();
}
}
return returnVal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.netflix.evcache.util.ServerGroupCircularIterator;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.tag.TagList;
import com.netflix.servo.tag.BasicTag;

import net.spy.memcached.MemcachedNode;
import net.spy.memcached.protocol.binary.EVCacheNodeImpl;
Expand Down Expand Up @@ -441,8 +440,7 @@ protected boolean haveInstancesInServerGroupChanged(ServerGroup serverGroup, Set
// 12/5/2015 - Should we even do this anymore
for (Entry<InetSocketAddress, Long> entry : connectionObserver.getInActiveServers().entrySet()) {
if ((currentTime - entry.getValue().longValue()) > 1200000 && !discoveredHostsInServerGroup.contains(entry.getKey())) {
if (log.isDebugEnabled()) log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup
+ "; instance : " + entry.getKey()
if (log.isDebugEnabled()) log.debug("AppName :" + _appName + "; ServerGroup : " + serverGroup + "; instance : " + entry.getKey()
+ " not found in discovery and will shutdown the client and init it again.");
EVCacheMetricsFactory.getLongGauge("EVCacheClientPool-haveInstancesInServerGroupChanged", tags).set(Long.valueOf(2));
return true;
Expand Down Expand Up @@ -795,19 +793,28 @@ private void updateQueueStats() {
final int rSize = client.getReadQueueLength();
EVCacheMetricsFactory.getLongGauge("EVCacheClientPool-ReadQueueSize", client.getTagList()).set(Long.valueOf(rSize));
if(refreshConnectionOnReadQueueFull.get()) {
if(rSize > refreshConnectionOnReadQueueFullSize.get().intValue()) {
try {
EVCacheMetricsFactory.getCounter(_appName , null, serverGroup.getName(), "EVCacheClientPool-REFRESH_ON_QUEUE_FULL", new BasicTag("Id", String.valueOf(client.getId()))).increment();
refresh();
} catch (IOException e) {
log.error("Exception while refreshing queue", e);
final Collection<MemcachedNode> allNodes = client.getNodeLocator().getAll();
for (MemcachedNode node : allNodes) {
if (node instanceof EVCacheNodeImpl) {
final EVCacheNodeImpl evcNode = ((EVCacheNodeImpl) node);
if(evcNode.getReadQueueSize() >= refreshConnectionOnReadQueueFullSize.get().intValue()) {
EVCacheMetricsFactory.getCounter("EVCacheClientPool-REFRESH_ON_QUEUE_FULL", evcNode.getBaseTags()).increment();
client.getEVCacheMemcachedClient().reconnectNode(evcNode);
}
}
}
// if(rSize > refreshConnectionOnReadQueueFullSize.get().intValue()) {
// try {
// EVCacheMetricsFactory.getCounter(_appName , null, serverGroup.getName(), "EVCacheClientPool-REFRESH_ON_QUEUE_FULL", new BasicTag("Id", String.valueOf(client.getId()))).increment();
// refresh();
// } catch (IOException e) {
// log.error("Exception while refreshing queue", e);
// }
// }
}
}
}
}


public void pingServers() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@
import com.netflix.config.DynamicStringProperty;
import com.netflix.config.Property;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.tag.Tag;
import com.netflix.servo.tag.TagList;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;

public class EVCacheConfig {

Expand All @@ -41,7 +37,7 @@ public DynamicIntProperty getDynamicIntProperty(String name, int defaultValue) {
return prop;
}

public DynamicLongProperty getDynamicLongProperty(String name, int defaultValue) {
public DynamicLongProperty getDynamicLongProperty(String name, long defaultValue) {
DynamicLongProperty prop = (DynamicLongProperty) fastPropMap.get(name);
if (prop != null) return prop;

Expand Down
Loading

0 comments on commit 1fdd4cb

Please sign in to comment.