Skip to content

Commit

Permalink
kotlinx-coroutines-reactor context propagation (#5196)
Browse files Browse the repository at this point in the history
* kotlinx-coroutines-reactor context propagation

* extract context from reactor

* add generics

* muzzle

* actually use the context extracted from reactor

* test context propagation operator

* typo

* used named instead of namedOneOf

* instrument newCoroutineContext, remove reactor specific code

* revert changes
  • Loading branch information
laurit committed Jan 28, 2022
1 parent 7df2b0e commit 014624e
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 22 deletions.
6 changes: 6 additions & 0 deletions instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ muzzle {
versions.set("[1.3.9,)")
}
}

dependencies {
compileOnly("io.opentelemetry:opentelemetry-extension-kotlin")
compileOnly("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))

testImplementation("io.opentelemetry:opentelemetry-extension-kotlin")
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation(project(":instrumentation:reactor:reactor-3.1:library"))

// Use first version with flow support since we have tests for it.
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand All @@ -19,40 +18,27 @@
public class KotlinCoroutinesInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("kotlinx.coroutines.BuildersKt");
return named("kotlinx.coroutines.CoroutineContextKt");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("launch", "launch$default")
named("newCoroutineContext")
.and(takesArgument(1, named("kotlin.coroutines.CoroutineContext"))),
this.getClass().getName() + "$LaunchAdvice");
transformer.applyAdviceToMethod(
namedOneOf("runBlocking", "runBlocking$default")
.and(takesArgument(0, named("kotlin.coroutines.CoroutineContext"))),
this.getClass().getName() + "$RunBlockingAdvice");
this.getClass().getName() + "$ContextAdvice");
}

@SuppressWarnings("unused")
public static class LaunchAdvice {
public static class ContextAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 1, readOnly = false) CoroutineContext coroutineContext) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}

@SuppressWarnings("unused")
public static class RunBlockingAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
if (coroutineContext != null) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,87 @@ class KotlinCoroutineInstrumentationTest extends AgentInstrumentationSpecificati
assert seenItersA.equals(expectedIters)
assert seenItersB.equals(expectedIters)
}

def "kotlin traced mono"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedMono()

then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
attributes {
}
}
span("child") {
childOf span(0)
attributes {
}
}
}
}

where:
dispatcher << dispatchersToTest
}

def "kotlin traced mono with context propagation operator"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedMonoContextPropagationOperator()

then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
attributes {
}
}
span("child") {
childOf span(0)
attributes {
}
}
}
}

where:
dispatcher << dispatchersToTest
}

def "kotlin traced flux"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedFlux()

then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
attributes {
}
}
(0..2).each {
span("child_$it") {
childOf span(0)
attributes {
}
}
}
}
}

where:
dispatcher << dispatchersToTest
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
Expand All @@ -19,6 +21,11 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.flux
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -125,6 +132,38 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
}
}

fun tracedMono(): Unit = runTest {
mono(dispatcher) {
tracedChild("child")
}.awaitSingle()
}

fun tracedMonoContextPropagationOperator(): Unit = runTest {
val currentContext = Context.current()
// clear current context to ensure that ContextPropagationOperator is used for context propagation
withContext(Context.root().asContextElement()) {
val mono = mono(dispatcher) {
// extract context from reactor and propagate it into coroutine
val reactorContext = coroutineContext[ReactorContext.Key]?.context
val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current())
withContext(otelContext.asContextElement()) {
tracedChild("child")
}
}
ContextPropagationOperator.runWithContext(mono, currentContext).awaitSingle()
}
}

fun tracedFlux() = runTest {
flux(dispatcher) {
repeat(3) {
tracedChild("child_$it")
send(it)
}
}.collect {
}
}

fun launchConcurrentSuspendFunctions(numIters: Int) {
runBlocking {
for (i in 0 until numIters) {
Expand Down

0 comments on commit 014624e

Please sign in to comment.