Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbound flow control bugfix #61

Merged
merged 25 commits into from Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
125e6a3
update dependencies
marcoferrer Aug 4, 2019
757da7f
fix typo in config message name (#57)
marcoferrer Aug 4, 2019
11d7169
clean up usages of deprecated apis
marcoferrer Aug 4, 2019
bed8d00
fix race condition in unit back-pressure test
marcoferrer Aug 4, 2019
fec2f7d
add test from @chris-blacker for high volume race condition
marcoferrer Aug 4, 2019
5df774a
Revert "clean up usages of deprecated apis"
marcoferrer Aug 4, 2019
6b7fdea
fix race condition in outbound flow control handler
marcoferrer Aug 4, 2019
7621579
update outbound flow control
marcoferrer Aug 5, 2019
afd315e
add coroutine timeout rule to unit tests
marcoferrer Aug 5, 2019
42a9460
debug ci issues
marcoferrer Aug 5, 2019
4da8aaf
downgrade grpc version
marcoferrer Aug 5, 2019
d99af22
propagate scope cancellation to target channel in outbound flow control
marcoferrer Aug 11, 2019
bdba585
configure detailed test output
marcoferrer Aug 12, 2019
11b91e3
disable coroutines debug artifact
marcoferrer Aug 12, 2019
18aac4f
update dependency versions
marcoferrer Aug 12, 2019
8e9bd7c
add bidi streaming integration tests
marcoferrer Aug 12, 2019
b4bd070
propagate client inbound channel close as call cancellation
marcoferrer Aug 12, 2019
59715c5
close outbound message handler in server call handlers
marcoferrer Aug 12, 2019
5f326a3
improve determinism of tests
marcoferrer Aug 12, 2019
4a667e7
Update build.gradle
marcoferrer Aug 12, 2019
7760fcd
Update CallOptionsTest.kt
marcoferrer Aug 12, 2019
c76b00b
add withContext to atomic server bidi streaming test
marcoferrer Aug 13, 2019
19c0ce4
fix flaky cancellation message checking
marcoferrer Aug 13, 2019
9bcd734
update tests
marcoferrer Aug 22, 2019
4375f58
update tests
marcoferrer Aug 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions CHANGELOG.md
@@ -1,9 +1,9 @@
## Version 0.4.1
## Version 0.5.0-RC
_\*\*-\*\*_
* New: Update to Kotlin `1.3.40`
* New: Update to Kotlin Coroutines `1.2.2`
* New: Update to gRPC `1.21.0`
* New: Update to protobuf `3.7.1`
* New: Update to Kotlin `1.3.41`
* New: Update to Kotlin Coroutines `1.3.0-RC`
* New: Update to gRPC `1.22.1`
* New: Update to protobuf `3.9.0`



Expand Down
8 changes: 4 additions & 4 deletions buildSrc/src/main/kotlin/Versions.kt
@@ -1,7 +1,7 @@
object Versions {
const val protobuf = "3.7.1"
const val grpc = "1.21.0"
const val kotlin = "1.3.40"
const val coroutines = "1.2.2"
const val protobuf = "3.9.0"
const val grpc = "1.22.1"
const val kotlin = "1.3.41"
const val coroutines = "1.3.0-RC"
const val mockk = "1.9.1"
}
2 changes: 2 additions & 0 deletions kroto-plus-coroutines/build.gradle
Expand Up @@ -8,6 +8,7 @@ def experimentalFlags = [
"-Xuse-experimental=kotlin.Experimental",
"-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.FlowPreview",
"-Xuse-experimental=com.github.marcoferrer.krotoplus.coroutines.KrotoPlusInternalApi"
]

Expand Down Expand Up @@ -35,6 +36,7 @@ dependencies {
testImplementation project(':test-api:java')
testImplementation "io.mockk:mockk:${Versions.mockk}"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Versions.coroutines}"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-debug:${Versions.coroutines}"
}

tasks.withType(JavaCompile) {
Expand Down
Expand Up @@ -20,9 +20,10 @@ import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ActorScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
import java.util.concurrent.atomic.AtomicInteger


Expand All @@ -42,40 +43,51 @@ internal fun <T> CallStreamObserver<*>.applyInboundFlowControl(
}
}

private typealias MessageHandler = suspend ActorScope<*>.() -> Unit

internal fun <T> CoroutineScope.applyOutboundFlowControl(
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outbound flow control handler has been refactored. It no longer spawns multiple jobs when applying backpressure and can properly handle superfluous invocations of the on ready handler runnable.

streamObserver: CallStreamObserver<T>,
targetChannel: Channel<T>
){
val isOutboundJobRunning = AtomicBoolean()

val channelIterator = targetChannel.iterator()
streamObserver.setOnReadyHandler {
if(targetChannel.isClosedForReceive){
streamObserver.completeSafely()
}else if(
val messageHandlerBlock: MessageHandler = {
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
isOutboundJobRunning.compareAndSet(false, true)
channelIterator.hasNext()
){
launch(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)
}) {
try{
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
channelIterator.hasNext()
){
val value = channelIterator.next()
streamObserver.onNext(value)
}
if(targetChannel.isClosedForReceive){
streamObserver.onCompleted()
}
} finally {
isOutboundJobRunning.set(false)
}
val value = channelIterator.next()
streamObserver.onNext(value)
}
if(targetChannel.isClosedForReceive){
channel.close()
}
}

val messageHandlerActor = actor<MessageHandler>(
capacity = 1,
context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is missing that the targetChannel is canceled in some situations. I tried canceling it here but this code was not being executed when I expected.
I have encountered that a client bidi call send hangs when the call is getting canceled in another thread because the channel is not canceled. This happened when the call is canceled while a send was pending in the call rendezvous outboundChannel. I think the rpc scope then somehow cleans up without canceling the channel, thus the send hangs.

}
) {
consumeEach { it.invoke(this) }
if(targetChannel.isClosedForReceive){
streamObserver.onCompleted()
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this here reduced the problem but didn't eliminate it. I guess there must be other paths for the scope to cancel.

       catch(ex: CancellationException) {
            targetChannel.cancel(ex)
            throw ex
        }

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the scope could cancellation can also be propagated from its parent under normal normal coroutine usage. This case was covered before because executing new launch on a cancelled scope would take care. Its hard to reproduce but Im trying a few things now.

}

streamObserver.setOnReadyHandler {
try {
if(targetChannel.isClosedForReceive){
messageHandlerActor.close()
}else{
messageHandlerActor.offer(messageHandlerBlock)
}
}catch (e: Throwable){
// If offer throws an exception then it is
// either already closed or there was a failure
// which has already cleaned up call resources
}
}
}
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/



package com.github.marcoferrer.krotoplus.coroutines

import io.grpc.CallOptions
Expand All @@ -28,6 +30,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals

@Suppress("DEPRECATION")
class CallOptionsTest {

@Test
Expand Down
Expand Up @@ -36,7 +36,7 @@ class NewGrpcStubTests {
val stub = scope
.newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel)

assertEquals(nameElement.name, stub.coroutineContext[CoroutineName]?.name)
assertEquals(nameElement.name, stub.context[CoroutineName]?.name)
}


Expand All @@ -50,7 +50,7 @@ class NewGrpcStubTests {
val stub = CoroutineScope(scopeNameElement + scopeJob)
.newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel,expectedNameElement)

assertEquals(expectedNameElement.name, stub.coroutineContext[CoroutineName]?.name)
assertEquals(scopeJob, stub.coroutineContext[Job])
assertEquals(expectedNameElement.name, stub.context[CoroutineName]?.name)
assertEquals(scopeJob, stub.context[Job])
}
}
Expand Up @@ -23,6 +23,7 @@ import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.Status
import io.grpc.examples.helloworld.GreeterCoroutineGrpc
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
Expand All @@ -38,13 +39,19 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.map
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.debug.DebugProbes
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.test.BeforeTest
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand All @@ -55,6 +62,10 @@ class ClientCallBidiStreamingTests {
@[Rule JvmField]
var grpcServerRule = GrpcServerRule().directExecutor()

@[Rule JvmField]
public val timeout = CoroutinesTimeout.seconds(30)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows us to debug coroutine deadlocks in the unit tests.



private val methodDescriptor = GreeterGrpc.getSayHelloStreamingMethod()
private val service = spyk(object : GreeterGrpc.GreeterImplBase() {})

Expand Down Expand Up @@ -149,7 +160,7 @@ class ClientCallBidiStreamingTests {
requestChannel.close()
}

responseChannel.map { it.message }.toList()
responseChannel.consumeAsFlow().map { it.message }.toList()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map operator was deprecated

}

assertEquals(3,result.size)
Expand Down Expand Up @@ -332,4 +343,58 @@ class ClientCallBidiStreamingTests {
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
}

@Test
fun `High throughput call succeeds`() {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was introduced as part of the discussion the was had with @chris-blacker in #59

grpcServerRule.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase() {
override val initialContext: CoroutineContext = Dispatchers.Default
override suspend fun sayHelloStreaming(
requestChannel: ReceiveChannel<HelloRequest>,
responseChannel: SendChannel<HelloReply>
) {
requestChannel.consumeAsFlow().collectIndexed { index, value ->
if (index % 1000 == 0) {
// println("Server received $index")
}

responseChannel.send(HelloReply.newBuilder().setMessage(value.name).build())
}
responseChannel.close()
}
})
val stub = GreeterCoroutineGrpc.newStub(grpcServerRule.channel)

val (requestChannel, responseChannel) = stub
.clientCallBidiStreaming(methodDescriptor)

val numMessages = 100000
val receivedCount = AtomicInteger()
runBlocking(Dispatchers.Default) {
DebugProbes.install()
val req = HelloRequest.newBuilder()
.setName("test").build()

launch {
repeat(numMessages) {
// if (it % 1000 == 0) println("Client sent $it")
requestChannel.send(req)
}
requestChannel.close()
}

launch {
repeat(numMessages) {
// if (it % 1000 == 0) println("Client received $it")
responseChannel.receive()
receivedCount.incrementAndGet()
}
}
}
// Sleep so that we can ensure the response channel
// has had enough time to close before being asserted on
Thread.sleep(50)
assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
assertEquals(numMessages, receivedCount.get(), "Must response count must equal request count")
}

}
Expand Up @@ -30,6 +30,7 @@ import io.mockk.every
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
Expand Down Expand Up @@ -74,10 +75,10 @@ class ClientStreamingBackPressureTests {

@Test
fun `Client send suspends until server invokes receive`() {
lateinit var serverRequestChannel: ReceiveChannel<HelloRequest>
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was to prevent race conditions between the serverRequestChannel var being populated and test assertions running.

val deferredServerChannel = CompletableDeferred<ReceiveChannel<HelloRequest>>()
grpcServerRule.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase(){
override suspend fun sayHelloClientStreaming(requestChannel: ReceiveChannel<HelloRequest>): HelloReply {
serverRequestChannel = spyk(requestChannel)
deferredServerChannel.complete(spyk(requestChannel))
delay(Long.MAX_VALUE)
return HelloReply.getDefaultInstance()
}
Expand Down Expand Up @@ -105,6 +106,7 @@ class ClientStreamingBackPressureTests {
}
}

val serverRequestChannel = deferredServerChannel.await()
repeat(3){
delay(10L)
assertEquals(it + 1, requestCount.get())
Expand Down
Expand Up @@ -42,9 +42,10 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.mapTo
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
Expand All @@ -63,7 +64,9 @@ class ServerCallBidiStreamingTests {
private val expectedResponse = HelloReply.newBuilder().setMessage("reply").build()
private val responseObserver = spyk<StreamObserver<HelloReply>>(object : StreamObserver<HelloReply> {
override fun onNext(value: HelloReply?) {}
override fun onError(t: Throwable?) {}
override fun onError(t: Throwable?) {
print(t?.message)
}
override fun onCompleted() {}
})

Expand Down Expand Up @@ -93,9 +96,11 @@ class ServerCallBidiStreamingTests {
) {
reqChannel = requestChannel
respChannel = responseChannel
requestChannel.mapTo(responseChannel) {
HelloReply.newBuilder().setMessage("Reply: ${it.name}").build()
}
requestChannel
.consumeAsFlow()
.collect {
responseChannel.send(HelloReply.newBuilder().setMessage("Reply: ${it.name}").build())
}
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mapTo operator on channel is now deprecated. With out this refactor, the test would fail.

}
})

Expand Down Expand Up @@ -128,10 +133,11 @@ class ServerCallBidiStreamingTests {
respChannel = responseChannel
var groupCount = 0
val requestIter = requestChannel.iterator()

while (requestIter.hasNext()) {
val requestValues = listOf(
requestIter.next(),
requestIter.next(),
requestIter.next().also { requestIter.hasNext() },
requestIter.next().also { requestIter.hasNext() },
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was required after updating to coroutines 1.3.0-RC. Every call to next() must be preceded by a call to hasNext()

requestIter.next()
)
responseChannel.send {
Expand Down
Expand Up @@ -144,7 +144,6 @@ class GrpcCoroutinesGeneratorTests {

val (requestChannel, responseChannel) = stub.sayHelloStreaming()

// launchProducerJob(requestChannel) {
launch(Dispatchers.Default) {
repeat(3) {
requestChannel.send { name = "name $it" }
Expand All @@ -153,7 +152,6 @@ class GrpcCoroutinesGeneratorTests {
}

val results = responseChannel.toList()
println(results)
assertEquals(9, results.size)

val expected = "name 0|name 0|name 0" +
Expand Down