Skip to content

Commit

Permalink
propagate scope cancellation to target channel in outbound flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoferrer committed Aug 11, 2019
1 parent 4da8aaf commit d99af22
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Expand Up @@ -17,9 +17,11 @@
package com.github.marcoferrer.krotoplus.coroutines.call

import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ActorScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
Expand Down Expand Up @@ -72,6 +74,14 @@ internal fun <T> CoroutineScope.applyOutboundFlowControl(
targetChannel.close(e)
}
) {
coroutineContext[Job]?.let { job ->
job.invokeOnCompletion {
if(job.isCancelled && !targetChannel.isClosedForSend){
targetChannel.cancel(it as? CancellationException)
}
}
}

for(handler in channel){
if(isCompleted.get()) break
handler(this)
Expand Down
Expand Up @@ -20,6 +20,7 @@ import com.github.marcoferrer.krotoplus.coroutines.call.FlowControlledInboundStr
import com.github.marcoferrer.krotoplus.coroutines.call.applyOutboundFlowControl
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientResponseObserver
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
Expand Down Expand Up @@ -106,7 +107,7 @@ internal class ClientBidiCallChannelImpl<ReqT,RespT>(

override fun onError(t: Throwable) {
outboundChannel.close(t)
outboundChannel.cancel()
outboundChannel.cancel(CancellationException(t.message,t))
inboundChannel.close(t)
}
}
Expand Down

0 comments on commit d99af22

Please sign in to comment.