Outbound flow control bugfix #61
Changes from 11 commits
```diff
@@ -1,7 +1,7 @@
 object Versions {
-    const val protobuf = "3.7.1"
-    const val grpc = "1.21.0"
-    const val kotlin = "1.3.40"
-    const val coroutines = "1.2.2"
+    const val protobuf = "3.9.0"
+    const val grpc = "1.19.0"
+    const val kotlin = "1.3.41"
+    const val coroutines = "1.3.0-RC"
     const val mockk = "1.9.1"
 }
```
```diff
@@ -20,8 +20,9 @@ import io.grpc.stub.CallStreamObserver
 import kotlinx.coroutines.CoroutineExceptionHandler
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.channels.ActorScope
 import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.channels.actor
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
```
```diff
@@ -42,40 +43,48 @@ internal fun <T> CallStreamObserver<*>.applyInboundFlowControl(
     }
 }

+private typealias MessageHandler = suspend ActorScope<*>.() -> Unit
+
 internal fun <T> CoroutineScope.applyOutboundFlowControl(
     streamObserver: CallStreamObserver<T>,
     targetChannel: Channel<T>
 ){
-    val isOutboundJobRunning = AtomicBoolean()
+    val isCompleted = AtomicBoolean()
     val channelIterator = targetChannel.iterator()
-    streamObserver.setOnReadyHandler {
-        if(targetChannel.isClosedForReceive){
-            streamObserver.completeSafely()
-        }else if(
-            streamObserver.isReady &&
-            !targetChannel.isClosedForReceive &&
-            isOutboundJobRunning.compareAndSet(false, true)
-        ){
-            launch(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
-                streamObserver.completeSafely(e)
-                targetChannel.close(e)
-            }) {
-                try{
-                    while(
-                        streamObserver.isReady &&
-                        !targetChannel.isClosedForReceive &&
-                        channelIterator.hasNext()
-                    ){
-                        val value = channelIterator.next()
-                        streamObserver.onNext(value)
-                    }
-                    if(targetChannel.isClosedForReceive){
-                        streamObserver.onCompleted()
-                    }
-                } finally {
-                    isOutboundJobRunning.set(false)
-                }
-            }
-        }
-    }
+    val messageHandlerBlock: MessageHandler = handler@ {
+        while(
+            streamObserver.isReady &&
+            !targetChannel.isClosedForReceive &&
+            channelIterator.hasNext()
+        ){
+            streamObserver.onNext(channelIterator.next())
+        }
+        if(targetChannel.isClosedForReceive && isCompleted.compareAndSet(false,true)){
+            streamObserver.onCompleted()
+            channel.close()
+        }
+    }
+
+    val messageHandlerActor = actor<MessageHandler>(
+        capacity = Channel.UNLIMITED,
+        context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
+            streamObserver.completeSafely(e)
+            targetChannel.close(e)
+        }
+    ) {
+        for(handler in channel){
+            if(isCompleted.get()) break
+            handler(this)
+        }
+    }
+
+    streamObserver.setOnReadyHandler {
+        try {
+            messageHandlerActor.offer(messageHandlerBlock)
+        }catch (e: Throwable){
+            // If offer throws an exception then it is
+            // either already closed or there was a failure
+            // which has already cleaned up call resources
+        }
+    }
 }
```

Review comment (on `capacity = Channel.UNLIMITED`): A limited capacity didn't work? I am wondering if it's possible that the channel becomes a memory leak if jobs are added faster than the worker consumes them.

Reply: That's a good catch. I must've changed it while debugging. I'll test it with the value reverted. This implementation is based off the native grpc util.

Review comment (on the `CoroutineExceptionHandler` context, truncated): I think it is missing that the

Review comment (on the actor's consumer loop): Adding this here reduced the problem but didn't eliminate it. I guess there must be other paths for the scope to cancel.

Reply: So scope cancellation can also be propagated from its parent under normal coroutine usage. This case was covered before because executing a new `launch` on a cancelled scope would take care of it. It's hard to reproduce, but I'm trying a few things now.
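The core idea of the refactor above can be reduced to a small self-contained sketch. This is not code from the PR: the names (`worker`, `drain`) are illustrative, it assumes only `kotlinx-coroutines-core` on the classpath, and it uses `offer` to match the coroutines version pinned in this PR (newer releases would use `trySend`). A single-consumer `actor` serializes the queued handlers, so even when many "ready" notifications fire concurrently, the drain logic never runs more than once at a time:

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

fun main() = runBlocking {
    val concurrent = AtomicInteger(0)
    val maxConcurrent = AtomicInteger(0)

    // Single-consumer actor: queued handlers run strictly one at a time,
    // mirroring how the PR serializes onReady callbacks.
    val worker = actor<suspend () -> Unit>(capacity = Channel.UNLIMITED) {
        for (handler in channel) handler()
    }

    // Hypothetical drain step: records how many copies of itself run at once.
    val drain: suspend () -> Unit = {
        val now = concurrent.incrementAndGet()
        maxConcurrent.updateAndGet { prev -> maxOf(prev, now) }
        delay(1) // simulate pushing a few messages to a stream observer
        concurrent.decrementAndGet()
    }

    // Fire many "superfluous" ready notifications concurrently.
    coroutineScope {
        repeat(100) { launch(Dispatchers.Default) { worker.offer(drain) } }
    }

    // Queue a final marker so we know all drains have completed (FIFO order).
    val done = CompletableDeferred<Unit>()
    worker.offer { done.complete(Unit) }
    done.await()
    worker.close()

    check(maxConcurrent.get() == 1) // drains never overlapped
    println("max concurrent drains: ${maxConcurrent.get()}")
}
```

Compared with the old CAS-guarded `launch` approach, the actor keeps one long-lived consumer, so redundant notifications simply enqueue a no-op pass instead of racing to spawn new jobs.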
Updated imports (`@@ -17,12 +17,14 @@`):

```kotlin
package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.utils.COROUTINE_TEST_TIMEOUT
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFails
import com.github.marcoferrer.krotoplus.coroutines.utils.assertFailsWithStatus
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.Status
import io.grpc.examples.helloworld.GreeterCoroutineGrpc
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
```
```diff
@@ -38,13 +40,19 @@ import kotlinx.coroutines.Job
 import kotlinx.coroutines.cancel
 import kotlinx.coroutines.channels.ReceiveChannel
 import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.channels.map
-import kotlinx.coroutines.channels.toList
+import kotlinx.coroutines.debug.DebugProbes
+import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collectIndexed
+import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.toList
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.junit.Rule
 import org.junit.Test
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.coroutines.CoroutineContext
 import kotlin.test.BeforeTest
 import kotlin.test.assertEquals
 import kotlin.test.assertFailsWith
```
```diff
@@ -55,6 +63,10 @@ class ClientCallBidiStreamingTests {
     @[Rule JvmField]
     var grpcServerRule = GrpcServerRule().directExecutor()

+    @[Rule JvmField]
+    public val timeout = CoroutinesTimeout.seconds(COROUTINE_TEST_TIMEOUT)
+
     private val methodDescriptor = GreeterGrpc.getSayHelloStreamingMethod()
     private val service = spyk(object : GreeterGrpc.GreeterImplBase() {})
```
```diff
@@ -149,7 +161,7 @@ class ClientCallBidiStreamingTests {
             requestChannel.close()
         }

-        responseChannel.map { it.message }.toList()
+        responseChannel.consumeAsFlow().map { it.message }.toList()
     }

     assertEquals(3, result.size)
```

Review comment: The channel `map` operator was deprecated.
```diff
@@ -332,4 +344,61 @@ class ClientCallBidiStreamingTests {
         assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
     }

+    @[Rule JvmField]
+    var grpcServerRule2 = GrpcServerRule()
+
+    @Test
+    fun `High throughput call succeeds`() {
+        grpcServerRule2.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase() {
+            override val initialContext: CoroutineContext = Dispatchers.Default
+            override suspend fun sayHelloStreaming(
+                requestChannel: ReceiveChannel<HelloRequest>,
+                responseChannel: SendChannel<HelloReply>
+            ) {
+                requestChannel.consumeAsFlow().collectIndexed { index, value ->
+                    if (index % 1000 == 0) {
+                        // println("Server received $index")
+                    }
+                    responseChannel.send(HelloReply.newBuilder().setMessage(value.name).build())
+                }
+                responseChannel.close()
+            }
+        })
+        val stub = GreeterCoroutineGrpc.newStub(grpcServerRule2.channel)
+
+        val (requestChannel, responseChannel) = stub
+            .clientCallBidiStreaming(methodDescriptor)
+
+        val numMessages = 100000
+        val receivedCount = AtomicInteger()
+        runBlocking(Dispatchers.Default) {
+            val req = HelloRequest.newBuilder()
+                .setName("test").build()
+
+            launch {
+                repeat(numMessages) {
+                    // if (it % 1000 == 0) println("Client sent $it")
+                    requestChannel.send(req)
+                }
+                requestChannel.close()
+            }
+
+            launch {
+                repeat(numMessages) {
+                    // if (it % 1000 == 0) println("Client received $it")
+                    responseChannel.receive()
+                    receivedCount.incrementAndGet()
+                }
+            }
+        }
+        // Sleep so that we can ensure the response channel
+        // has had enough time to close before being asserted on
+        Thread.sleep(50)
+        assert(requestChannel.isClosedForSend) { "Request channel should be closed for send" }
+        assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
+        assertEquals(numMessages, receivedCount.get(), "Response count must equal request count")
+    }
+
 }
```

Review comment (on `High throughput call succeeds`): This test was introduced as part of the discussion that was had with @chris-blacker in #59.
```diff
@@ -30,6 +30,7 @@ import io.mockk.every
 import io.mockk.spyk
 import io.mockk.verify
 import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.CoroutineStart
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.cancel
```
```diff
@@ -74,10 +75,10 @@ class ClientStreamingBackPressureTests {
     @Test
     fun `Client send suspends until server invokes receive`() {
-        lateinit var serverRequestChannel: ReceiveChannel<HelloRequest>
+        val deferredServerChannel = CompletableDeferred<ReceiveChannel<HelloRequest>>()
         grpcServerRule.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase(){
             override suspend fun sayHelloClientStreaming(requestChannel: ReceiveChannel<HelloRequest>): HelloReply {
-                serverRequestChannel = spyk(requestChannel)
+                deferredServerChannel.complete(spyk(requestChannel))
                 delay(Long.MAX_VALUE)
                 return HelloReply.getDefaultInstance()
             }
```

Review comment (truncated): This change was to prevent race conditions between the
```diff
@@ -105,6 +106,7 @@ class ClientStreamingBackPressureTests {
             }
         }

+        val serverRequestChannel = deferredServerChannel.await()
         repeat(3){
             delay(10L)
             assertEquals(it + 1, requestCount.get())
```
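The `CompletableDeferred` change above replaces a `lateinit var` that the test body could read before the server handler had assigned it. A minimal standalone sketch of that hand-off pattern (not from the PR; the delay and string value are illustrative, assuming only `kotlinx-coroutines-core`):

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // With `lateinit var`, reading the field before the handler runs throws
    // UninitializedPropertyAccessException. A CompletableDeferred makes the
    // hand-off explicit: await() suspends until complete() has been called.
    val deferredValue = CompletableDeferred<String>()

    launch {
        delay(50) // simulate the server handler being invoked late
        deferredValue.complete("server channel ready")
    }

    val value = deferredValue.await() // suspends; no race, no lateinit crash
    check(value == "server channel ready")
    println(value)
}
```

`complete()` is idempotent (subsequent calls return false), so it is also safe if the handler were invoked more than once.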
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The outbound flow control handler has been refactored. It no longer spawns multiple jobs when applying backpressure and can properly handle superfluous invocations of the on ready handler runnable.