Skip to content

Commit

Permalink
capturing timeout metrics for bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
srrangarajan committed Sep 20, 2022
1 parent 6704c9a commit 08337bd
Showing 1 changed file with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.netflix.evcache.EVCacheGetOperationListener;
import net.spy.memcached.internal.BulkGetCompletionListener;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -181,9 +182,27 @@ public <U> CompletableFuture<U> makeFutureWithTimeout(long timeout, TimeUnit uni
public CompletableFuture<Map<String, T>> getAsyncSome(long timeout, TimeUnit units) {
CompletableFuture<Map<String, T>> future = makeFutureWithTimeout(timeout, units);
doAsyncGetSome(future);
return future;
return future.handle((data, ex) -> {
if (ex != null) {
handleBulkTimeoutException();
}
return data;
});
}

public void handleBulkTimeoutException() {
ExecutionException t = null;
for (Operation op : ops) {
if (op.isCancelled()) t = new ExecutionException(new CancellationException("Cancelled"));
else if (op.hasErrored()) t = new ExecutionException(op.getException());
else {
op.timeOut();
MemcachedConnection.opTimedOut(op);
t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op));
}
}
throw new RuntimeException(t);
}
public void doAsyncGetSome(CompletableFuture<Map<String, T>> promise) {
this.addListener(future -> {
try {
Expand Down

0 comments on commit 08337bd

Please sign in to comment.