Skip to content

Commit

Permalink
Merge #2588 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 28, 2022
2 parents 20688bd + 106cbff commit 1338a50
Show file tree
Hide file tree
Showing 33 changed files with 1,339 additions and 42 deletions.
1 change: 0 additions & 1 deletion build.gradle
Expand Up @@ -88,7 +88,6 @@ ext {
micrometerTracingVersion = '1.0.0' //optional baseline
micrometerDocsVersion = '1.0.0' //optional baseline

// Used only in tests
contextPropagationVersion = '1.0.0' //optional baseline

// Used only in examples
Expand Down
18 changes: 18 additions & 0 deletions docs/asciidoc/http-client.adoc
Expand Up @@ -468,6 +468,24 @@ The result in `Zipkin` looks like:

image::images/http-client-tracing.png[]

=== Access Current Observation
Project Micrometer provides https://micrometer.io/docs/contextPropagation[`a library`] that assists with context propagation across
different types of context mechanisms such as `ThreadLocal`, `Reactor Context` and others.

The following example shows how to use this library in a custom `ChannelHandler`:

====
[source,java,indent=0]
.{examplesdir}/tracing/custom/Application.java
----
include::{examplesdir}/tracing/custom/Application.java[lines=18..85]
----
<1> Initializes Brave, Zipkin, and the Observation registry.
<2> Templated URIs are used as an URI tag value when possible.
<3> Enables the built-in integration with Micrometer.
<4> Custom `ChannelHandler` that uses context propagation library.
====

== Unix Domain Sockets
The `HTTP` client supports Unix Domain Sockets (UDS) for all transports (native and NIO).

Expand Down
18 changes: 18 additions & 0 deletions docs/asciidoc/http-server.adoc
Expand Up @@ -644,6 +644,24 @@ The result in `Zipkin` looks like:

image::images/http-server-tracing.png[]

=== Access Current Observation
Project Micrometer provides https://micrometer.io/docs/contextPropagation[`a library`] that assists with context propagation across
different types of context mechanisms such as `ThreadLocal`, `Reactor Context` and others.

The following example shows how to use this library in a custom `ChannelHandler`:

====
[source,java,indent=0]
.{examplesdir}/tracing/custom/Application.java
----
include::{examplesdir}/tracing/custom/Application.java[lines=18..85]
----
<1> Initializes Brave, Zipkin, and the Observation registry.
<2> Templated URIs are used as an URI tag value when possible.
<3> Enables the built-in integration with Micrometer.
<4> Custom `ChannelHandler` that uses context propagation library.
====

== Unix Domain Sockets
The `HTTP` server supports Unix Domain Sockets (UDS) for all transports (native and NIO).

Expand Down
17 changes: 17 additions & 0 deletions docs/asciidoc/tcp-client.adoc
Expand Up @@ -355,6 +355,23 @@ The result in `Zipkin` looks like:

image::images/tcp-client-tracing.png[]

=== Access Current Observation
Project Micrometer provides https://micrometer.io/docs/contextPropagation[`a library`] that assists with context propagation across
different types of context mechanisms such as `ThreadLocal`, `Reactor Context` and others.

The following example shows how to use this library in a custom `ChannelHandler`:

====
[source,java,indent=0]
.{examplesdir}/tracing/custom/Application.java
----
include::{examplesdir}/tracing/custom/Application.java[lines=18..80]
----
<1> Initializes Brave, Zipkin, and the Observation registry.
<2> Enables the built-in integration with Micrometer.
<3> Custom `ChannelHandler` that uses context propagation library.
====

== Unix Domain Sockets
The `TCP` client supports Unix Domain Sockets (UDS) for all transports (native and NIO).

Expand Down
17 changes: 17 additions & 0 deletions docs/asciidoc/tcp-server.adoc
Expand Up @@ -323,6 +323,23 @@ The result in `Zipkin` looks like:

image::images/tcp-server-tracing.png[]

=== Access Current Observation
Project Micrometer provides https://micrometer.io/docs/contextPropagation[`a library`] that assists with context propagation across
different types of context mechanisms such as `ThreadLocal`, `Reactor Context` and others.

The following example shows how to use this library in a custom `ChannelHandler`:

====
[source,java,indent=0]
.{examplesdir}/tracing/custom/Application.java
----
include::{examplesdir}/tracing/custom/Application.java[lines=18..86]
----
<1> Initializes Brave, Zipkin, and the Observation registry.
<2> Enables the built-in integration with Micrometer.
<3> Custom `ChannelHandler` that uses context propagation library.
====

== Unix Domain Sockets
The `TCP` server supports Unix Domain Sockets (UDS) for all transports (native and NIO).

Expand Down
5 changes: 5 additions & 0 deletions reactor-netty5-core/build.gradle
Expand Up @@ -113,6 +113,7 @@ dependencies {
//Metrics
compileOnly "io.micrometer:micrometer-core:$micrometerVersion"
compileOnly "io.micrometer:micrometer-tracing:$micrometerTracingVersion"
compileOnly "io.micrometer:context-propagation:$contextPropagationVersion"

// Logging
compileOnly "org.slf4j:slf4j-api:$slf4jVersion"
Expand Down Expand Up @@ -154,14 +155,18 @@ dependencies {
for (dependency in project.configurations.shaded.dependencies) {
compileOnly(dependency)
testImplementation(dependency)
contextPropagationTestImplementation(dependency)
}

jarFileTestImplementation "org.assertj:assertj-core:$assertJVersion"
jarFileTestImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
jarFileTestRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"

contextPropagationTestImplementation "io.projectreactor:reactor-test:$testAddonVersion"
contextPropagationTestImplementation "org.assertj:assertj-core:$assertJVersion"
contextPropagationTestImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
contextPropagationTestImplementation "org.junit.jupiter:junit-jupiter-params:$junitVersion"
contextPropagationTestImplementation "ch.qos.logback:logback-classic:$logbackVersion"
contextPropagationTestImplementation "io.micrometer:micrometer-core:$micrometerVersion"
contextPropagationTestImplementation "io.micrometer:context-propagation:$contextPropagationVersion"
contextPropagationTestRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
Expand Down
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5;

import io.micrometer.context.ContextRegistry;
import io.micrometer.context.ContextSnapshot;
import io.netty5.channel.Channel;
import io.netty5.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty5.ReactorNetty.setChannelContext;

class ChannelContextAccessorTest {
static final ContextRegistry registry = ContextRegistry.getInstance();

static Object[][] data() {
return new Object[][]{
{null, "test1"},
{Context.empty(), "test1"},
{Context.of("test", "test"), "test1"}
};
}

@ParameterizedTest
@MethodSource("data")
@SuppressWarnings("FutureReturnValueIgnored")
void test(Context context, String expectation) {
registry.registerThreadLocalAccessor(new TestThreadLocalAccessor());

TestThreadLocalHolder.value("test1");

Channel channel = new EmbeddedChannel();
setChannelContext(channel, context);

ContextSnapshot.captureAll(registry).updateContext(channel);

TestThreadLocalHolder.value("test2");

try (ContextSnapshot.Scope scope = ContextSnapshot.captureFrom(channel).setThreadLocals()) {
assertThat(TestThreadLocalHolder.value()).isEqualTo(expectation);
}
finally {
//"FutureReturnValueIgnored" this is deliberate
channel.close();
registry.removeThreadLocalAccessor(TestThreadLocalAccessor.KEY);
}
}
}
Expand Up @@ -15,8 +15,23 @@
*/
package reactor.netty5;

import io.micrometer.context.ContextRegistry;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.netty5.buffer.Buffer;
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.util.concurrent.Future;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.netty5.tcp.TcpClient;
import reactor.netty5.tcp.TcpServer;
import reactor.test.StepVerifier;

import java.nio.charset.Charset;
import java.time.Duration;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -26,4 +41,75 @@ class ContextPropagationTest {
void testObservationKey() {
assertThat(Metrics.OBSERVATION_KEY).isEqualTo(ObservationThreadLocalAccessor.KEY);
}

@Test
void testContextPropagation() {
DisposableServer disposableServer =
TcpServer.create()
.wiretap(true)
.handle((in, out) -> out.send(in.receive().transferOwnership()))
.bindNow();

ContextRegistry registry = ContextRegistry.getInstance();
try {
registry.registerThreadLocalAccessor(new TestThreadLocalAccessor());
TestThreadLocalHolder.value("First");

Connection connection =
TcpClient.create()
.port(disposableServer.port())
.wiretap(true)
.connect()
.contextWrite(ctx -> ContextSnapshot.captureAll(registry).updateContext(ctx))
.block();

assertThat(connection).isNotNull();

connection.outbound()
.withConnection(conn -> conn.addHandlerLast(TestChannelOutboundHandler.INSTANCE))
.sendString(Mono.just("Test"))
.then()
.subscribe();

connection.inbound()
.receive()
.asString()
.take(1)
.as(StepVerifier::create)
.expectNext("TestFirstSecond")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
finally {
registry.removeThreadLocalAccessor(TestThreadLocalAccessor.KEY);
disposableServer.disposeNow();
}
}

static final class TestChannelOutboundHandler extends ChannelHandlerAdapter {

static final ChannelHandler INSTANCE = new TestChannelOutboundHandler();

@Override
public boolean isSharable() {
return true;
}

@Override
@SuppressWarnings("try")
public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
TestThreadLocalHolder.value("Second");
if (msg instanceof Buffer buffer) {
Buffer buffer1;
try (ContextSnapshot.Scope scope = ContextSnapshot.captureFrom(ctx.channel()).setThreadLocals()) {
buffer1 = ctx.bufferAllocator().copyOf(TestThreadLocalHolder.value(), Charset.defaultCharset());
}
Buffer buffer2 = ctx.bufferAllocator().copyOf(TestThreadLocalHolder.value(), Charset.defaultCharset());
return ctx.write(ctx.bufferAllocator().compose(List.of(buffer.send(), buffer1.send(), buffer2.send())));
}
else {
return ctx.write(msg);
}
}
}
}
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5;

import io.micrometer.context.ThreadLocalAccessor;

final class TestThreadLocalAccessor implements ThreadLocalAccessor<String> {

static final String KEY = "testContextPropagation";

@Override
public Object key() {
return KEY;
}

@Override
public String getValue() {
return TestThreadLocalHolder.value();
}

@Override
public void setValue(String value) {
TestThreadLocalHolder.value(value);
}

@Override
public void reset() {
TestThreadLocalHolder.reset();
}
}
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5;

final class TestThreadLocalHolder {

static final ThreadLocal<String> holder = new ThreadLocal<>();

static void reset() {
holder.remove();
}

static String value() {
return holder.get();
}

static void value(String value) {
holder.set(value);
}
}

0 comments on commit 1338a50

Please sign in to comment.