Skip to content

Commit

Permalink
Consistently handle exceptions in reactive streams (Kotlin#2646)
Browse files Browse the repository at this point in the history
* Fixed `PublisherCoroutine`, `rxObservable`, and
  `Flow.toPublisher` ignoring cancellations.
* Fatal exceptions are not treated in a special manner by us
  anymore. Instead, we follow the requirement in the reactive
  streams specification that, in case some method of `Subscriber`
  throws, that subscriber MUST be considered canceled, and the
  exception MUST be reported in someplace other than `onError`.
* Fixed `trySend` sometimes throwing in `PublisherCoroutine` and
  `rxObservable`.
* When an exception happens inside a cancellation handler, we now
  consistently throw the original exception passed to the handler,
  with the new exception added as suppressed.
* Fixed `PublisherCoroutine` and `rxObservable` claiming that the
  channel is not closed for send for some time after `close()` has
  finished.
* Fixed publishers sometimes signalling `onComplete()` after
  cancellation even though their streams are not finite.

Fixes Kotlin#2173
  • Loading branch information
dkhalanskyjb authored and pablobaxter committed Sep 14, 2022
1 parent 014c29e commit 9ff6701
Show file tree
Hide file tree
Showing 29 changed files with 740 additions and 357 deletions.
21 changes: 18 additions & 3 deletions reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
Expand Up @@ -15,7 +15,7 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testErrorOnCancellationIsReported() {
expect(1)
flow<Int> {
flow {
try {
emit(2)
} finally {
Expand Down Expand Up @@ -50,13 +50,13 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testCancellationIsNotReported() {
expect(1)
flow<Int> {
flow {
emit(2)
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
private lateinit var subscription: JFlow.Subscription

override fun onComplete() {
expect(3)
expectUnreached()
}

override fun onSubscribe(s: JFlow.Subscription?) {
Expand All @@ -73,6 +73,21 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
finish(3)
}

@Test
fun testFlowWithTimeout() = runTest {
val publisher = flow<Int> {
expect(2)
withTimeout(1) { delay(Long.MAX_VALUE) }
}.asPublisher()
try {
expect(1)
publisher.awaitFirstOrNull()
} catch (e: CancellationException) {
expect(3)
}
finish(4)
}
}
20 changes: 19 additions & 1 deletion reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
Expand Up @@ -5,10 +5,12 @@
package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.exceptions.*
import org.junit.Test
import kotlinx.coroutines.flow.flowOn
import org.junit.runner.*
import org.junit.runners.*
import kotlin.contracts.*
import java.util.concurrent.Flow as JFlow
import kotlin.coroutines.*
import kotlin.test.*
Expand Down Expand Up @@ -129,4 +131,20 @@ class IntegrationTest(
assertEquals(n, last)
}

}
}

@OptIn(ExperimentalContracts::class)
internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
contract {
callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
}
val handler = CapturingHandler()
return withContext(handler) {
operation(handler)
handler.getException().let {
assertTrue(it is E, it.toString())
it
}
}
}
158 changes: 130 additions & 28 deletions reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.junit.Test
import java.util.concurrent.Flow as JFlow
import kotlin.test.*
Expand Down Expand Up @@ -121,44 +122,110 @@ class PublishTest : TestBase() {
finish(7)
}

/** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
@Test
fun testOnNextError() = runTest {
fun testChannelClosing() = runTest {
expect(1)
val publisher = flowPublish(currentDispatcher()) {
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
expect(3)
close()
assert(isClosedForSend)
expect(4)
try {
send("OK")
} catch(e: Throwable) {
expect(6)
assert(e is TestException)
}
}
expect(2)
try {
expect(2)
publisher.awaitFirstOrNull()
} catch (e: CancellationException) {
expect(5)
}
finish(6)
}

@Test
fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
publisher.subscribe(object : JFlow.Subscriber<String> {
override fun onComplete() {
expectUnreached()
expect(1)
assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
expect(4)
try {
send("OK")
} catch (e: Throwable) {
expect(6)
assert(e is TestException)
assert(isClosedForSend)
latch.complete(Unit)
}
}
expect(2)
publisher.subscribe(object : JFlow.Subscriber<String> {
override fun onComplete() {
expectUnreached()
}

override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(1)
}
override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(1)
}

override fun onNext(t: String) {
expect(5)
assertEquals("OK", t)
throw TestException()
}
override fun onNext(t: String) {
expect(5)
assertEquals("OK", t)
throw TestException()
}

override fun onError(t: Throwable) {
expect(7)
assert(t is TestException)
latch.complete(Unit)
override fun onError(t: Throwable) {
expectUnreached()
}
})
latch.await()
}
finish(7)
}

/** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
@Test
fun testOnNextErrorAfterCancellation() = runTest {
assertCallsExceptionHandlerWith<TestException> { handler ->
var producerScope: ProducerScope<Int>? = null
CompletableDeferred<Unit>()
expect(1)
var job: Job? = null
val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
producerScope = this
expect(4)
job = launch {
delay(Long.MAX_VALUE)
}
}
})
latch.await()
finish(8)
expect(2)
publisher.subscribe(object: JFlow.Subscriber<Int> {
override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(Long.MAX_VALUE)
}
override fun onNext(t: Int) {
expect(6)
assertEquals(1, t)
job!!.cancel()
throw TestException()
}
override fun onError(t: Throwable?) {
/* Correct changes to the implementation could lead to us entering or not entering this method, but
it only matters that if we do, it is the "correct" exception that was validly used to cancel the
coroutine that gets passed here and not `TestException`. */
assertTrue(t is CancellationException)
}
override fun onComplete() { expectUnreached() }
})
expect(5)
val result: ChannelResult<Unit> = producerScope!!.trySend(1)
val e = result.exceptionOrNull()!!
assertTrue(e is CancellationException, "The actual error: $e")
assertTrue(producerScope!!.isClosedForSend)
assertTrue(result.isFailure)
}
finish(7)
}

@Test
Expand All @@ -182,4 +249,39 @@ class PublishTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
}

/** Tests that `trySend` doesn't throw in `flowPublish`. */
@Test
fun testTrySendNotThrowing() = runTest {
var producerScope: ProducerScope<Int>? = null
expect(1)
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
producerScope = this
expect(3)
delay(Long.MAX_VALUE)
}
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
publisher.awaitFirstOrNull()
expectUnreached()
}
job.cancel()
expect(4)
val result = producerScope!!.trySend(1)
assertTrue(result.isFailure)
finish(5)
}

/** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
@Test
fun testEmittingNull() = runTest {
val publisher = flowPublish {
assertFailsWith<NullPointerException> { send(null) }
assertFailsWith<NullPointerException> { trySend(null) }
@Suppress("DEPRECATION")
assertFailsWith<NullPointerException> { offer(null) }
send("OK")
}
assertEquals("OK", publisher.awaitFirstOrNull())
}
}

0 comments on commit 9ff6701

Please sign in to comment.