Skip to content

Commit

Permalink
Fix BlockHound false positives (#2331)
Browse files Browse the repository at this point in the history
Fixes #2302
Fixes #2190
Partially fixes #2303
  • Loading branch information
dkhalanskyjb committed Oct 26, 2020
1 parent ee78090 commit 45ba58e
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 5 deletions.
162 changes: 157 additions & 5 deletions kotlinx-coroutines-debug/src/CoroutinesBlockHoundIntegration.kt
@@ -1,16 +1,168 @@
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")

package kotlinx.coroutines.debug

import reactor.blockhound.BlockHound
import kotlinx.coroutines.scheduling.*
import reactor.blockhound.*
import reactor.blockhound.integration.*

@Suppress("UNUSED")
public class CoroutinesBlockHoundIntegration: BlockHoundIntegration {
public class CoroutinesBlockHoundIntegration : BlockHoundIntegration {

override fun applyTo(builder: BlockHound.Builder): Unit = with(builder) {
allowBlockingCallsInPrimitiveImplementations()
allowBlockingWhenEnqueuingTasks()
allowServiceLoaderInvocationsOnInit()
allowBlockingCallsInReflectionImpl()
/* The predicates that define that BlockHound should only report blocking calls from threads that are part of
the coroutine thread pool and currently execute a CPU-bound coroutine computation. */
addDynamicThreadPredicate { isSchedulerWorker(it) }
nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
}

/**
* Allows blocking calls in various coroutine structures, such as flows and channels.
*
* They use locks in implementations, though only for protecting short pieces of fast and well-understood code, so
* locking in such places doesn't affect the program liveness.
*/
private fun BlockHound.Builder.allowBlockingCallsInPrimitiveImplementations() {
allowBlockingCallsInJobSupport()
allowBlockingCallsInThreadSafeHeap()
allowBlockingCallsInFlow()
allowBlockingCallsInChannels()
}

/**
* Allows blocking inside [kotlinx.coroutines.JobSupport].
*/
private fun BlockHound.Builder.allowBlockingCallsInJobSupport() {
for (method in listOf("finalizeFinishingState", "invokeOnCompletion", "makeCancelling",
"tryMakeCompleting"))
{
allowBlockingCallsInside("kotlinx.coroutines.JobSupport", method)
}
}

/**
* Allows blocking inside [kotlinx.coroutines.internal.ThreadSafeHeap].
*/
private fun BlockHound.Builder.allowBlockingCallsInThreadSafeHeap() {
for (method in listOf("clear", "peek", "removeFirstOrNull", "addLast")) {
allowBlockingCallsInside("kotlinx.coroutines.internal.ThreadSafeHeap", method)
}
// [addLastIf] is only used in [EventLoop.common]. Users of [removeFirstIf]:
allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineDispatcher", "doActionsUntil")
allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineContext", "triggerActions")
}

private fun BlockHound.Builder.allowBlockingCallsInFlow() {
allowBlockingCallsInsideStateFlow()
allowBlockingCallsInsideSharedFlow()
}

/**
* Allows blocking inside the implementation of [kotlinx.coroutines.flow.StateFlow].
*/
private fun BlockHound.Builder.allowBlockingCallsInsideStateFlow() {
allowBlockingCallsInside("kotlinx.coroutines.flow.StateFlowImpl", "updateState")
}

/**
* Allows blocking inside the implementation of [kotlinx.coroutines.flow.SharedFlow].
*/
private fun BlockHound.Builder.allowBlockingCallsInsideSharedFlow() {
for (method in listOf("emitSuspend", "awaitValue", "getReplayCache", "tryEmit", "cancelEmitter",
"tryTakeValue", "resetReplayCache"))
{
allowBlockingCallsInside("kotlinx.coroutines.flow.SharedFlowImpl", method)
}
for (method in listOf("getSubscriptionCount", "allocateSlot", "freeSlot")) {
allowBlockingCallsInside("kotlinx.coroutines.flow.internal.AbstractSharedFlow", method)
}
}

private fun BlockHound.Builder.allowBlockingCallsInChannels() {
allowBlockingCallsInArrayChannel()
allowBlockingCallsInBroadcastChannel()
allowBlockingCallsInConflatedChannel()
}

/**
* Allows blocking inside [kotlinx.coroutines.channels.ArrayChannel].
*/
private fun BlockHound.Builder.allowBlockingCallsInArrayChannel() {
for (method in listOf(
"pollInternal", "isEmpty", "isFull", "isClosedForReceive", "offerInternal", "offerSelectInternal",
"enqueueSend", "pollInternal", "pollSelectInternal", "enqueueReceiveInternal", "onCancelIdempotent"))
{
allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayChannel", method)
}
}

/**
* Allows blocking inside [kotlinx.coroutines.channels.ArrayBroadcastChannel].
*/
private fun BlockHound.Builder.allowBlockingCallsInBroadcastChannel() {
for (method in listOf("offerInternal", "offerSelectInternal", "updateHead")) {
allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel", method)
}
for (method in listOf("checkOffer", "pollInternal", "pollSelectInternal")) {
allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel\$Subscriber", method)
}
}

/**
* Allows blocking inside [kotlinx.coroutines.channels.ConflatedChannel].
*/
private fun BlockHound.Builder.allowBlockingCallsInConflatedChannel() {
for (method in listOf("offerInternal", "offerSelectInternal", "pollInternal", "pollSelectInternal",
"onCancelIdempotent"))
{
allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedChannel", method)
}
}

/**
* Allows blocking when enqueuing tasks into a thread pool.
*
* Without this, the following code breaks:
* ```
* withContext(Dispatchers.Default) {
* withContext(newSingleThreadContext("singleThreadedContext")) {
* }
* }
* ```
*/
private fun BlockHound.Builder.allowBlockingWhenEnqueuingTasks() {
/* This method may block as part of its implementation, but is probably safe. */
allowBlockingCallsInside("java.util.concurrent.ScheduledThreadPoolExecutor", "execute")
}

/**
* Allows instances of [java.util.ServiceLoader] being called.
*
* Each instance is listed separately; another approach could be to generally allow the operations performed by
* service loaders, as they can generally be considered safe. This was not done here because ServiceLoader has a
* large API surface, with some methods being hidden as implementation details (in particular, the implementation of
* its iterator is completely opaque). Relying on particular names being used in ServiceLoader's implementation
* would be brittle, so here we only provide clearance rules for some specific instances.
*/
private fun BlockHound.Builder.allowServiceLoaderInvocationsOnInit() {
allowBlockingCallsInside("kotlinx.coroutines.reactive.ReactiveFlowKt", "<clinit>")
allowBlockingCallsInside("kotlinx.coroutines.CoroutineExceptionHandlerImplKt", "<clinit>")
// not part of the coroutines library, but it would be nice if reflection also wasn't considered blocking
allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.resolve.OverridingUtil", "<clinit>")
}

override fun applyTo(builder: BlockHound.Builder) {
builder.addDynamicThreadPredicate { isSchedulerWorker(it) }
builder.nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
/**
* Allows some blocking calls from the reflection API.
*
* The API is big, so surely some other blocking calls will show up, but with these rules in place, at least some
* simple examples work without problems.
*/
private fun BlockHound.Builder.allowBlockingCallsInReflectionImpl() {
allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.builtins.jvm.JvmBuiltInsPackageFragmentProvider", "findPackage")
}

}
22 changes: 22 additions & 0 deletions kotlinx-coroutines-debug/test/BlockHoundTest.kt
@@ -1,5 +1,6 @@
package kotlinx.coroutines.debug
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.junit.*
import reactor.blockhound.*

Expand Down Expand Up @@ -52,6 +53,27 @@ class BlockHoundTest : TestBase() {
}
}

@Test
fun testChannelsNotBeingConsideredBlocking() = runTest {
withContext(Dispatchers.Default) {
// Copy of kotlinx.coroutines.channels.ArrayChannelTest.testSimple
val q = Channel<Int>(1)
check(q.isEmpty)
check(!q.isClosedForReceive)
check(!q.isClosedForSend)
val sender = launch {
q.send(1)
q.send(2)
}
val receiver = launch {
q.receive() == 1
q.receive() == 2
}
sender.join()
receiver.join()
}
}

@Test(expected = BlockingOperationError::class)
fun testReusingThreadsFailure() = runTest {
val n = 100
Expand Down

0 comments on commit 45ba58e

Please sign in to comment.