Skip to content

Commit

Permalink
Outbound flow control bugfix (#61)
Browse files Browse the repository at this point in the history
* update dependencies

* fix typo in config message name (#57)

* clean up usages of deprecated apis

* fix race condition in unit back-pressure test

* add test from @chris-blacker for high volume race condition

* fix race condition in outbound flow control handler

* update outbound flow control

* propagate scope cancellation to target channel in outbound flow control

* configure detailed test output

* add bidi streaming integration tests

* propagate client inbound channel close as call cancellation

* close outbound message handler in server call handlers

* improve determinism of tests

* add withContext to atomic server bidi streaming test
  • Loading branch information
marcoferrer committed Aug 22, 2019
1 parent 8a643a4 commit 0454f13
Show file tree
Hide file tree
Showing 35 changed files with 751 additions and 321 deletions.
10 changes: 5 additions & 5 deletions CHANGELOG.md
@@ -1,9 +1,9 @@
## Version 0.4.1
## Version 0.5.0-RC
_\*\*-\*\*_
* New: Update to Kotlin `1.3.40`
* New: Update to Kotlin Coroutines `1.2.2`
* New: Update to gRPC `1.21.0`
* New: Update to protobuf `3.7.1`
* New: Update to Kotlin `1.3.41`
* New: Update to Kotlin Coroutines `1.3.0-RC`
* New: Update to gRPC `1.22.1`
* New: Update to protobuf `3.9.0`



Expand Down
46 changes: 46 additions & 0 deletions build.gradle
Expand Up @@ -53,4 +53,50 @@ subprojects{ subproject ->
testImplementation "org.jetbrains.kotlin:kotlin-test"
testImplementation "org.jetbrains.kotlin:kotlin-test-junit"
}


tasks.withType(Test) {
testLogging {
// set options for log level LIFECYCLE
events (
'FAILED',
'PASSED',
'SKIPPED',
'STANDARD_OUT'
)
exceptionFormat 'FULL'
showExceptions true
showCauses true
showStackTraces true

// set options for log level DEBUG and INFO
debug {
events(
'STARTED',
'FAILED',
'PASSED',
'SKIPPED',
'STANDARD_ERROR',
'STANDARD_OUT'
)
exceptionFormat 'FULL'
}
info.events = debug.events
info.exceptionFormat = debug.exceptionFormat

afterSuite { desc, result ->
if (!desc.parent) { // will match the outermost suite
def output = "Results: " +
"${result.resultType} (${result.testCount} tests, " +
"${result.successfulTestCount} successes, " +
"${result.failedTestCount} failures, " +
"${result.skippedTestCount} skipped)"

def startItem = '| ', endItem = ' |'
def repeatLength = startItem.length() + output.length() + endItem.length()
println('\n' + ('-' * repeatLength) + '\n' + startItem + output + endItem + '\n' + ('-' * repeatLength))
}
}
}
}
}
10 changes: 5 additions & 5 deletions buildSrc/src/main/kotlin/Versions.kt
@@ -1,7 +1,7 @@
object Versions {
const val protobuf = "3.7.1"
const val grpc = "1.21.0"
const val kotlin = "1.3.40"
const val coroutines = "1.2.2"
const val mockk = "1.9.1"
const val protobuf = "3.9.0"
const val grpc = "1.22.1"
const val kotlin = "1.3.41"
const val coroutines = "1.3.0-RC2"
const val mockk = "1.9.3"
}
11 changes: 11 additions & 0 deletions kroto-plus-coroutines/build.gradle
Expand Up @@ -8,6 +8,7 @@ def experimentalFlags = [
"-Xuse-experimental=kotlin.Experimental",
"-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.FlowPreview",
"-Xuse-experimental=com.github.marcoferrer.krotoplus.coroutines.KrotoPlusInternalApi"
]

Expand Down Expand Up @@ -35,8 +36,18 @@ dependencies {
testImplementation project(':test-api:java')
testImplementation "io.mockk:mockk:${Versions.mockk}"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Versions.coroutines}"

// Not included by default due to the following issue
// https://github.com/Kotlin/kotlinx.coroutines/issues/1060
// testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-debug:${Versions.coroutines}"
}


//test {
// systemProperty "kotlinx.coroutines.scheduler.core.pool.size", "1"
// systemProperty "kotlinx.coroutines.scheduler.core.max.size", "1"
//}

tasks.withType(JavaCompile) {
enabled = false
}
Expand Down
Expand Up @@ -20,8 +20,10 @@ import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ActorScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -42,40 +44,60 @@ internal fun <T> CallStreamObserver<*>.applyInboundFlowControl(
}
}

internal typealias MessageHandler = suspend ActorScope<*>.() -> Unit

internal fun <T> CoroutineScope.applyOutboundFlowControl(
streamObserver: CallStreamObserver<T>,
targetChannel: Channel<T>
){
val isOutboundJobRunning = AtomicBoolean()
): SendChannel<MessageHandler> {

val isCompleted = AtomicBoolean()
val channelIterator = targetChannel.iterator()
streamObserver.setOnReadyHandler {
if(targetChannel.isClosedForReceive){
streamObserver.completeSafely()
}else if(
val messageHandlerBlock: MessageHandler = handler@ {
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
isOutboundJobRunning.compareAndSet(false, true)
channelIterator.hasNext()
){
launch(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)
}) {
try{
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
channelIterator.hasNext()
){
val value = channelIterator.next()
streamObserver.onNext(value)
}
if(targetChannel.isClosedForReceive){
streamObserver.onCompleted()
}
} finally {
isOutboundJobRunning.set(false)
}
streamObserver.onNext(channelIterator.next())
}
if(targetChannel.isClosedForReceive && isCompleted.compareAndSet(false,true)){
streamObserver.onCompleted()
channel.close()
}
}

val messageHandlerActor = actor<MessageHandler>(
capacity = Channel.UNLIMITED,
context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)
}
) {

for (handler in channel) {
if (isCompleted.get()) break
handler(this)
}
if(!isCompleted.get()) {
streamObserver.completeSafely()
}
}

targetChannel.invokeOnClose {
messageHandlerActor.close()
}

streamObserver.setOnReadyHandler {
try {
if(!messageHandlerActor.isClosedForSend){
messageHandlerActor.offer(messageHandlerBlock)
}
}catch (e: Throwable){
// If offer throws an exception then it is
// either already closed or there was a failure
// which has already cleaned up call resources
}
}

return messageHandlerActor
}
Expand Up @@ -17,9 +17,11 @@
package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.call.FlowControlledInboundStreamObserver
import com.github.marcoferrer.krotoplus.coroutines.call.MessageHandler
import com.github.marcoferrer.krotoplus.coroutines.call.applyOutboundFlowControl
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientResponseObserver
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
Expand Down Expand Up @@ -97,17 +99,35 @@ internal class ClientBidiCallChannelImpl<ReqT,RespT>(

override lateinit var callStreamObserver: ClientCallStreamObserver<ReqT>

private lateinit var outboundMessageHandler: SendChannel<MessageHandler>

override fun beforeStart(requestStream: ClientCallStreamObserver<ReqT>) {
callStreamObserver = requestStream.apply { disableAutoInboundFlowControl() }
applyOutboundFlowControl(requestStream,outboundChannel)
outboundMessageHandler = applyOutboundFlowControl(requestStream,outboundChannel)

inboundChannel.invokeOnClose {
// If the client prematurely closes the response channel
// we need to propagate this as a cancellation to the underlying call
if(!outboundChannel.isClosedForSend){
callStreamObserver.cancel("Call has been cancelled", it)
}
}
}

override fun onNext(value: RespT): Unit = onNextWithBackPressure(value)

override fun onError(t: Throwable) {
outboundChannel.close(t)
outboundChannel.cancel()
outboundChannel.cancel(CancellationException(t.message,t))
inboundChannel.close(t)
outboundMessageHandler.close(t)
}

override fun onCompleted() {
super.onCompleted()
if (isChannelReadyForClose) {
outboundMessageHandler.close()
}
}
}

Expand Up @@ -42,15 +42,24 @@ internal class ClientResponseStreamChannel<ReqT, RespT>(

override lateinit var callStreamObserver: ClientCallStreamObserver<ReqT>

private var aborted: Boolean = false

override fun beforeStart(requestStream: ClientCallStreamObserver<ReqT>) {
callStreamObserver = requestStream.apply {
applyInboundFlowControl(inboundChannel,transientInboundMessageCount)
}

inboundChannel.invokeOnClose {
if(!isInboundCompleted.get() && !aborted){
callStreamObserver.cancel("Call has been cancelled", it)
}
}
}

override fun onNext(value: RespT): Unit = onNextWithBackPressure(value)

override fun onError(t: Throwable) {
aborted = true
inboundChannel.close(t)
}
}
Expand Up @@ -66,7 +66,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallServerStreaming(
val serverCallObserver = responseObserver as ServerCallStreamObserver<RespT>
with(newRpcScope(initialContext, methodDescriptor)) {
bindToClientCancellation(serverCallObserver)
applyOutboundFlowControl(serverCallObserver,responseChannel)
val outboundMessageHandler = applyOutboundFlowControl(serverCallObserver,responseChannel)
launch(start = CoroutineStart.ATOMIC) {
try{
block(responseChannel)
Expand All @@ -75,6 +75,8 @@ public fun <ReqT, RespT> ServiceScope.serverCallServerStreaming(
val rpcError = e.toRpcException()
serverCallObserver.completeSafely(rpcError)
responseChannel.close(rpcError)
}finally {
outboundMessageHandler.close()
}
}

Expand Down Expand Up @@ -144,7 +146,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallBidiStreaming(

with(newRpcScope(initialContext, methodDescriptor)) rpcScope@ {
bindToClientCancellation(serverCallObserver)
applyOutboundFlowControl(serverCallObserver,responseChannel)
val outboundMessageHandler = applyOutboundFlowControl(serverCallObserver,responseChannel)
val requestChannel = ServerRequestStreamChannel<ReqT>(
coroutineContext = coroutineContext,
callStreamObserver = serverCallObserver,
Expand Down Expand Up @@ -176,6 +178,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallBidiStreaming(
if (!requestChannel.isClosedForReceive) {
requestChannel.cancel()
}
outboundMessageHandler.close()
}
}

Expand Down
Expand Up @@ -28,6 +28,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals

@Suppress("DEPRECATION")
class CallOptionsTest {

@Test
Expand Down Expand Up @@ -131,4 +132,4 @@ class CallOptionsTest {
callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)
)
}
}
}
Expand Up @@ -36,7 +36,7 @@ class NewGrpcStubTests {
val stub = scope
.newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel)

assertEquals(nameElement.name, stub.coroutineContext[CoroutineName]?.name)
assertEquals(nameElement.name, stub.context[CoroutineName]?.name)
}


Expand All @@ -50,7 +50,7 @@ class NewGrpcStubTests {
val stub = CoroutineScope(scopeNameElement + scopeJob)
.newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel,expectedNameElement)

assertEquals(expectedNameElement.name, stub.coroutineContext[CoroutineName]?.name)
assertEquals(scopeJob, stub.coroutineContext[Job])
assertEquals(expectedNameElement.name, stub.context[CoroutineName]?.name)
assertEquals(scopeJob, stub.context[Job])
}
}

0 comments on commit 0454f13

Please sign in to comment.