diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index d63c0810..2c6790fd 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -16,6 +16,7 @@ import com.netflix.evcache.EVCacheGetOperationListener; import net.spy.memcached.internal.BulkGetCompletionListener; +import net.spy.memcached.internal.CheckedOperationTimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,9 +182,27 @@ public CompletableFuture makeFutureWithTimeout(long timeout, TimeUnit uni public CompletableFuture> getAsyncSome(long timeout, TimeUnit units) { CompletableFuture> future = makeFutureWithTimeout(timeout, units); doAsyncGetSome(future); - return future; + return future.handle((data, ex) -> { + if (ex != null) { + handleBulkTimeoutException(); + } + return data; + }); } + public void handleBulkTimeoutException() { + ExecutionException t = null; + for (Operation op : ops) { + if (op.isCancelled()) t = new ExecutionException(new CancellationException("Cancelled")); + else if (op.hasErrored()) t = new ExecutionException(op.getException()); + else { + op.timeOut(); + MemcachedConnection.opTimedOut(op); + t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op)); + } + } + throw new RuntimeException(t); + } public void doAsyncGetSome(CompletableFuture> promise) { this.addListener(future -> { try {