Skip to content

Commit

Permalink
support for appendOrAdd using latch
Browse files Browse the repository at this point in the history
  • Loading branch information
smadappa committed May 12, 2016
1 parent eea2e76 commit 36b2ff7
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 0 deletions.
33 changes: 33 additions & 0 deletions evcache-client/src/main/java/com/netflix/evcache/EVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.evcache.EVCacheLatch.Policy;
import com.netflix.evcache.pool.EVCacheClientPoolManager;

import net.spy.memcached.transcoders.Transcoder;
Expand Down Expand Up @@ -844,6 +845,38 @@ <T> Map<String, T> getBulkAndTouch(Collection<String> keys, Transcoder<T> tc, in
*/
<T> Future<Boolean>[] appendOrAdd(String key, T value, Transcoder<T> tc, int timeToLive) throws EVCacheException;


/**
* Append the given value to the existing value in EVCache. If the Key does not exist the the key will added.
*
*
* @param key
* the key under which this object should be appended or Added. Ensure the
* key is properly encoded and does not contain whitespace or
* control characters.
* @param T
* the value to be appended
* @param tc
* the transcoder the will be used for serialization
* @param timeToLive
* the expiration of this object i.e. less than 30 days in
* seconds or the exact expiry time as UNIX time
*
* @param policy
* The Latch will be returned based on the Policy. The Latch can then be used to await until the count down has reached to 0 or the specified time has elapsed.
*
* @return EVCacheLatch which will encompasses the Operation. You can block
* on the Operation based on the policy to ensure the required
* criteria is met. The Latch can also be queried to get details on
* status of the operations
*
* @throws EVCacheException
* in the circumstance where queue is too full to accept any
* more requests or issues Serializing the value or any IO
* Related issues
*/
<T> EVCacheLatch appendOrAdd(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy) throws EVCacheException;

/**
* The {@code appName} that will be used by this {@code EVCache}.
*
Expand Down
64 changes: 64 additions & 0 deletions evcache-client/src/main/java/com/netflix/evcache/EVCacheImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,70 @@ public String getAppName() {
public String getCacheName() {
return _cacheName;
}

public <T> EVCacheLatch appendOrAdd(String key, T value, Transcoder<T> tc, int timeToLive, Policy policy) throws EVCacheException {
if ((null == key) || (null == value)) throw new IllegalArgumentException();

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
if (clients.length == 0) {
increment("NULL_CLIENT");
if (throwExc) throw new EVCacheException("Could not find a client to appendOrAdd the data");
return new EVCacheLatchImpl(policy, 0, _appName); // Fast failure
}

final EVCacheEvent event = createEVCacheEvent(Arrays.asList(clients), Collections.singletonList(key), Call.APPEND_OR_ADD);
if (event != null) {
if (shouldThrottle(event)) {
increment("THROTTLED");
if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + key);
return new EVCacheLatchImpl(policy, 0, _appName); // Fast failure
}
startEvent(event);
}

final String canonicalKey = getCanonicalizedKey(key);
final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.APPEND_OR_ADD, stats, Operation.TYPE.MILLI);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
clients.length, _appName);
try {
CachedData cd = null;
for (EVCacheClient client : clients) {
if (cd == null) {
if (tc != null) {
cd = tc.encode(value);
} else if ( _transcoder != null) {
cd = ((Transcoder<Object>)_transcoder).encode(value);
} else {
cd = client.getTranscoder().encode(value);
}
}
if (cd != null) {
if (appendDataSizeSummary == null) this.appendDataSizeSummary = EVCacheConfig.getInstance().getDistributionSummary(_appName + "-AppendData-Size");
if (appendDataSizeSummary != null) this.appendDataSizeSummary.record(cd.getData().length);
}
final Future<Boolean> future = client.appendOrAdd(canonicalKey, cd, timeToLive, latch);
if (log.isDebugEnabled() && shouldLog()) log.debug("APPEND_OR_ADD : APP " + _appName + ", Future " + future
+ " for key : " + canonicalKey);
}
if (event != null) {
event.setCanonicalKeys(Arrays.asList(canonicalKey));
event.setTTL(timeToLive);
event.setCachedData(cd);
event.setLatch(latch);
endEvent(event);
}
return latch;
} catch (Exception ex) {
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception while appendOrAdd the data for APP " + _appName + ", key : " + canonicalKey, ex);
if (event != null) eventError(event, ex);
if (!throwExc) return new EVCacheLatchImpl(policy, 0, _appName);
throw new EVCacheException("Exception while appendOrAdd data for APP " + _appName + ", key : " + canonicalKey, ex);
} finally {
op.stop();
if (log.isDebugEnabled() && shouldLog()) log.debug("APPEND_OR_ADD : APP " + _appName + ", Took " + op.getDuration() + " milliSec for key : " + canonicalKey);
}
}

@Override
public <T> Future<Boolean>[] appendOrAdd(String key, T value, Transcoder<T> tc, int timeToLive) throws EVCacheException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,25 @@ public <T> Future<Boolean> set(String key, T value, int timeToLive, EVCacheLatch
}
}

public <T> Future<Boolean> appendOrAdd(String key, CachedData value, int timeToLive, EVCacheLatch evcacheLatch) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) {
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the write event.");
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch)
.addFuture(defaultFuture);
return defaultFuture;
}

try {
ensureWriteQueueSize(node, key);
return evcacheMemcachedClient.asyncAppendOrAdd(key, timeToLive, value, evcacheLatch);
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}

public <T> Future<Boolean> replace(String key, T value, int timeToLive, EVCacheLatch evcacheLatch)
throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@

import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.ConcatenationType;
import net.spy.memcached.ops.DeleteOperation;
import net.spy.memcached.ops.GetAndTouchOperation;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatusCode;
import net.spy.memcached.ops.StoreOperation;
Expand Down Expand Up @@ -270,6 +272,68 @@ public void complete() {
mconn.enqueueOperation(key, op);
return rv;
}

public <T> OperationFuture<Boolean> asyncAppendOrAdd(final String key, int exp, CachedData co, EVCacheLatch evcacheLatch) {
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<Boolean> rv = new EVCacheOperationFuture<Boolean>(key, latch, new AtomicReference<Boolean>(null), operationTimeout, executorService, appName, serverGroup, "LatencyAoA" );

Operation op = opFact.cat(ConcatenationType.append, 0, key, co.getData(),
new OperationCallback() {
boolean appendSuccess = true;
@Override
public void receivedStatus(OperationStatus val) {
if (val.getStatusCode().equals(StatusCode.SUCCESS)) {
EVCacheMetricsFactory.getCounter(appName + "-" + serverGroup.getName() + "-AoA-AppendCall-SUCCESS").increment();
rv.set(val.isSuccess(), val);
} else {
appendSuccess = false;
}
}

@Override
public void complete() {
if(appendSuccess) {
rv.signalComplete();
latch.countDown();
} else {
Operation op = opFact.store(StoreType.add, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() {
@Override
public void receivedStatus(OperationStatus val) {
if (log.isDebugEnabled()) log.debug("Storing Key : " + key + "; Status : " + val.getStatusCode().name()
+ "; Message : " + val.getMessage());

Tag tag = null;
final MemcachedNode node = getEVCacheNode(key);
if (node.getSocketAddress() instanceof InetSocketAddress) {
tag = new BasicTag("HOST", ((InetSocketAddress) node.getSocketAddress()).getHostName());
}
rv.set(val.isSuccess(), val);
}

@Override
public void gotData(String key, long cas) {
rv.setCas(cas);
}

@Override
public void complete() {
latch.countDown();
rv.signalComplete();
}
});
rv.setOperation(op);
mconn.enqueueOperation(key, op);
}
}
});
rv.setOperation(op);
mconn.enqueueOperation(key, op);
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch)
.addFuture(rv);
return rv;
}



private <T> OperationFuture<Boolean> asyncStore(final StoreType storeType, final String key, int exp, T value, Transcoder<T> tc, EVCacheLatch evcacheLatch) {
CachedData co;
Expand Down

0 comments on commit 36b2ff7

Please sign in to comment.