Skip to content

Commit

Permalink
kotlinx-coroutines-reactor context propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Jan 21, 2022
1 parent 3c8a192 commit d7e910a
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
// Use first version with flow support since we have tests for it.
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.reactor.KotlinCoroutinesFluxInstrumentation;
import io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.reactor.KotlinCoroutinesMonoInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
Expand All @@ -26,6 +28,9 @@ public boolean isHelperClass(String className) {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new KotlinCoroutinesInstrumentation());
return asList(
new KotlinCoroutinesInstrumentation(),
new KotlinCoroutinesMonoInstrumentation(),
new KotlinCoroutinesFluxInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.reactor;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.KotlinCoroutinesInstrumentationHelper;
import kotlin.coroutines.CoroutineContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class KotlinCoroutinesFluxInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("kotlinx.coroutines.reactor.FluxKt");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("flux").and(takesArgument(0, named("kotlin.coroutines.CoroutineContext"))),
this.getClass().getName() + "$FluxAdvice");
}

@SuppressWarnings("unused")
public static class FluxAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.reactor;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.KotlinCoroutinesInstrumentationHelper;
import kotlin.coroutines.CoroutineContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class KotlinCoroutinesMonoInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("kotlinx.coroutines.reactor.MonoKt");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("mono").and(takesArgument(0, named("kotlin.coroutines.CoroutineContext"))),
this.getClass().getName() + "$MonoAdvice");
}

@SuppressWarnings("unused")
public static class MonoAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext) {
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,60 @@ class KotlinCoroutineInstrumentationTest extends AgentInstrumentationSpecificati
assert seenItersA.equals(expectedIters)
assert seenItersB.equals(expectedIters)
}

def "kotlin traced mono"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedMono()

then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
attributes {
}
}
span("child") {
childOf span(0)
attributes {
}
}
}
}

where:
dispatcher << dispatchersToTest
}

def "kotlin traced flux"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedFlux()

then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
attributes {
}
}
(0..2).each {
span("child_$it") {
childOf span(0)
attributes {
}
}
}
}
}

where:
dispatcher << dispatchersToTest
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactor.flux
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -125,6 +129,22 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
}
}

fun tracedMono(): Unit = runTest {
mono(dispatcher) {
tracedChild("child")
}.awaitSingle()
}

fun tracedFlux() = runTest {
flux(dispatcher) {
repeat(3) {
tracedChild("child_$it")
send(it)
}
}.collect {
}
}

fun launchConcurrentSuspendFunctions(numIters: Int) {
runBlocking {
for (i in 0 until numIters) {
Expand Down

0 comments on commit d7e910a

Please sign in to comment.