Skip to content

Commit

Permalink
Add deadline support and tests
Browse files Browse the repository at this point in the history
Improve Javadoc
  • Loading branch information
Louis Ryan committed Jul 10, 2015
1 parent be0c033 commit 6640976
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 8 deletions.
90 changes: 85 additions & 5 deletions core/src/main/java/io/grpc/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,68 @@

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import javax.annotation.Nullable;


/**
* --Add warning about using this too liberally--
* --Add warning about using this with containers that unload Classloaders--
* -- Add note that in the case of cascading closure that listeners to the parent are always
* notified before listeners on the children.
* A context propagation mechanism which carries deadlines, cancellation signals,
* and other scoped values across API boundaries and between threads. Examples of functionality
* propagated via context include:
* <ul>
* <li>Deadlines for a local operation or remote call.</li>
* <li>Security principals & credentials.</li>
* <li>Local and distributed tracing context.</li>
* </ul>
*
* <p>Context is not intended for passing optional parameters to an API and developers should
* take care to avoid excessive dependence on context when designing an API.
*
* <p>If Context is being used in an environment that needs to support class unloading it is the
* responsibility of the application to ensure that all contexts are properly cancelled.
*
*/
public class Context {

/**
* Use a shared resource to retain the {@link ScheduledExecutorService} used to
* implement deadline based context cancellation. This allows the executor to be
* shutdown if its not in use thereby allowing Context to be unloaded.
*/
static final SharedResourceHolder.Resource<ScheduledExecutorService> SCHEDULER =
new SharedResourceHolder.Resource<ScheduledExecutorService>() {
private static final String name = "context-scheduler";
@Override
public ScheduledExecutorService create() {
return Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat(name + "-%d").build());
}

@Override
public void close(ScheduledExecutorService instance) {
instance.shutdown();
}

@Override
public String toString() {
return name;
}
};

/**
* Stack of context objects which is used to record attach & detach history on a thread.
*/
Expand Down Expand Up @@ -123,6 +161,28 @@ public CancellableContext withCancellation() {
return new CancellableContext(this);
}

/**
* Create a new context which will cancel itself after an absolute deadline expressed as
* nanoseconds in the {@link System#nanoTime()} clock.
*
* <p>It is recommended that callers only use this method when propagating a derivative of
* a received existing deadline. When establishing a new deadline {@link #withDeadlineAfter}
* is the better mechanism.
*/
public CancellableContext withDeadlineNanoTime(long deadlineNanoTime) {
return withDeadlineAfter(Math.max(0, deadlineNanoTime - System.nanoTime()),
TimeUnit.NANOSECONDS);
}

/**
* Create a new context which will cancel itself after the given {@code duration} from now.
*/
public CancellableContext withDeadlineAfter(long duration, TimeUnit unit) {
Preconditions.checkArgument(duration >= 0, "duration must be greater than or equal to 0");
Preconditions.checkNotNull(unit, "unit");
return new CancellableContext(this, unit.toNanos(duration));
}

/**
* Create a new context which cannot be cancelled and will not propagate the cancellation of its
* parent.
Expand Down Expand Up @@ -326,6 +386,7 @@ public static final class CancellableContext extends Context {
volatile int cancelled;
private volatile Throwable cause;
private final Context dummy;
private ScheduledFuture<?> scheduledFuture;


private CancellableContext(Context parent) {
Expand All @@ -335,6 +396,20 @@ private CancellableContext(Context parent) {
dummy = new Context(this, EMPTY_ENTRIES);
}

private CancellableContext(Context parent, long delayNanos) {
super(parent, EMPTY_ENTRIES);
// Create a dummy that inherits from this to attach and detach so that you cannot retrieve a
// cancellable context from Context.current()
dummy = new Context(this, EMPTY_ENTRIES);
scheduledFuture = SCHEDULER.create().schedule(new Runnable() {
@Override
public void run() {
cancel(new TimeoutException("context timed out"));
}
}, delayNanos, TimeUnit.NANOSECONDS);
}


@Override
public void attach() {
dummy.attach();
Expand Down Expand Up @@ -371,6 +446,11 @@ public void close() throws IOException {
*/
public boolean cancel(@Nullable Throwable cause) {
if (cancelledUpdater.compareAndSet(this, 0, 1)) {
if (scheduledFuture != null) {
// If we have a scheduled cancellation pending attempt to cancel it.
scheduledFuture.cancel(false);
scheduledFuture = null;
}
this.cause = cause;
notifyAndClearListeners();
return true;
Expand Down
69 changes: 66 additions & 3 deletions core/src/test/java/io/grpc/ContextTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package io.grpc;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.MoreExecutors;

Expand All @@ -10,13 +17,14 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* TODO: http://go/java-style#javadoc
* Tests for {@link Context}.
*/
@RunWith(JUnit4.class)
public class ContextTest {
Expand All @@ -27,10 +35,12 @@ public class ContextTest {
private static final Context.Key<Object> OBSERVED = Context.key("observed");

private boolean listenerNotified;
private CountDownLatch deadlineLatch = new CountDownLatch(1);
private Context.CancellationListener cancellationListener = new Context.CancellationListener() {
@Override
public void cancelled(Context context) {
listenerNotified = true;
deadlineLatch.countDown();
}
};

Expand Down Expand Up @@ -253,6 +263,7 @@ public void typicalTryCatchFinallyHandling() throws Exception {
assertNotNull(base.cause());
}


/*
public void testTryWithResource() throws Exception {
Context.CancellableContext base = Context.current().withCancellation();
Expand Down Expand Up @@ -280,4 +291,56 @@ public void testTryWithResource() throws Exception {
assertNotNull(base.cause());
}
*/

@Test
public void absoluteDeadlineTriggersAndPropagates() throws Exception {
Context base = Context.current().withDeadlineNanoTime(System.nanoTime() +
TimeUnit.SECONDS.toNanos(1));
Context child = base.withValue(FOOD, "lasagna");
child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled());
assertFalse(child.isCancelled());
deadlineLatch.await(2, TimeUnit.SECONDS);
assertTrue(base.isCancelled());
assertTrue(base.cause() instanceof TimeoutException);
assertTrue(listenerNotified);
assertTrue(child.isCancelled());
assertSame(base.cause(), child.cause());
}

@Test
public void relativeDeadlineTriggersAndPropagates() throws Exception {
Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS);
Context child = base.withValue(FOOD, "lasagna");
child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled());
assertFalse(child.isCancelled());
deadlineLatch.await(2, TimeUnit.SECONDS);
assertTrue(base.isCancelled());
assertTrue(base.cause() instanceof TimeoutException);
assertTrue(listenerNotified);
assertTrue(child.isCancelled());
assertSame(base.cause(), child.cause());
}

@Test
public void innerDeadlineCompletesBeforeOuter() throws Exception {
Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS);
Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS);
child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled());
assertFalse(child.isCancelled());
deadlineLatch.await(2, TimeUnit.SECONDS);
assertFalse(base.isCancelled());
assertTrue(listenerNotified);
assertTrue(child.isCancelled());
assertTrue(child.cause() instanceof TimeoutException);

deadlineLatch = new CountDownLatch(1);
base.addListener(cancellationListener, MoreExecutors.directExecutor());
deadlineLatch.await(2, TimeUnit.SECONDS);
assertTrue(base.isCancelled());
assertTrue(base.cause() instanceof TimeoutException);
assertNotSame(base.cause(), child.cause());
}
}

0 comments on commit 6640976

Please sign in to comment.