diff --git a/dependencies.toml b/dependencies.toml index fbfc7650d50..c0a90f46a63 100644 --- a/dependencies.toml +++ b/dependencies.toml @@ -19,6 +19,7 @@ caffeine = "2.9.3" cglib = "3.3.0" checkerframework = "2.5.6" checkstyle = "10.3.2" +context-propagation = "1.1.1" controlplane = "1.0.45" curator = "5.7.0" dagger = "2.51.1" @@ -320,6 +321,10 @@ version.ref = "checkerframework" module = "com.puppycrawl.tools:checkstyle" version.ref = "checkstyle" +[libraries.context-propagation] +module = "io.micrometer:context-propagation" +version.ref = "context-propagation" + [libraries.controlplane-api] module = "io.envoyproxy.controlplane:api" version.ref = "controlplane" diff --git a/micrometer-context/build.gradle b/micrometer-context/build.gradle new file mode 100644 index 00000000000..1af31e6e516 --- /dev/null +++ b/micrometer-context/build.gradle @@ -0,0 +1,4 @@ +dependencies { + implementation libs.context.propagation + testImplementation project(':reactor3') +} diff --git a/micrometer-context/src/main/java/com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessor.java b/micrometer-context/src/main/java/com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessor.java new file mode 100644 index 00000000000..0844a677f10 --- /dev/null +++ b/micrometer-context/src/main/java/com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessor.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.common.micrometer.context; + +import org.reactivestreams.Subscription; + +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.RequestContextStorage; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.internal.common.RequestContextUtil; + +import io.micrometer.context.ContextRegistry; +import io.micrometer.context.ContextSnapshot; +import io.micrometer.context.ContextSnapshot.Scope; +import io.micrometer.context.ThreadLocalAccessor; + +/** + * This class works with the + * Micrometer + * Context Propagation to keep the {@link RequestContext} during + * Reactor operations. + * Get the {@link RequestContextThreadLocalAccessor} to register it to the {@link ContextRegistry}. + * Then, {@link ContextRegistry} will use {@link RequestContextThreadLocalAccessor} to + * propagate context during the + * Reactor operations + * so that you can get the context using {@link RequestContext#current()}. + * However, please note that you should include Mono#contextWrite(ContextView) or + * Flux#contextWrite(ContextView) to end of the Reactor codes. + * If not, {@link RequestContext} will not be keep during Reactor Operation. + */ +@UnstableApi +public final class RequestContextThreadLocalAccessor implements ThreadLocalAccessor { + + private static final Object KEY = RequestContext.class; + + /** + * The value which obtained through {@link RequestContextThreadLocalAccessor}, + * will be stored in the Context under this {@code KEY}. + * This method will be called by {@link ContextSnapshot} internally. + */ + @Override + public Object key() { + return KEY; + } + + /** + * {@link ContextSnapshot} will call this method during the execution + * of lambda functions in {@link ContextSnapshot#wrap(Runnable)}, + * as well as during Mono#subscribe(), Flux#subscribe(), + * {@link Subscription#request(long)}, and CoreSubscriber#onSubscribe(Subscription). + * Following these calls, {@link ContextSnapshot#setThreadLocals()} is + * invoked to restore the state of {@link RequestContextStorage}. + * Furthermore, at the end of these methods, {@link Scope#close()} is executed + * to revert the {@link RequestContextStorage} to its original state. + */ + @Nullable + @Override + public RequestContext getValue() { + return RequestContext.currentOrNull(); + } + + /** + * {@link ContextSnapshot} will call this method during the execution + * of lambda functions in {@link ContextSnapshot#wrap(Runnable)}, + * as well as during Mono#subscribe(), Flux#subscribe(), + * {@link Subscription#request(long)}, and CoreSubscriber#onSubscribe(Subscription). + * Following these calls, {@link ContextSnapshot#setThreadLocals()} is + * invoked to restore the state of {@link RequestContextStorage}. + * Furthermore, at the end of these methods, {@link Scope#close()} is executed + * to revert the {@link RequestContextStorage} to its original state. + */ + @Override + @SuppressWarnings("MustBeClosedChecker") + public void setValue(RequestContext value) { + RequestContextUtil.getAndSet(value); + } + + /** + * This method will be called at the start of {@link ContextSnapshot.Scope} and + * the end of {@link ContextSnapshot.Scope}. If reactor Context does not + * contains {@link RequestContextThreadLocalAccessor#KEY}, {@link ContextSnapshot} will use + * this method to remove the value from {@link ThreadLocal}. + * Please note that {@link RequestContextUtil#pop()} return {@link AutoCloseable} instance, + * but it is not used in `Try with Resources` syntax. this is because {@link ContextSnapshot.Scope} + * will handle the {@link AutoCloseable} instance returned by {@link RequestContextUtil#pop()}. + */ + @Override + @SuppressWarnings("MustBeClosedChecker") + public void setValue() { + RequestContextUtil.pop(); + } +} diff --git a/micrometer-context/src/main/java/com/linecorp/armeria/common/micrometer/context/package-info.java b/micrometer-context/src/main/java/com/linecorp/armeria/common/micrometer/context/package-info.java new file mode 100644 index 00000000000..24f7d364c1f --- /dev/null +++ b/micrometer-context/src/main/java/com/linecorp/armeria/common/micrometer/context/package-info.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Micrometer context-propagation plugins to help keep {@link com.linecorp.armeria.common.RequestContext} + * during Reactor operations. + */ +@UnstableApi +@NonNullByDefault +package com.linecorp.armeria.common.micrometer.context; + +import com.linecorp.armeria.common.annotation.NonNullByDefault; +import com.linecorp.armeria.common.annotation.UnstableApi; diff --git a/micrometer-context/src/test/java/com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessorTest.java b/micrometer-context/src/test/java/com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessorTest.java new file mode 100644 index 00000000000..8aaef50d66b --- /dev/null +++ b/micrometer-context/src/test/java/com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessorTest.java @@ -0,0 +1,158 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.common.micrometer.context; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.internal.common.RequestContextUtil; + +import io.micrometer.context.ContextRegistry; +import io.micrometer.context.ContextSnapshot; +import io.micrometer.context.ContextSnapshot.Scope; +import io.micrometer.context.ContextSnapshotFactory; + +class RequestContextThreadLocalAccessorTest { + + @Test + void should_return_expected_key() { + // Given + final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); + final Object expectedValue = RequestContext.class; + + // When + final Object result = reqCtxAccessor.key(); + + // Then + assertThat(result).isEqualTo(expectedValue); + } + + @Test + @SuppressWarnings("MustBeClosedChecker") + void should_success_set() { + // Given + final ClientRequestContext ctx = newContext(); + final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); + + // When + reqCtxAccessor.setValue(ctx); + + // Then + final RequestContext currentCtx = RequestContext.current(); + assertThat(currentCtx).isEqualTo(ctx); + + RequestContextUtil.pop(); + } + + @Test + void should_throw_NPE_when_set_null() { + // Given + final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); + + // When + Then + assertThatThrownBy(() -> reqCtxAccessor.setValue(null)) + .isInstanceOf(NullPointerException.class); + } + + @Test + void should_be_null_when_setValue() { + // Given + final ClientRequestContext ctx = newContext(); + final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); + reqCtxAccessor.setValue(ctx); + + // When + reqCtxAccessor.setValue(); + + // Then + final RequestContext reqCtx = RequestContext.currentOrNull(); + assertThat(reqCtx).isNull(); + } + + @Test + @SuppressWarnings("MustBeClosedChecker") + void should_be_restore_original_state_when_restore() { + // Given + final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); + final ClientRequestContext previousCtx = newContext(); + final ClientRequestContext currentCtx = newContext(); + reqCtxAccessor.setValue(currentCtx); + + // When + reqCtxAccessor.restore(previousCtx); + + // Then + final RequestContext reqCtx = RequestContext.currentOrNull(); + assertThat(reqCtx).isNotNull(); + assertThat(reqCtx).isEqualTo(previousCtx); + + RequestContextUtil.pop(); + } + + @Test + void should_be_null_when_restore() { + // Given + final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); + final ClientRequestContext currentCtx = newContext(); + reqCtxAccessor.setValue(currentCtx); + + // When + reqCtxAccessor.restore(); + + // Then + final RequestContext reqCtx = RequestContext.currentOrNull(); + assertThat(reqCtx).isNull(); + } + + @Test + void requestContext_should_exist_inside_scope_and_not_outside() { + // Given + final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); + ContextRegistry.getInstance() + .registerThreadLocalAccessor(reqCtxAccessor); + final ClientRequestContext currentCtx = newContext(); + final ClientRequestContext expectedCtx = currentCtx; + reqCtxAccessor.setValue(currentCtx); + + final ContextSnapshotFactory factory = ContextSnapshotFactory.builder() + .clearMissing(true) + .build(); + final ContextSnapshot contextSnapshot = factory.captureAll(); + reqCtxAccessor.setValue(); + + // When : contextSnapshot.setThreadLocals() + try (Scope ignored = contextSnapshot.setThreadLocals()) { + + // Then : should not + final RequestContext reqCtxInScope = RequestContext.currentOrNull(); + assertThat(reqCtxInScope).isSameAs(expectedCtx); + } + + // Then + final RequestContext reqCtxOutOfScope = RequestContext.currentOrNull(); + assertThat(reqCtxOutOfScope).isNull(); + } + + static ClientRequestContext newContext() { + return ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + } +} diff --git a/micrometer-context/src/test/java/com/linecorp/armeria/reactor3/RequestContextPropagationFluxTest.java b/micrometer-context/src/test/java/com/linecorp/armeria/reactor3/RequestContextPropagationFluxTest.java new file mode 100644 index 00000000000..7f075c60b80 --- /dev/null +++ b/micrometer-context/src/test/java/com/linecorp/armeria/reactor3/RequestContextPropagationFluxTest.java @@ -0,0 +1,641 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.reactor3; + +import static com.linecorp.armeria.reactor3.RequestContextPropagationMonoTest.ctxExists; +import static com.linecorp.armeria.reactor3.RequestContextPropagationMonoTest.newContext; +import static com.linecorp.armeria.reactor3.RequestContextPropagationMonoTest.noopSubscription; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.reactivestreams.Publisher; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.micrometer.context.RequestContextThreadLocalAccessor; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.internal.testing.GenerateNativeImageTrace; + +import io.micrometer.context.ContextRegistry; +import reactor.core.Disposable; +import reactor.core.publisher.ConnectableFlux; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; +import reactor.util.context.Context; + +@GenerateNativeImageTrace +class RequestContextPropagationFluxTest { + + @BeforeAll + static void setUp() { + ContextRegistry + .getInstance() + .registerThreadLocalAccessor(new RequestContextThreadLocalAccessor()); + Hooks.enableAutomaticContextPropagation(); + } + + @AfterAll + static void tearDown() { + Hooks.disableAutomaticContextPropagation(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxError(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + final AtomicBoolean atomicBoolean = new AtomicBoolean(); + flux = addCallbacks(Flux.error(() -> { + if (!atomicBoolean.getAndSet(true)) { + // Flux.error().publishOn() calls this error supplier immediately to see if it can retrieve + // the value via Callable.call() without ctx. + assertThat(ctxExists(ctx)).isFalse(); + } else { + assertThat(ctxExists(ctx)).isTrue(); + } + return new AnticipatedException(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + } else { + StepVerifier.create(flux) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxFromPublisher(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.from(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.onSubscribe(noopSubscription()); + s.onNext("foo"); + s.onComplete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxCreate(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.create(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.next("foo"); + s.complete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxCreate_error(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.create(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.error(new AnticipatedException()); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + } else { + StepVerifier.create(flux) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxConcat(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.concat(Mono.fromSupplier(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return "foo"; + }), Mono.fromCallable(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return "bar"; + })).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("bar"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("bar"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxDefer(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.defer(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return Flux.just("foo"); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxFromStream(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.fromStream(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return Stream.of("foo"); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxCombineLatest(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.combineLatest(Mono.just("foo"), Mono.just("bar"), (a, b) -> { + assertThat(ctxExists(ctx)).isTrue(); + return a; + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxGenerate(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.generate(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.next("foo"); + s.complete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxMerge(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.mergeSequential(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.onSubscribe(noopSubscription()); + s.onNext("foo"); + s.onComplete(); + }, s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.onSubscribe(noopSubscription()); + s.onNext("bar"); + s.onComplete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("bar"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("bar"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxPush(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.push(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.next("foo"); + s.complete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxSwitchOnNext(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.switchOnNext(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.onSubscribe(noopSubscription()); + s.onNext((Publisher) s1 -> { + assertThat(ctxExists(ctx)).isTrue(); + s1.onSubscribe(noopSubscription()); + s1.onNext("foo"); + s1.onComplete(); + }); + s.onComplete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxZip(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.zip(Mono.just("foo"), Mono.just("bar"), (foo, bar) -> { + assertThat(ctxExists(ctx)).isTrue(); + return foo; + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxInterval(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.interval(Duration.ofMillis(100)).take(2).concatMap(a -> { + assertThat(ctxExists(ctx)).isTrue(); + return Mono.just("foo"); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxConcatDelayError(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.concatDelayError(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.onSubscribe(noopSubscription()); + s.onNext("foo"); + s.onError(new AnticipatedException()); + }, s -> { + s.onSubscribe(noopSubscription()); + s.onNext("bar"); + s.onComplete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("bar"::equals) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .expectNextMatches("bar"::equals) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void fluxTransform(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = addCallbacks(Flux.just("foo").transform(fooFlux -> s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.onSubscribe(noopSubscription()); + s.onNext(fooFlux.blockFirst()); + s.onComplete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void connectableFlux(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + final ConnectableFlux connectableFlux = Flux.just("foo").publish(); + flux = addCallbacks(connectableFlux.autoConnect(2).publishOn(Schedulers.single()), + ctx, + useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + flux.subscribe().dispose(); + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + flux.subscribe().dispose(); + StepVerifier.create(flux) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void connectableFlux_dispose(boolean useContextCapture) throws InterruptedException { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + final ConnectableFlux connectableFlux = Flux.just("foo").publish(); + flux = addCallbacks(connectableFlux.autoConnect(2, disposable -> { + assertThat(ctxExists(ctx)).isTrue(); + }).publishOn(Schedulers.newSingle("aaa")), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + final Disposable disposable1 = flux.subscribe(); + await().pollDelay(Duration.ofMillis(200)).until(() -> !disposable1.isDisposed()); + final Disposable disposable2 = flux.subscribe(); + await().untilAsserted(() -> { + assertThat(disposable1.isDisposed()).isTrue(); + assertThat(disposable2.isDisposed()).isTrue(); + }); + } + } else { + final Disposable disposable1 = flux.subscribe(); + await().pollDelay(Duration.ofMillis(200)).until(() -> !disposable1.isDisposed()); + final Disposable disposable2 = flux.subscribe(); + await().untilAsserted(() -> { + assertThat(disposable1.isDisposed()).isTrue(); + assertThat(disposable2.isDisposed()).isTrue(); + }); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @Test + void subscriberContextIsNotMissing() { + final ClientRequestContext ctx = newContext(); + final Flux flux; + + flux = Flux.deferContextual(reactorCtx -> { + assertThat((String) reactorCtx.get("foo")).isEqualTo("bar"); + return Flux.just("baz"); + }); + + final Flux flux1 = flux.contextWrite(reactorCtx -> reactorCtx.put("foo", "bar")); + StepVerifier.create(flux1) + .expectNextMatches("baz"::equals) + .verifyComplete(); + assertThat(ctxExists(ctx)).isFalse(); + } + + @Test + void ctxShouldBeCleanUpEvenIfErrorOccursDuringReactorOperationOnSchedulerThread() + throws InterruptedException { + // Given + final ClientRequestContext ctx = newContext(); + final Flux flux; + final Scheduler single = Schedulers.single(); + + // When + flux = Flux.just("Hello", "Hi") + .subscribeOn(single) + .delayElements(Duration.ofMillis(1000)) + .map(s -> { + if ("Hello".equals(s)) { + throw new RuntimeException(); + } + return s; + }) + .contextWrite(Context.of(RequestContext.class, ctx)); + + // Then + StepVerifier.create(flux) + .expectError(RuntimeException.class) + .verify(); + + final CountDownLatch latch = new CountDownLatch(1); + single.schedule(() -> { + assertThat(ctxExists(ctx)).isFalse(); + latch.countDown(); + }); + latch.await(); + + assertThat(ctxExists(ctx)).isFalse(); + } + + private static Flux addCallbacks(Flux flux0, + ClientRequestContext ctx, + boolean useContextCapture) { + final Flux flux = flux0.doFirst(() -> assertThat(ctxExists(ctx)).isTrue()) + .doOnSubscribe(s -> assertThat(ctxExists(ctx)).isTrue()) + .doOnRequest(l -> assertThat(ctxExists(ctx)).isTrue()) + .doOnNext(foo -> assertThat(ctxExists(ctx)).isTrue()) + .doOnComplete(() -> assertThat(ctxExists(ctx)).isTrue()) + .doOnEach(s -> assertThat(ctxExists(ctx)).isTrue()) + .doOnError(t -> assertThat(ctxExists(ctx)).isTrue()) + .doOnCancel(() -> assertThat(ctxExists(ctx)).isTrue()) + .doFinally(t -> assertThat(ctxExists(ctx)).isTrue()) + .doAfterTerminate(() -> assertThat(ctxExists(ctx)).isTrue()); + + if (useContextCapture) { + return flux.contextCapture(); + } + return flux.contextWrite(Context.of(RequestContext.class, ctx)); + } +} diff --git a/micrometer-context/src/test/java/com/linecorp/armeria/reactor3/RequestContextPropagationMonoTest.java b/micrometer-context/src/test/java/com/linecorp/armeria/reactor3/RequestContextPropagationMonoTest.java new file mode 100644 index 00000000000..1709883b9de --- /dev/null +++ b/micrometer-context/src/test/java/com/linecorp/armeria/reactor3/RequestContextPropagationMonoTest.java @@ -0,0 +1,399 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.reactor3; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.reactivestreams.Subscription; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.micrometer.context.RequestContextThreadLocalAccessor; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.internal.testing.GenerateNativeImageTrace; + +import io.micrometer.context.ContextRegistry; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; +import reactor.util.context.Context; +import reactor.util.function.Tuple2; + +@GenerateNativeImageTrace +class RequestContextPropagationMonoTest { + + @BeforeAll + static void setUp() { + ContextRegistry + .getInstance() + .registerThreadLocalAccessor(new RequestContextThreadLocalAccessor()); + Hooks.enableAutomaticContextPropagation(); + } + + @AfterAll + static void tearDown() { + Hooks.disableAutomaticContextPropagation(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoCreate_success(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.create(sink -> { + assertThat(ctxExists(ctx)).isTrue(); + sink.success("foo"); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoCreate_error(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.create(sink -> { + assertThat(ctxExists(ctx)).isTrue(); + sink.error(new AnticipatedException()); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + } else { + StepVerifier.create(mono) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoCreate_currentContext(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.create(sink -> { + assertThat(ctxExists(ctx)).isTrue(); + sink.success("foo"); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoDefer(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.defer(() -> Mono.fromSupplier(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return "foo"; + })).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoFromPublisher(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.from(s -> { + assertThat(ctxExists(ctx)).isTrue(); + s.onSubscribe(noopSubscription()); + s.onNext("foo"); + s.onComplete(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoError(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.error(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return new AnticipatedException(); + }).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + } else { + StepVerifier.create(mono) + .verifyErrorMatches(t -> t instanceof AnticipatedException); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoFirst(boolean useContextCapture) { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.firstWithSignal(Mono.delay(Duration.ofMillis(1000)).then(Mono.just("bar")), + Mono.fromCallable(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return "foo"; + })) + .publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoFromFuture(boolean useContextCapture) { + final CompletableFuture future = new CompletableFuture<>(); + future.complete("foo"); + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.fromFuture(future) + .publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoDelay(boolean useContextCapture) { + final CompletableFuture future = new CompletableFuture<>(); + future.complete("foo"); + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = addCallbacks(Mono.delay(Duration.ofMillis(100)).then(Mono.fromCallable(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return "foo"; + })).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches("foo"::equals) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @ParameterizedTest + @CsvSource({ "true", "false" }) + void monoZip(boolean useContextCapture) { + final CompletableFuture future = new CompletableFuture<>(); + future.complete("foo"); + final ClientRequestContext ctx = newContext(); + final Mono> mono; + mono = addCallbacks(Mono.zip(Mono.fromSupplier(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return "foo"; + }), Mono.fromSupplier(() -> { + assertThat(ctxExists(ctx)).isTrue(); + return "bar"; + })).publishOn(Schedulers.single()), ctx, useContextCapture); + + if (useContextCapture) { + try (SafeCloseable ignored = ctx.push()) { + StepVerifier.create(mono) + .expectNextMatches(t -> "foo".equals(t.getT1()) && "bar".equals(t.getT2())) + .verifyComplete(); + } + } else { + StepVerifier.create(mono) + .expectNextMatches(t -> "foo".equals(t.getT1()) && "bar".equals(t.getT2())) + .verifyComplete(); + } + assertThat(ctxExists(ctx)).isFalse(); + } + + @Test + void subscriberContextIsNotMissing() { + final ClientRequestContext ctx = newContext(); + final Mono mono; + mono = Mono.deferContextual(Mono::just).handle((reactorCtx, sink) -> { + assertThat((String) reactorCtx.get("foo")).isEqualTo("bar"); + sink.next("baz"); + }); + + final Mono mono1 = mono.contextWrite(reactorCtx -> reactorCtx.put("foo", "bar")); + StepVerifier.create(mono1) + .expectNextMatches("baz"::equals) + .verifyComplete(); + assertThat(ctxExists(ctx)).isFalse(); + } + + @Test + void ctxShouldBeCleanUpEvenIfErrorOccursDuringReactorOperationOnSchedulerThread() + throws InterruptedException { + // Given + final ClientRequestContext ctx = newContext(); + final Mono mono; + final Scheduler single = Schedulers.single(); + + // When + mono = Mono.just("Hello") + .subscribeOn(single) + .delayElement(Duration.ofMillis(1000)) + .map(s -> { + if ("Hello".equals(s)) { + throw new RuntimeException(); + } + return s; + }) + .contextWrite(Context.of(RequestContext.class, ctx)); + + // Then + StepVerifier.create(mono) + .expectError(RuntimeException.class) + .verify(); + + final CountDownLatch latch = new CountDownLatch(1); + single.schedule(() -> { + assertThat(ctxExists(ctx)).isFalse(); + latch.countDown(); + }); + latch.await(); + + assertThat(ctxExists(ctx)).isFalse(); + } + + static Subscription noopSubscription() { + return new Subscription() { + @Override + public void request(long n) {} + + @Override + public void cancel() {} + }; + } + + static boolean ctxExists(ClientRequestContext ctx) { + return RequestContext.currentOrNull() == ctx; + } + + static ClientRequestContext newContext() { + return ClientRequestContext.builder(HttpRequest.of(HttpMethod.GET, "/")) + .build(); + } + + private static Mono addCallbacks(Mono mono0, ClientRequestContext ctx, + boolean useContextCapture) { + final Mono mono = mono0.doFirst(() -> assertThat(ctxExists(ctx)).isTrue()) + .doOnSubscribe(s -> assertThat(ctxExists(ctx)).isTrue()) + .doOnRequest(l -> assertThat(ctxExists(ctx)).isTrue()) + .doOnNext(foo -> assertThat(ctxExists(ctx)).isTrue()) + .doOnSuccess(t -> assertThat(ctxExists(ctx)).isTrue()) + .doOnEach(s -> assertThat(ctxExists(ctx)).isTrue()) + .doOnError(t -> assertThat(ctxExists(ctx)).isTrue()) + .doOnCancel(() -> assertThat(ctxExists(ctx)).isTrue()) + .doFinally(t -> assertThat(ctxExists(ctx)).isTrue()) + .doAfterTerminate(() -> assertThat(ctxExists(ctx)).isTrue()); + if (useContextCapture) { + return mono.contextCapture(); + } + return mono.contextWrite(Context.of(RequestContext.class, ctx)); + } +} diff --git a/settings.gradle b/settings.gradle index 9b7e88b98de..f43ffc4c779 100644 --- a/settings.gradle +++ b/settings.gradle @@ -122,6 +122,7 @@ includeWithFlags ':logback13', 'java', 'publish', 'rel project(':logback13').projectDir = file('logback/logback13') includeWithFlags ':logback14', 'java11', 'publish', 'relocate', 'no_aggregation' project(':logback14').projectDir = file('logback/logback14') +includeWithFlags ':micrometer-context', 'java', 'relocate', 'native' includeWithFlags ':native-image-config' includeWithFlags ':oauth2', 'java', 'publish', 'relocate', 'native' includeWithFlags ':prometheus1', 'java', 'publish', 'relocate', 'native'