Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlockHound integration #1821

Merged
merged 15 commits into from Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion gradle.properties
Expand Up @@ -14,13 +14,15 @@ knit_version=0.1.3
html_version=0.6.8
lincheck_version=2.5.3
dokka_version=0.9.16-rdev-2-mpp-hacks
byte_buddy_version=1.9.3
byte_buddy_version=1.10.7
reactor_vesion=3.2.5.RELEASE
reactive_streams_version=1.0.2
rxjava2_version=2.2.8
javafx_version=11.0.2
javafx_plugin_version=0.0.8
binary_compatibility_validator_version=0.2.2
blockhound_version=1.0.2.RELEASE
jna_version=5.5.0

# Android versions
android_version=4.1.1.4
Expand Down
Expand Up @@ -951,3 +951,12 @@ internal class CoroutineScheduler(
TERMINATED
}
}

@Suppress("UNUSED")
@JvmName("isSchedulerWorker")
internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved

@Suppress("UNUSED")
@JvmName("mayNotBlock")
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED
5 changes: 5 additions & 0 deletions kotlinx-coroutines-debug/api/kotlinx-coroutines-debug.api
Expand Up @@ -8,6 +8,11 @@ public final class kotlinx/coroutines/debug/CoroutineInfo {
public fun toString ()Ljava/lang/String;
}

public final class kotlinx/coroutines/debug/CoroutinesBlockHoundIntegration : reactor/blockhound/integration/BlockHoundIntegration {
public fun <init> ()V
public fun applyTo (Lreactor/blockhound/BlockHound$Builder;)V
}

public final class kotlinx/coroutines/debug/DebugProbes {
public static final field INSTANCE Lkotlinx/coroutines/debug/DebugProbes;
public final fun dumpCoroutines (Ljava/io/PrintStream;)V
Expand Down
8 changes: 7 additions & 1 deletion kotlinx-coroutines-debug/build.gradle
Expand Up @@ -22,6 +22,10 @@ dependencies {
compileOnly "junit:junit:$junit_version"
shadowDeps "net.bytebuddy:byte-buddy:$byte_buddy_version"
shadowDeps "net.bytebuddy:byte-buddy-agent:$byte_buddy_version"
compileOnly "io.projectreactor.tools:blockhound:$blockhound_version"
testCompile "io.projectreactor.tools:blockhound:$blockhound_version"
runtime "net.java.dev.jna:jna:$jna_version"
runtime "net.java.dev.jna:jna-platform:$jna_version"
}

jar {
Expand All @@ -35,5 +39,7 @@ shadowJar {
classifier null
// Shadow only byte buddy, do not package kotlin stdlib
configurations = [project.configurations.shadowDeps]
relocate 'net.bytebuddy', 'kotlinx.coroutines.repackaged.net.bytebuddy'
relocate('net.bytebuddy', 'kotlinx.coroutines.repackaged.net.bytebuddy') {
exclude 'net.bytebuddy.agent'
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
}
}
@@ -0,0 +1 @@
kotlinx.coroutines.debug.CoroutinesBlockHoundIntegration
16 changes: 16 additions & 0 deletions kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
@@ -0,0 +1,16 @@
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
package kotlinx.coroutines.debug

import reactor.blockhound.BlockHound
import kotlinx.coroutines.scheduling.*
import reactor.blockhound.integration.*

@Suppress("UNUSED")
public class CoroutinesBlockHoundIntegration: BlockHoundIntegration {

override fun applyTo(builder: BlockHound.Builder) {
builder.addDynamicThreadPredicate { isSchedulerWorker(it) }
builder.nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
}

}
2 changes: 1 addition & 1 deletion kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt
Expand Up @@ -57,7 +57,7 @@ internal object DebugProbesImpl {
public fun install(): Unit = coroutineStateLock.write {
if (++installations > 1) return

ByteBuddyAgent.install()
ByteBuddyAgent.install(ByteBuddyAgent.AttachmentProvider.ForEmulatedAttachment.INSTANCE)
val cl = Class.forName("kotlin.coroutines.jvm.internal.DebugProbesKt")
val cl2 = Class.forName("kotlinx.coroutines.debug.DebugProbesKt")

Expand Down
73 changes: 73 additions & 0 deletions kotlinx-coroutines-debug/test/BlockHoundTest.kt
@@ -0,0 +1,73 @@
package kotlinx.coroutines.debug
import kotlinx.coroutines.*
import org.junit.*
import reactor.blockhound.*

class BlockHoundTest : TestBase() {

@Before
fun init() {
BlockHound.install()
}

@Test(expected = BlockingOperationError::class)
fun shouldDetectBlockingInDefault() = runTest {
withContext(Dispatchers.Default) {
Thread.sleep(1)
}
}

@Test
fun shouldNotDetectBlockingInIO() = runTest {
withContext(Dispatchers.IO) {
Thread.sleep(1)
}
}

@Test
fun shouldNotDetectNonblocking() = runTest {
withContext(Dispatchers.Default) {
val a = 1
val b = 2
assert(a + b == 3)
}
}

@Test
fun testReusingThreads() = runTest {
val n = 100
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
repeat(n) {
async(Dispatchers.Default) {
}
}
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
}

@Test(expected = BlockingOperationError::class)
fun testReusingThreadsFailure() = runTest {
val n = 100
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
async(Dispatchers.Default) {
Thread.sleep(1)
}
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
}

}
13 changes: 8 additions & 5 deletions kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt
Expand Up @@ -39,7 +39,7 @@ class CoroutinesDumpTest : DebugTestBase() {

@Test
fun testRunningCoroutine() = runBlocking {
val deferred = async(Dispatchers.Default) {
val deferred = async(Dispatchers.IO) {
activeMethod(shouldSuspend = false)
assertTrue(true)
}
Expand Down Expand Up @@ -70,7 +70,7 @@ class CoroutinesDumpTest : DebugTestBase() {

@Test
fun testRunningCoroutineWithSuspensionPoint() = runBlocking {
val deferred = async(Dispatchers.Default) {
val deferred = async(Dispatchers.IO) {
activeMethod(shouldSuspend = true)
yield() // tail-call
}
Expand Down Expand Up @@ -100,7 +100,7 @@ class CoroutinesDumpTest : DebugTestBase() {

@Test
fun testCreationStackTrace() = runBlocking {
val deferred = async(Dispatchers.Default) {
val deferred = async(Dispatchers.IO) {
activeMethod(shouldSuspend = true)
}

Expand Down Expand Up @@ -129,7 +129,7 @@ class CoroutinesDumpTest : DebugTestBase() {

@Test
fun testFinishedCoroutineRemoved() = runBlocking {
val deferred = async(Dispatchers.Default) {
val deferred = async(Dispatchers.IO) {
activeMethod(shouldSuspend = true)
}

Expand All @@ -149,7 +149,10 @@ class CoroutinesDumpTest : DebugTestBase() {
if (shouldSuspend) yield()
notifyCoroutineStarted()
while (coroutineContext[Job]!!.isActive) {
runCatching { Thread.sleep(60_000) }
try {
Thread.sleep(60_000)
} catch (_ : InterruptedException) {
}
}
}

Expand Down
Expand Up @@ -133,7 +133,7 @@ class RunningThreadStackMergeTest : DebugTestBase() {
}

private fun CoroutineScope.launchEscapingCoroutineWithoutContext() {
launch(Dispatchers.Default) {
launch(Dispatchers.IO) {
suspendingFunctionWithoutContext()
assertTrue(true)
}
Expand Down
29 changes: 27 additions & 2 deletions kotlinx-coroutines-debug/test/StracktraceUtils.kt
Expand Up @@ -13,7 +13,7 @@ public fun String.trimStackTrace(): String =
.replace(Regex("#[0-9]+"), "")
.replace(Regex("(?<=\tat )[^\n]*/"), "")
.replace(Regex("\t"), "")
.replace("sun.misc.Unsafe.park", "jdk.internal.misc.Unsafe.park") // JDK8->JDK11
.replace("sun.misc.Unsafe.", "jdk.internal.misc.Unsafe.") // JDK8->JDK11
.applyBackspace()

public fun String.applyBackspace(): String {
Expand Down Expand Up @@ -62,6 +62,31 @@ public fun verifyDump(vararg traces: String, ignoredCoroutine: String? = null, f
}
}

/** Clean the stacktraces from artifacts of BlockHound instrumentation
*
* BlockHound works by switching a native call by a class generated with ByteBuddy, which, if the blocking
* call is allowed in this context, in turn calls the real native call that is now available under a
* different name.
*
* The traces thus undergo the following two changes when the execution is instrumented:
* - The original native call is replaced with a non-native one with the same FQN, and
* - An additional native call is placed on top of the stack, with the original name that also has
* `$$BlockHound$$_` prepended at the last component.
*/
private fun cleanBlockHoundTraces(frames: List<String>): List<String> {
var result = mutableListOf<String>()
val blockHoundSubstr = "\$\$BlockHound\$\$_"
var i = 0
while (i < frames.size) {
result.add(frames[i].replace(blockHoundSubstr, ""))
if (frames[i].contains(blockHoundSubstr)) {
i += 1
}
i += 1
}
return result
}

public fun verifyDump(vararg traces: String, ignoredCoroutine: String? = null) {
val baos = ByteArrayOutputStream()
DebugProbes.dumpCoroutines(PrintStream(baos))
Expand All @@ -85,7 +110,7 @@ public fun verifyDump(vararg traces: String, ignoredCoroutine: String? = null) {
expected.withIndex().forEach { (index, trace) ->
val actualTrace = actual[index].trimStackTrace().sanitizeAddresses()
val expectedTrace = trace.trimStackTrace().sanitizeAddresses()
val actualLines = actualTrace.split("\n")
val actualLines = cleanBlockHoundTraces(actualTrace.split("\n"))
val expectedLines = expectedTrace.split("\n")
for (i in expectedLines.indices) {
assertEquals(expectedLines[i], actualLines[i])
Expand Down