BlockHound false positive in WorkQueue.addLast #2937

Closed
OliverO2 opened this issue Sep 16, 2021 · 2 comments

@OliverO2
Contributor

org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.1

Relevant stack trace excerpt:

Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.yield
	at java.base/java.lang.Thread.yield(Thread.java)
	at kotlinx.coroutines.scheduling.WorkQueue.addLast(WorkQueue.kt:95)
dkhalanskyjb self-assigned this Sep 17, 2021
@dkhalanskyjb
Collaborator

Thanks for the report. This is indeed a false positive and will be fixed, but could you please describe how you managed to trigger it?
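
For readers wondering why a `Thread.yield` inside a queue's `addLast` might be harmless: the following is a minimal sketch of a ring buffer whose producer briefly spins until a slow consumer has cleared a slot. It is not the code of `kotlinx.coroutines.scheduling.WorkQueue`; the class `TinyRingBuffer` and all of its members are hypothetical and use only JDK atomics. The assumption being illustrated is that the yield sits in a short spin loop rather than in a genuinely blocking wait.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReferenceArray

// Hypothetical, simplified ring buffer (one producer, many consumers).
// Illustrates the spin-with-yield pattern only; not the library's WorkQueue.
class TinyRingBuffer<T : Any>(capacity: Int = 8) {
    init {
        require(capacity > 0 && (capacity and (capacity - 1)) == 0) {
            "capacity must be a power of two"
        }
    }

    private val mask = capacity - 1
    private val slots = AtomicReferenceArray<T?>(capacity)
    private val producerIndex = AtomicInteger(0)
    private val consumerIndex = AtomicInteger(0)

    val size: Int get() = producerIndex.get() - consumerIndex.get()

    /** Adds [item]; returns false if the buffer is full. Call from the single producer thread only. */
    fun addLast(item: T): Boolean {
        if (size == mask + 1) return false
        val index = producerIndex.get() and mask
        // A consumer may already have advanced consumerIndex without having
        // cleared the slot yet. The window is tiny, so the producer spins and
        // yields instead of parking. A blocking-call detector sees the yield.
        while (slots.get(index) != null) {
            Thread.yield()
        }
        slots.lazySet(index, item)
        producerIndex.incrementAndGet()
        return true
    }

    /** Removes and returns the oldest item, or null if the buffer is empty. */
    fun pollFirst(): T? {
        while (true) {
            val head = consumerIndex.get()
            if (head == producerIndex.get()) return null
            if (consumerIndex.compareAndSet(head, head + 1)) {
                // Between the CAS above and the getAndSet below, the slot is
                // logically free but still non-null: the race addLast spins on.
                return slots.getAndSet(head and mask, null)
            }
        }
    }
}
```

In a sketch like this, the producer only yields when it loses that narrow race, which would be consistent with the error showing up once in many stress test runs.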

@OliverO2
Contributor Author

This was a one-off result from a larger data-driven stress test suite, which typically runs several times a week. I had never seen this error before (and have since added a BlockHound exception), so it looks like a rare race condition, as indicated by the comment in WorkQueue.addLast.
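
The exception itself is not shown in this thread. As a rough sketch, and assuming BlockHound is installed programmatically in the test setup, an exemption for this call site could look like the following. The function name `installBlockHoundWithWorkQueueExemption` is made up for illustration; `BlockHound.install`, `BlockHoundIntegration`, and `allowBlockingCallsInside` are BlockHound's public API, and the class and method names are taken from the stack trace.

```kotlin
import reactor.blockhound.BlockHound
import reactor.blockhound.integration.BlockHoundIntegration

// Hypothetical helper: call once before the tests start.
fun installBlockHoundWithWorkQueueExemption() {
    BlockHound.install(
        BlockHoundIntegration { builder ->
            // Permit blocking calls (here: Thread.yield) made from within
            // WorkQueue.addLast, as named in the stack trace.
            builder.allowBlockingCallsInside(
                "kotlinx.coroutines.scheduling.WorkQueue",
                "addLast"
            )
        }
    )
}
```

An exemption like this is scoped to one method, so blocking calls elsewhere on the dispatcher threads would still be reported.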

Unfortunately, the code is not for publication and there are many moving parts involved, but I'll try to describe it:

I am collecting updates from a flow. Each collected update is guarded by its own timeout. This is achieved by launching a timeout job before each collection step: if the timeout expires, the timeout job cancels the collector; otherwise the timeout job is cancelled and a new one is started. Depending on the test scenario, there can be up to 1000 concurrent collector jobs (via Dispatchers.Default), each collecting up to 1000 updates (which also means 1000 collectors each starting a timeout job 1000 times). I suspect the full stack trace below is not completely accurate, since there is no launch invocation at line 89.

Here is a (slightly simplified) excerpt:

class MonitoredSubscription(private val id: Int) : Subscription() {
    // properties omitted

    private fun CoroutineScope.timeoutJob(
        collectorJob: Job,
        timeout: Duration,
        expectCancellation: Boolean = false
    ) = launch {
        delay(timeout)
        if (!expectCancellation)
            logger.error { "Cancelling collector for ${this@MonitoredSubscription} after $timeout" }
        collectorJob.cancel(CancellationException("collect timed out after $timeout"))
    }

    fun collectUpdates(scope: CoroutineScope, updateGroupID: String, expectUpdates: Boolean) {
        val timeout = if (expectUpdates) Duration.seconds(2) else Duration.milliseconds(100)

        scope.launch {
            withDebugLogging(logger, prefix = "collecting changes via ${this@MonitoredSubscription}") {
                val collectorJob = coroutineContext.job
                var timeoutJob = timeoutJob(collectorJob, timeout, !expectUpdates)

                updates().collect { update ->  // <-- at vc.backendCache.SubscriptionStressTest$MonitoredSubscription.collectUpdates(SubscriptionStressTest.kt:89)
                    timeoutJob.cancel()
                    
                    // processing omitted

                    if (collectedTargets.size == subscribedTargets.size)
                        cancel()

                    timeoutJob = timeoutJob(collectorJob, timeout, !expectUpdates)
                }

                timeoutJob.cancel()
            }
        }
    }
}

This is the full stack trace:

Exception in thread "DefaultDispatcher-worker-4" kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for DispatchedContinuation[Dispatchers.Default, Continuation at vc.backendCache.SubscriptionStressTest$MonitoredSubscription$collectUpdates$1.invokeSuspend(SubscriptionStressTest.kt)@31a48dcd]. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
at kotlinx.coroutines.DispatchedTask.handleFatalException(DispatchedTask.kt:144)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:115)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
Caused by: java.lang.IllegalStateException: Job "coroutine#249941":StandaloneCoroutine{Cancelled}@250e98f6 is already complete or completing, but is being completed with CompletedExceptionally[kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#249941":StandaloneCoroutine{Cancelled}@250e98f6]
at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:831)
at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:233)
... 4 more
Caused by: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#249941":StandaloneCoroutine{Cancelled}@250e98f6
(Coroutine boundary)
at vc.backendCache.SubscriptionStressTest$MonitoredSubscription$collectUpdates$1.invokeSuspend(SubscriptionStressTest.kt)
Caused by: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#249941":StandaloneCoroutine{Cancelled}@250e98f6
at kotlinx.coroutines.JobSupport.toCancellationException(JobSupport.kt:1542)
at kotlinx.coroutines.JobSupport.toCancellationException$default(JobSupport.kt:423)
at kotlinx.coroutines.JobSupport.getCancellationException(JobSupport.kt:419)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:99)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.yield
at java.base/java.lang.Thread.yield(Thread.java)
at kotlinx.coroutines.scheduling.WorkQueue.addLast(WorkQueue.kt:95)
at kotlinx.coroutines.scheduling.WorkQueue.add(WorkQueue.kt:75)
at kotlinx.coroutines.scheduling.CoroutineScheduler.submitToLocalQueue(CoroutineScheduler.kt:506)
at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch(CoroutineScheduler.kt:388)
at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch$default(CoroutineScheduler.kt:383)
at kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher.dispatch(Dispatcher.kt:66)
at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:322)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25)
at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110)
at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56)
at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47)
at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source)
at vc.backendCache.SubscriptionStressTest$MonitoredSubscription.collectUpdates(SubscriptionStressTest.kt:89)
at vc.backendCache.SubscriptionStressTest$Context$collectUpdates$2$1$1.invokeSuspend(SubscriptionStressTest.kt:247)
at vc.backendCache.SubscriptionStressTest$Context$collectUpdates$2$1$1.invoke(SubscriptionStressTest.kt)
at vc.backendCache.SubscriptionStressTest$Context$collectUpdates$2$1$1.invoke(SubscriptionStressTest.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:89)
at kotlinx.coroutines.CoroutineScopeKt.coroutineScope(CoroutineScope.kt:264)
at vc.backendCache.SubscriptionStressTest$Context.collectUpdates(SubscriptionStressTest.kt:245)
at vc.backendCache.SubscriptionStressTest$Context.collectUpdates$default(SubscriptionStressTest.kt:240)
at vc.backendCache.SubscriptionStressTest$scenarios$1$1$1$1.invokeSuspend(SubscriptionStressTest.kt:310)
at vc.backendCache.SubscriptionStressTest$scenarios$1$1$1$1.invoke(SubscriptionStressTest.kt)
at vc.backendCache.SubscriptionStressTest$scenarios$1$1$1$1.invoke(SubscriptionStressTest.kt)
at vc.backendCache.SubscriptionStressTest$Context.with(SubscriptionStressTest.kt:198)
at vc.backendCache.SubscriptionStressTest$Context$with$1.invokeSuspend(SubscriptionStressTest.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
... 4 more

Does that help?
