Bugfix for race condition in outbound flow control #59
Conversation
Thanks for the PR. I'm actually interested in your second approach as well. I like the idea of not having to re-launch the observer consumer.
Sorry about that. Didn't mean to close this out.
Thanks for your interest. I added and activated the second approach in the pull request, but I'm struggling a bit to get the corner cases right and the tests passing.
Do you have a sample project I can use to reproduce the deadlock? I wanted to see what it would take to add it as part of the integration tests. I also wanted to test an approach that uses a mutex during stream consumption to resolve the issue.
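A minimal sketch of the mutex idea mentioned above (hypothetical, not code from this PR; the helper name and structure are assumed): each onReady event launches a drain pass, but a kotlinx.coroutines Mutex serializes the passes, so the readiness check and the decision to stop draining cannot interleave with a relaunch.

import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper, not part of kroto-plus.
internal fun <T> CoroutineScope.applyOutboundFlowControlWithMutex(
    streamObserver: CallStreamObserver<T>,
    targetChannel: Channel<T>
) {
    val drainLock = Mutex()
    val iterator = targetChannel.iterator()
    streamObserver.setOnReadyHandler {
        launch {
            // Only one drain pass runs at a time; a late onReady simply queues
            // another pass instead of being lost between flag updates.
            drainLock.withLock {
                try {
                    while (streamObserver.isReady && iterator.hasNext()) {
                        streamObserver.onNext(iterator.next())
                    }
                    if (targetChannel.isClosedForReceive) {
                        streamObserver.onCompleted()
                    }
                } catch (e: Throwable) {
                    streamObserver.onError(e)
                }
            }
        }
    }
}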
Unfortunately I haven't yet succeeded in creating a minimal test. I encounter the problem in a complex setup where three gRPC calls are chained together in-process. My theory is that the problem occurs when this flow control coroutine is frequently exiting and restarting; with a little delay in there, my setup hangs almost immediately.
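For illustration, a rough reconstruction of the pattern being described (hypothetical, not the actual kroto-plus source; the isOutboundJobRunning flag is taken from the issue text further below). The injected delay widens the window between the pump loop giving up and the flag being cleared: an onReady event that fires inside that window sees the flag still set, skips the relaunch, and the stream never resumes.

import io.grpc.stub.CallStreamObserver
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

internal fun <T> CoroutineScope.applyOutboundFlowControlRacy(
    streamObserver: CallStreamObserver<T>,
    targetChannel: Channel<T>
) {
    val isOutboundJobRunning = AtomicBoolean(false)
    streamObserver.setOnReadyHandler {
        // Relaunch the pump only if no pump job appears to be running.
        if (streamObserver.isReady && isOutboundJobRunning.compareAndSet(false, true)) {
            launch {
                while (streamObserver.isReady && !targetChannel.isClosedForReceive) {
                    streamObserver.onNext(targetChannel.receive())
                }
                // Injected delay: an onReady event arriving here is dropped,
                // because the flag below still reads true.
                delay(10)
                isOutboundJobRunning.set(false)
            }
        }
    }
}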
Although I can't reproduce the issue by adding a delay there, I can see that it is definitely possible for a race condition to manifest there. I've had some mild success with a solution similar to the following:

private typealias MessageHandler = suspend CoroutineScope.() -> Unit
internal fun <T> CoroutineScope.applyOutboundFlowControl(
streamObserver: CallStreamObserver<T>,
targetChannel: Channel<T>
){
val channelIterator = targetChannel.iterator()
val messageHandlerBlock: MessageHandler = {
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
channelIterator.hasNext()
){
val value = channelIterator.next()
streamObserver.onNext(value)
}
if(targetChannel.isClosedForReceive){
streamObserver.onCompleted()
}
}
val messageHandlerActor = actor<MessageHandler>(
start = CoroutineStart.LAZY,
capacity = Channel.CONFLATED,
context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)
}
) {
consumeEach {
if(streamObserver.isReady && !targetChannel.isClosedForReceive){
it.invoke(this)
}else{
streamObserver.completeSafely()
}
}
}
streamObserver.setOnReadyHandler {
if(targetChannel.isClosedForReceive){
streamObserver.completeSafely()
}else if(
streamObserver.isReady &&
!targetChannel.isClosedForReceive
){
// Using sendBlocking here is safe since we're using a conflated channel
messageHandlerActor.sendBlocking(messageHandlerBlock)
}
}
}

With this approach there are still tests that fail, but it looks like those failures are due to the actor inheriting the event loop dispatcher from the
… rather than test failure
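For context, a hypothetical usage sketch of the helper above in a server-streaming handler (the handler shape and message types are assumed, not taken from this PR, and the call is assumed to live in the same module since applyOutboundFlowControl is internal): the producer only writes to the channel, and the onReady handler drains it to the observer with back-pressure.

import io.grpc.stub.CallStreamObserver
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

// Hypothetical handler, not a generated stub signature.
fun streamNumbers(count: Int, responseObserver: StreamObserver<Int>) {
    // gRPC hands server handlers a ServerCallStreamObserver, which extends CallStreamObserver.
    val callObserver = responseObserver as CallStreamObserver<Int>
    val scope = CoroutineScope(Dispatchers.Default)
    val outbound = Channel<Int>(capacity = Channel.UNLIMITED)
    scope.applyOutboundFlowControl(callObserver, outbound)
    scope.launch {
        repeat(count) { outbound.send(it) }  // producer side: just writes to the channel
        outbound.close()                     // closing the channel completes the stream
    }
}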
Thank you for working on this issue. Since my fix attempts just seemed to make things worse, I tried instead to reproduce the problem, and I might have been successful.
This pull request is no longer necessary because the issue is being addressed in #61.
A bidi stream call with a high throughput of messages can hang.
This occurred reliably in a throughput test in which 100,000 messages of 1 kB each were transmitted over an in-process channel to a server and echoed back to the caller. With kroto-plus, the message transfer simply gets stuck and never completes.
The root cause is a race condition in FlowControl.kt, applyOutboundFlowControl(): reading streamObserver.isReady and setting isOutboundJobRunning are not atomic, so the outBoundJob can terminate yet fail to be relaunched by an onReady event. Here I propose a minimally invasive fix, though I also considered refactoring the coroutine to be long-running: suspend while not ready and resume on ready. Please consider addressing this, because the hangup is a deal breaker for my use case and most likely for others as well.
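A minimal sketch of the long-running alternative mentioned above (hypothetical, not code from this repository): a single pump coroutine consumes the channel for the lifetime of the call and, when the observer is not ready, suspends on a conflated signal that the onReady handler fires, so there is no relaunch step and no flag left to race on.

import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

internal fun <T> CoroutineScope.applyOutboundFlowControlLongRunning(
    streamObserver: CallStreamObserver<T>,
    targetChannel: Channel<T>
) {
    // Conflated channel: a ready signal sent while the pump is not waiting stays
    // buffered, so a signal arriving between the isReady check and receive() is not lost.
    val readySignal = Channel<Unit>(Channel.CONFLATED)
    streamObserver.setOnReadyHandler { readySignal.trySend(Unit) }
    launch {
        try {
            for (message in targetChannel) {
                while (!streamObserver.isReady) {
                    readySignal.receive()       // suspend until the next onReady event
                }
                streamObserver.onNext(message)
            }
            streamObserver.onCompleted()        // channel closed normally
        } catch (e: Throwable) {
            streamObserver.onError(e)           // channel closed with an error
        }
    }
}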