Skip to content

Commit

Permalink
Support micrometer context-propagation (#5577)
Browse files Browse the repository at this point in the history
### Motivation:
- Related Issue : #5145
- `Armeria` already support context-propagation to maintain
`RequestContext` during executing Reactor code. How it requires
maintenance.
- `Reactor` integrate `micro-meter:context-propagation` to do
context-propagation during `Flux`, `Mono` officially. thus, it would be
better to migrate from `RequestContextHook` to
`RequestContextPropagationHooks` because it can reduce maintenance cost.


### Modifications:
- Add new `Hook` for `Reactor`. 
- Add new `ThreadLocalAccessor` for `micro-meter:context-propagation` to
main `RequestContext` during executing Reactor code like `Mono`, `Flux`.
- Add new config `enableContextPropagation` to integrate
`micro-meter:context-propagation` with `spring-boot3`.


### Result:
- Closes #5145
- If user want to use `micrometer:context-propagation` to maintain
`RequestContext` during executing Reactor code like `Mono`, `Flux`, just
call `RequestContextPropagationHook.enable()`.

---------

Co-authored-by: minux <[email protected]>
Co-authored-by: Trustin Lee <[email protected]>
Co-authored-by: Trustin Lee <[email protected]>
Co-authored-by: jrhee17 <[email protected]>
Co-authored-by: Ikhun Um <[email protected]>
  • Loading branch information
6 people authored Nov 8, 2024
1 parent 460ea02 commit 0019bc7
Show file tree
Hide file tree
Showing 8 changed files with 1,340 additions and 0 deletions.
5 changes: 5 additions & 0 deletions dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ caffeine = "2.9.3"
cglib = "3.3.0"
checkerframework = "2.5.6"
checkstyle = "10.3.2"
context-propagation = "1.1.1"
controlplane = "1.0.45"
curator = "5.7.0"
dagger = "2.51.1"
Expand Down Expand Up @@ -320,6 +321,10 @@ version.ref = "checkerframework"
module = "com.puppycrawl.tools:checkstyle"
version.ref = "checkstyle"

[libraries.context-propagation]
module = "io.micrometer:context-propagation"
version.ref = "context-propagation"

[libraries.controlplane-api]
module = "io.envoyproxy.controlplane:api"
version.ref = "controlplane"
Expand Down
4 changes: 4 additions & 0 deletions micrometer-context/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
implementation libs.context.propagation
testImplementation project(':reactor3')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common.micrometer.context;

import org.reactivestreams.Subscription;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestContextStorage;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.internal.common.RequestContextUtil;

import io.micrometer.context.ContextRegistry;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshot.Scope;
import io.micrometer.context.ThreadLocalAccessor;

/**
* This class works with the
* <a href="https://docs.micrometer.io/context-propagation/reference/index.html">Micrometer
* Context Propagation</a> to keep the {@link RequestContext} during
* <a href="https://github.com/reactor/reactor-core">Reactor</a> operations.
* Get the {@link RequestContextThreadLocalAccessor} to register it to the {@link ContextRegistry}.
* Then, {@link ContextRegistry} will use {@link RequestContextThreadLocalAccessor} to
* propagate context during the
* <a href="https://github.com/reactor/reactor-core">Reactor</a> operations
* so that you can get the context using {@link RequestContext#current()}.
* However, please note that you should include Mono#contextWrite(ContextView) or
* Flux#contextWrite(ContextView) to end of the Reactor codes.
* If not, {@link RequestContext} will not be keep during Reactor Operation.
*/
@UnstableApi
public final class RequestContextThreadLocalAccessor implements ThreadLocalAccessor<RequestContext> {

private static final Object KEY = RequestContext.class;

/**
* The value which obtained through {@link RequestContextThreadLocalAccessor},
* will be stored in the Context under this {@code KEY}.
* This method will be called by {@link ContextSnapshot} internally.
*/
@Override
public Object key() {
return KEY;
}

/**
* {@link ContextSnapshot} will call this method during the execution
* of lambda functions in {@link ContextSnapshot#wrap(Runnable)},
* as well as during Mono#subscribe(), Flux#subscribe(),
* {@link Subscription#request(long)}, and CoreSubscriber#onSubscribe(Subscription).
* Following these calls, {@link ContextSnapshot#setThreadLocals()} is
* invoked to restore the state of {@link RequestContextStorage}.
* Furthermore, at the end of these methods, {@link Scope#close()} is executed
* to revert the {@link RequestContextStorage} to its original state.
*/
@Nullable
@Override
public RequestContext getValue() {
return RequestContext.currentOrNull();
}

/**
* {@link ContextSnapshot} will call this method during the execution
* of lambda functions in {@link ContextSnapshot#wrap(Runnable)},
* as well as during Mono#subscribe(), Flux#subscribe(),
* {@link Subscription#request(long)}, and CoreSubscriber#onSubscribe(Subscription).
* Following these calls, {@link ContextSnapshot#setThreadLocals()} is
* invoked to restore the state of {@link RequestContextStorage}.
* Furthermore, at the end of these methods, {@link Scope#close()} is executed
* to revert the {@link RequestContextStorage} to its original state.
*/
@Override
@SuppressWarnings("MustBeClosedChecker")
public void setValue(RequestContext value) {
RequestContextUtil.getAndSet(value);
}

/**
* This method will be called at the start of {@link ContextSnapshot.Scope} and
* the end of {@link ContextSnapshot.Scope}. If reactor Context does not
* contains {@link RequestContextThreadLocalAccessor#KEY}, {@link ContextSnapshot} will use
* this method to remove the value from {@link ThreadLocal}.
* Please note that {@link RequestContextUtil#pop()} return {@link AutoCloseable} instance,
* but it is not used in `Try with Resources` syntax. this is because {@link ContextSnapshot.Scope}
* will handle the {@link AutoCloseable} instance returned by {@link RequestContextUtil#pop()}.
*/
@Override
@SuppressWarnings("MustBeClosedChecker")
public void setValue() {
RequestContextUtil.pop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

/**
* Micrometer context-propagation plugins to help keep {@link com.linecorp.armeria.common.RequestContext}
* during Reactor operations.
*/
@UnstableApi
@NonNullByDefault
package com.linecorp.armeria.common.micrometer.context;

import com.linecorp.armeria.common.annotation.NonNullByDefault;
import com.linecorp.armeria.common.annotation.UnstableApi;
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common.micrometer.context;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.jupiter.api.Test;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.internal.common.RequestContextUtil;

import io.micrometer.context.ContextRegistry;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshot.Scope;
import io.micrometer.context.ContextSnapshotFactory;

class RequestContextThreadLocalAccessorTest {

@Test
void should_return_expected_key() {
// Given
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor();
final Object expectedValue = RequestContext.class;

// When
final Object result = reqCtxAccessor.key();

// Then
assertThat(result).isEqualTo(expectedValue);
}

@Test
@SuppressWarnings("MustBeClosedChecker")
void should_success_set() {
// Given
final ClientRequestContext ctx = newContext();
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor();

// When
reqCtxAccessor.setValue(ctx);

// Then
final RequestContext currentCtx = RequestContext.current();
assertThat(currentCtx).isEqualTo(ctx);

RequestContextUtil.pop();
}

@Test
void should_throw_NPE_when_set_null() {
// Given
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor();

// When + Then
assertThatThrownBy(() -> reqCtxAccessor.setValue(null))
.isInstanceOf(NullPointerException.class);
}

@Test
void should_be_null_when_setValue() {
// Given
final ClientRequestContext ctx = newContext();
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor();
reqCtxAccessor.setValue(ctx);

// When
reqCtxAccessor.setValue();

// Then
final RequestContext reqCtx = RequestContext.currentOrNull();
assertThat(reqCtx).isNull();
}

@Test
@SuppressWarnings("MustBeClosedChecker")
void should_be_restore_original_state_when_restore() {
// Given
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor();
final ClientRequestContext previousCtx = newContext();
final ClientRequestContext currentCtx = newContext();
reqCtxAccessor.setValue(currentCtx);

// When
reqCtxAccessor.restore(previousCtx);

// Then
final RequestContext reqCtx = RequestContext.currentOrNull();
assertThat(reqCtx).isNotNull();
assertThat(reqCtx).isEqualTo(previousCtx);

RequestContextUtil.pop();
}

@Test
void should_be_null_when_restore() {
// Given
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor();
final ClientRequestContext currentCtx = newContext();
reqCtxAccessor.setValue(currentCtx);

// When
reqCtxAccessor.restore();

// Then
final RequestContext reqCtx = RequestContext.currentOrNull();
assertThat(reqCtx).isNull();
}

@Test
void requestContext_should_exist_inside_scope_and_not_outside() {
// Given
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor();
ContextRegistry.getInstance()
.registerThreadLocalAccessor(reqCtxAccessor);
final ClientRequestContext currentCtx = newContext();
final ClientRequestContext expectedCtx = currentCtx;
reqCtxAccessor.setValue(currentCtx);

final ContextSnapshotFactory factory = ContextSnapshotFactory.builder()
.clearMissing(true)
.build();
final ContextSnapshot contextSnapshot = factory.captureAll();
reqCtxAccessor.setValue();

// When : contextSnapshot.setThreadLocals()
try (Scope ignored = contextSnapshot.setThreadLocals()) {

// Then : should not
final RequestContext reqCtxInScope = RequestContext.currentOrNull();
assertThat(reqCtxInScope).isSameAs(expectedCtx);
}

// Then
final RequestContext reqCtxOutOfScope = RequestContext.currentOrNull();
assertThat(reqCtxOutOfScope).isNull();
}

static ClientRequestContext newContext() {
return ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
}
}
Loading

0 comments on commit 0019bc7

Please sign in to comment.