Skip to content

Commit

Permalink
BlockHound integration POC
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Mar 13, 2020
1 parent a25bf36 commit edc388c
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 0 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Expand Up @@ -60,6 +60,7 @@ buildscript {
repositories {
jcenter()
maven { url "https://kotlin.bintray.com/kotlinx" }
maven { url "https://repo.spring.io/snapshot/" }
maven {
url "https://kotlin.bintray.com/kotlin-dev"
credentials {
Expand Down Expand Up @@ -153,6 +154,7 @@ allprojects {
jcenter()
maven {
url "https://kotlin.bintray.com/kotlin-dev"
url "https://repo.spring.io/snapshot/"
credentials {
username = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') ?: ""
password = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') ?: ""
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-debug/build.gradle
Expand Up @@ -22,6 +22,8 @@ dependencies {
compileOnly "junit:junit:$junit_version"
shadowDeps "net.bytebuddy:byte-buddy:$byte_buddy_version"
shadowDeps "net.bytebuddy:byte-buddy-agent:$byte_buddy_version"
compile 'io.projectreactor.tools:blockhound:1.0.1.BUILD-SNAPSHOT'
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
}

jar {
Expand Down
43 changes: 43 additions & 0 deletions kotlinx-coroutines-debug/src/internal/BlockHoundIntegration.kt
@@ -0,0 +1,43 @@
package kotlinx.coroutines.debug.internal

import reactor.blockhound.BlockHound
import kotlin.reflect.KClass
import kotlin.reflect.full.*

internal object BlockHoundIntegration {

init {
val cls = Class.forName("kotlinx.coroutines.scheduling.CoroutineScheduler\$Worker").kotlin
initializerHelper(cls)
}

private fun <T : Any> initializerHelper(cls: KClass<T>) {
val field = cls.declaredMemberProperties.find { it.name == "state" }!!
BlockHound.builder()
.addDynamicThreadPredicate(cls::isInstance)
.nonBlockingThreadPredicate { p ->
p.or { thread ->
val castThread = cls.safeCast(thread)
if (!enabled || castThread == null) {
false
} else {
val state = field(castThread) as Enum<*>
state.name == "CPU_ACQUIRED"
}
}
}
.install()
}

@Volatile
private var enabled = false

fun install() {
enabled = true
}

fun uninstall() {
enabled = false
}

}
4 changes: 4 additions & 0 deletions kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt
Expand Up @@ -66,6 +66,8 @@ internal object DebugProbesImpl {
.name(cl.name)
.make()
.load(cl.classLoader, ClassReloadingStrategy.fromInstalledAgent())

BlockHoundIntegration.install()
}

public fun uninstall(): Unit = coroutineStateLock.write {
Expand All @@ -82,6 +84,8 @@ internal object DebugProbesImpl {
.name(cl.name)
.make()
.load(cl.classLoader, ClassReloadingStrategy.fromInstalledAgent())

BlockHoundIntegration.uninstall()
}

public fun hierarchyToString(job: Job): String = coroutineStateLock.write {
Expand Down
79 changes: 79 additions & 0 deletions kotlinx-coroutines-debug/test/BlockHoundTest.kt
@@ -0,0 +1,79 @@
package kotlinx.coroutines.debug
import kotlinx.coroutines.*
import kotlinx.coroutines.debug.internal.BlockHoundIntegration
import org.junit.*
import reactor.blockhound.BlockingOperationError

class BlockHoundTest : TestBase() {

@Before
fun init() {
BlockHoundIntegration.install()
}

@After
fun deinit() {
BlockHoundIntegration.uninstall()
}

@Test(expected = BlockingOperationError::class)
fun shouldDetectBlockingInDefault() = runTest {
withContext(Dispatchers.Default) {
Thread.sleep(1)
}
}

@Test
fun shouldNotDetectBlockingInIO() = runTest {
withContext(Dispatchers.IO) {
Thread.sleep(1)
}
}

@Test
fun shouldNotDetectNonblocking() = runTest {
withContext(Dispatchers.Default) {
val a = 1
val b = 2
assert(a + b == 3)
}
}

@Test
fun testReusingThreads() = runTest {
val n = 100
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
repeat(n) {
async(Dispatchers.Default) {
}
}
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
}

@Test(expected = BlockingOperationError::class)
fun testReusingThreadsFailure() = runTest {
val n = 100
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
async(Dispatchers.Default) {
Thread.sleep(1)
}
repeat(n) {
async(Dispatchers.IO) {
Thread.sleep(1)
}
}
}

}

0 comments on commit edc388c

Please sign in to comment.