From 1b5d6338c57ce5b08b44bbf9cd1e101ea485fbfc Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 13 Apr 2021 11:48:33 +0300 Subject: [PATCH 1/7] Consistently handle exceptions in reactive streams * Fixed `PublisherCoroutine` and `rxObservable` ignoring cancellations. * Fatal exceptions are not treated in a special manner by us anymore. Instead, we follow the requirement in the reactive streams specification that, in case some method of `Subscriber` throws, that subscriber MUST be considered cancelled, and the exception MUST be reported in some place other than `onError`. * Fixed `trySend` sometimes throwing in `PublisherCoroutine` and `rxObservable`. * When an exception happens inside a cancellation handler, we now consistently throw the original exception passed to the handler, with the new exception added as suppressed. --- .../test/IntegrationTest.kt | 22 ++- .../test/PublishTest.kt | 69 ++++---- .../src/Publish.kt | 153 ++++++++++-------- .../test/IntegrationTest.kt | 53 +++--- .../test/PublishTest.kt | 69 ++++---- .../kotlinx-coroutines-reactor/src/Flux.kt | 9 +- .../kotlinx-coroutines-reactor/src/Mono.kt | 1 + reactive/kotlinx-coroutines-rx2/README.md | 4 - .../src/RxCompletable.kt | 7 +- .../kotlinx-coroutines-rx2/src/RxMaybe.kt | 7 +- .../src/RxObservable.kt | 113 ++++++------- .../kotlinx-coroutines-rx2/src/RxSingle.kt | 7 +- .../test/FlowableExceptionHandlingTest.kt | 16 +- .../test/IntegrationTest.kt | 16 ++ .../test/ObservableExceptionHandlingTest.kt | 28 ++-- .../src/RxCompletable.kt | 7 +- .../kotlinx-coroutines-rx3/src/RxMaybe.kt | 7 +- .../src/RxObservable.kt | 108 ++++++------- .../kotlinx-coroutines-rx3/src/RxSingle.kt | 7 +- .../test/FlowableExceptionHandlingTest.kt | 16 +- .../test/IntegrationTest.kt | 15 ++ .../test/ObservableExceptionHandlingTest.kt | 28 ++-- 22 files changed, 421 insertions(+), 341 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt index 5bfddfee17..f6dfb591dc 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt @@ -129,4 +129,24 @@ class IntegrationTest( assertEquals(n, last) } -} \ No newline at end of file +} + +internal suspend inline fun assertCallsExceptionHandlerWith( + crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E +{ + val caughtExceptions = mutableListOf() + val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler), + CoroutineExceptionHandler + { + override fun handleException(context: CoroutineContext, exception: Throwable) { + caughtExceptions += exception + } + } + return withContext(exceptionHandler) { + operation(exceptionHandler) + caughtExceptions.single().let { + assertTrue(it is E, it.toString()) + it + } + } +} diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt index 1a36a389fa..04a0f1cb8a 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt @@ -123,42 +123,43 @@ class PublishTest : TestBase() { @Test fun testOnNextError() = runTest { + val latch = CompletableDeferred() expect(1) - val publisher = flowPublish(currentDispatcher()) { - expect(4) - try { - send("OK") - } catch(e: Throwable) { - expect(6) - assert(e is TestException) - } + assertCallsExceptionHandlerWith { exceptionHandler -> + val publisher = flowPublish(currentDispatcher() + exceptionHandler) { + expect(4) + try { + send("OK") + } catch(e: Throwable) { + expect(6) + assert(e is TestException) + latch.complete(Unit) + } + } + expect(2) + publisher.subscribe(object : JFlow.Subscriber { + override fun onComplete() { + expectUnreached() + } + + override fun onSubscribe(s: JFlow.Subscription) { + expect(3) + s.request(1) + } + + override fun onNext(t: String) { + expect(5) + assertEquals("OK", t) + throw TestException() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + }) + latch.await() } - expect(2) - val latch = CompletableDeferred() - publisher.subscribe(object : JFlow.Subscriber { - override fun onComplete() { - expectUnreached() - } - - override fun onSubscribe(s: JFlow.Subscription) { - expect(3) - s.request(1) - } - - override fun onNext(t: String) { - expect(5) - assertEquals("OK", t) - throw TestException() - } - - override fun onError(t: Throwable) { - expect(7) - assert(t is TestException) - latch.complete(Unit) - } - }) - latch.await() - finish(8) + finish(7) } @Test diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 383a17d836..a5bc980a94 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -10,7 +10,6 @@ import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import org.reactivestreams.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold reactive [Publisher] that runs a given [block] in a coroutine. @@ -74,29 +73,27 @@ public class PublisherCoroutine( private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED) @Volatile - private var cancelled = false // true when Subscription.cancel() is invoked + private var cancelled = false // true after Subscription.cancel() is invoked override val isClosedForSend: Boolean get() = isCompleted override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing = throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose") - override fun trySend(element: T): ChannelResult { - if (!mutex.tryLock()) return ChannelResult.failure() - doLockedNext(element) - return ChannelResult.success(Unit) - } + // TODO: will throw if `null` is passed -- is throwing this kind of programmer-induced errors okay? + override fun trySend(element: T): ChannelResult = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (trySend(element).isSuccess) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2> @@ -106,13 +103,13 @@ public class PublisherCoroutine( @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") override fun registerSelectClause2(select: SelectInstance, element: T, block: suspend (SendChannel) -> R) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } /* - * This code is not trivial because of the two properties: + * This code is not trivial because of the following properties: * 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not * be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple * coroutines are invoking `send` function. @@ -121,27 +118,60 @@ public class PublisherCoroutine( * globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may * lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for * `onComplete/onError` is also done under the same mutex. + * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the + * subscriber actually requests some elements. This is implemented by the mutex being locked when emitting + * elements is not permitted (`_nRequested.value == 0`). */ - // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { - // check if already closed for send, note that isActive becomes false as soon as cancel() is invoked, - // because the job is cancelled, so this check also ensure conformance to the reactive specification's - // requirement that after cancellation requested we don't call onXXX + /** + * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex. + * + * Requires that the caller has locked the mutex before this invocation. + * + * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success. + * + * @throws NullPointerException if the passed element is `null` + */ + private fun doLockedNext(elem: T): Throwable? { + if (elem == null) { + throw NullPointerException("Can not emit null") + } + /** This guards against the case when the caller of this function managed to lock the mutex not because some + * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed. + * + * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's + * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually* + * stop signalling the subscriber. */ if (!isActive) { unlockAndCheckCompleted() - throw getCancellationException() + return getCancellationException() } - // notify subscriber + // notify the subscriber try { subscriber.onNext(elem) - } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) + } catch (cause: Throwable) { + /** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the + * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler + * for uncaught exceptions and consider the subscription cancelled, as mandated by + * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13. + * + * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the + * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or + * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is + * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as + * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately + * would violate the contracts we place on them. */ + cancelled = true + val causeDelivered = close(cause) unlockAndCheckCompleted() - throw e + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + exceptionOnCancelHandler(cause, context) + getCancellationException() + } } // now update nRequested while (true) { // lock-free loop on nRequested @@ -152,12 +182,13 @@ public class PublisherCoroutine( if (_nRequested.compareAndSet(current, updated)) { if (updated == 0L) { // return to keep locked due to back-pressure - return + return null } break // unlock if updated > 0 } } unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -177,38 +208,31 @@ public class PublisherCoroutine( // assert: mutex.isLocked() & isCompleted private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { try { - if (_nRequested.value >= CLOSED) { - _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) - // Specification requires that after cancellation requested we don't call onXXX - if (cancelled) { - // If the parent had failed to handle our exception, then we must not lose this exception - if (cause != null && !handled) exceptionOnCancelHandler(cause, context) - return - } - + if (_nRequested.value == SIGNALLED) + return + _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed) + // Specification requires that after the cancellation is requested we eventually stop calling onXXX + if (cancelled) { + // If the parent had failed to handle our exception, then we must not lose this exception + if (cause != null && !handled) exceptionOnCancelHandler(cause, context) + return + } + if (cause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.onError(cause) - if (!handled && cause.isFatal()) { - exceptionOnCancelHandler(cause, context) - } - } else { - subscriber.onComplete() - } + subscriber.onComplete() } catch (e: Throwable) { handleCoroutineException(context, e) } + } else { + try { + // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`. + subscriber.onError(cause) + } catch (e: Throwable) { + if (e !== cause) { + cause.addSuppressed(e) + } + handleCoroutineException(context, cause) + } } } finally { mutex.unlock() @@ -217,13 +241,13 @@ public class PublisherCoroutine( override fun request(n: Long) { if (n <= 0) { - // Specification requires IAE for n <= 0 + // Specification requires to call onError with IAE for n <= 0 cancelCoroutine(IllegalArgumentException("non-positive subscription request $n")) return } while (true) { // lock-free loop for nRequested val cur = _nRequested.value - if (cur < 0) return // already closed for send, ignore requests + if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec var upd = cur + n if (upd < 0 || n == Long.MAX_VALUE) upd = Long.MAX_VALUE @@ -231,6 +255,11 @@ public class PublisherCoroutine( if (_nRequested.compareAndSet(cur, upd)) { // unlock the mutex when we don't have back-pressure anymore if (cur == 0L) { + /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns + * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the + * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is + * impossible for any other operations to assume that they own the lock without actually acquiring + * it. */ unlockAndCheckCompleted() } return @@ -271,8 +300,6 @@ public class PublisherCoroutine( cancelled = true super.cancel(null) } - - private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError } @Deprecated( diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 72479b5c6a..920de57c02 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -218,32 +218,39 @@ class IntegrationTest( }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) } } - private suspend inline fun assertCallsExceptionHandlerWith( - crossinline operation: suspend () -> Unit): E - { - val caughtExceptions = mutableListOf() - val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler), - CoroutineExceptionHandler - { - override fun handleException(context: CoroutineContext, exception: Throwable) { - caughtExceptions += exception - } + @Test + fun testPublishWithTimeout() = runTest { + val publisher = publish { + expect(2) + withTimeout(1) { delay(100) } } - return withContext(exceptionHandler) { - operation() - caughtExceptions.single().let { - assertTrue(it is E) - it - } + try { + expect(1) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) } + finish(4) } - private suspend fun checkNumbers(n: Int, pub: Publisher) { - var last = 0 - pub.collect { - assertEquals(++last, it) +} + +internal suspend inline fun assertCallsExceptionHandlerWith( + crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E +{ + val caughtExceptions = mutableListOf() + val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler), + CoroutineExceptionHandler + { + override fun handleException(context: CoroutineContext, exception: Throwable) { + caughtExceptions += exception } - assertEquals(n, last) } - -} + return withContext(exceptionHandler) { + operation(exceptionHandler) + caughtExceptions.single().let { + assertTrue(it is E, it.toString()) + it + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 9e3c07b6b7..8510ee28e2 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -123,42 +123,43 @@ class PublishTest : TestBase() { @Test fun testOnNextError() = runTest { + val latch = CompletableDeferred() expect(1) - val publisher = publish(currentDispatcher()) { - expect(4) - try { - send("OK") - } catch(e: Throwable) { - expect(6) - assert(e is TestException) - } + assertCallsExceptionHandlerWith { exceptionHandler -> + val publisher = publish(currentDispatcher() + exceptionHandler) { + expect(4) + try { + send("OK") + } catch(e: Throwable) { + expect(6) + assert(e is TestException) + latch.complete(Unit) + } + } + expect(2) + publisher.subscribe(object : Subscriber { + override fun onComplete() { + expectUnreached() + } + + override fun onSubscribe(s: Subscription) { + expect(3) + s.request(1) + } + + override fun onNext(t: String) { + expect(5) + assertEquals("OK", t) + throw TestException() + } + + override fun onError(t: Throwable) { + expectUnreached() + } + }) + latch.await() } - expect(2) - val latch = CompletableDeferred() - publisher.subscribe(object : Subscriber { - override fun onComplete() { - expectUnreached() - } - - override fun onSubscribe(s: Subscription) { - expect(3) - s.request(1) - } - - override fun onNext(t: String) { - expect(5) - assertEquals("OK", t) - throw TestException() - } - - override fun onError(t: Throwable) { - expect(7) - assert(t is TestException) - latch.complete(Unit) - } - }) - latch.await() - finish(8) + finish(7) } @Test diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index b7143288ee..806f5bd5bc 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -55,12 +55,13 @@ private fun reactorPublish( coroutine.start(CoroutineStart.DEFAULT, coroutine, block) } -private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx -> - if (e !is CancellationException) { +private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ctx -> + if (cause !is CancellationException) { try { - Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty()) + Operators.onOperatorError(cause, ctx[ReactorContext]?.context ?: Context.empty()) } catch (e: Throwable) { - handleCoroutineException(ctx, e) + cause.addSuppressed(e) + handleCoroutineException(ctx, cause) } } } diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 307ec2278c..5f1d6e7674 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -66,6 +66,7 @@ private class MonoCoroutine( sink.error(cause) } catch (e: Throwable) { // In case of improper error implementation or fatal exceptions + cause.addSuppressed(e) handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index 40fe122f89..1ae2c8a04c 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -35,7 +35,6 @@ Suspending extension functions and suspending iteration: | [ObservableSource.awaitFirstOrNull][io.reactivex.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null | [ObservableSource.awaitLast][io.reactivex.ObservableSource.awaitFirst] | Awaits for the last value from the given observable | [ObservableSource.awaitSingle][io.reactivex.ObservableSource.awaitSingle] | Awaits for the single value from the given observable -| [ObservableSource.openSubscription][io.reactivex.ObservableSource.openSubscription] | Subscribes to observable and returns [ReceiveChannel] Note that `Flowable` is a subclass of [Reactive Streams](https://www.reactive-streams.org) `Publisher` and extensions for it are covered by @@ -47,7 +46,6 @@ Conversion functions: | -------- | --------------- | [Job.asCompletable][kotlinx.coroutines.Job.asCompletable] | Converts job to hot completable | [Deferred.asSingle][kotlinx.coroutines.Deferred.asSingle] | Converts deferred value to hot single -| [ReceiveChannel.asObservable][kotlinx.coroutines.channels.ReceiveChannel.asObservable] | Converts streaming channel to hot observable | [Scheduler.asCoroutineDispatcher][io.reactivex.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher] @@ -86,10 +84,8 @@ Conversion functions: [io.reactivex.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-else.html [io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-null.html [io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-single.html -[io.reactivex.ObservableSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/open-subscription.html [kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-job/as-completable.html [kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-deferred/as-single.html -[kotlinx.coroutines.channels.ReceiveChannel.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.channels.-receive-channel/as-observable.html [io.reactivex.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-scheduler/as-coroutine-dispatcher.html diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index eee84b1bb6..3f6c27a693 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -50,12 +50,13 @@ private class RxCompletableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index 9682373a79..aa531c6ecf 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -51,12 +51,13 @@ private class RxMaybeCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 09c5dc1d9f..0f76faa514 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* +import java.lang.RuntimeException import kotlin.coroutines.* /** @@ -60,7 +61,7 @@ private class RxObservableCoroutine( ) : AbstractCoroutine(parentContext, false, true), ProducerScope, SelectClause2> { override val channel: SendChannel get() = this - // Mutex is locked when while subscriber.onXXX is being invoked + // Mutex is locked while subscriber.onXXX is being invoked private val mutex = Mutex() private val _signal = atomic(OPEN) @@ -70,28 +71,19 @@ private class RxObservableCoroutine( override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false - doLockedNext(element) - return true - } - - override fun trySend(element: T): ChannelResult { - if (!mutex.tryLock()) return ChannelResult.failure() - doLockedNext(element) - return ChannelResult.success(Unit) - } + override fun trySend(element: T): ChannelResult = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (trySend(element).isSuccess) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2> @@ -99,30 +91,39 @@ private class RxObservableCoroutine( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun registerSelectClause2(select: SelectInstance, element: T, block: suspend (SendChannel) -> R) { + override fun registerSelectClause2( + select: SelectInstance, + element: T, + block: suspend (SendChannel) -> R + ) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { + private fun doLockedNext(elem: T): Throwable? { // check if already closed for send if (!isActive) { doLockedSignalCompleted(completionCause, completionCauseHandled) - throw getCancellationException() + return getCancellationException() } // notify subscriber try { subscriber.onNext(elem) } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) - mutex.unlock() - throw e + val cause = UndeliverableException(e) + val causeDelivered = close(cause) + unlockAndCheckCompleted() + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + handleUndeliverableException(cause, context) + getCancellationException() + } } /* * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might @@ -131,6 +132,7 @@ private class RxObservableCoroutine( * We have to recheck `isCompleted` after `unlock` anyway. */ unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -144,33 +146,29 @@ private class RxObservableCoroutine( private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { // cancellation failures try { - if (_signal.value >= CLOSED) { - _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (_signal.value == SIGNALLED) + return + _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (cause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { - handleUndeliverableException(cause, context) - } - } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - // Unhandled exception (cannot handle in other way, since we are already complete) + subscriber.onComplete() + } catch (e: Exception) { handleUndeliverableException(e, context) } + } else if (cause is UndeliverableException) { + /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, + * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already + * cancelled. */ + handleUndeliverableException(cause, context) + } else if (cause !== getCancellationException() || !subscriber.isDisposed) { + try { + /** If the subscriber is already in a terminal state, the error will be signalled to + * `RxJavaPlugins.onError`. */ + subscriber.onError(cause) + } catch (e: Exception) { + cause.addSuppressed(e) + handleUndeliverableException(cause, context) + } } } finally { mutex.unlock() @@ -192,13 +190,6 @@ private class RxObservableCoroutine( } } -internal fun Throwable.isFatal() = try { - Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode - false -} catch (e: Throwable) { - true -} - @Deprecated( message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", level = DeprecationLevel.HIDDEN, diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index 49a9bb8dd6..c7ad606eb6 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -50,12 +50,13 @@ private class RxSingleCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt index 05b7ee92b6..f83a08a155 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler(3)) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is reported to both onError and CEH + expect(2) // Fatal exception are not treated as special }) - finish(4) + finish(3) } @Test @@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { rxFlowable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Fatal exception is reported to both onError and CEH - finish(5) + }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled + finish(4) } @Test diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt index 4bf3e4536e..8a6362ad99 100644 --- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* @@ -124,6 +125,21 @@ class IntegrationTest( finish(3) } + @Test + fun testObservableWithTimeout() = runTest { + val observable = rxObservable { + expect(2) + withTimeout(1) { delay(100) } + } + try { + expect(1) + observable.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + private suspend fun checkNumbers(n: Int, observable: Observable) { var last = 0 observable.collect { diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index d6cdd3ca24..67f68d53c4 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -18,7 +18,7 @@ class ObservableExceptionHandlingTest : TestBase() { } private inline fun handler(expect: Int) = { t: Throwable -> - assertTrue(t is UndeliverableException && t.cause is T) + assertTrue(t is UndeliverableException && t.cause is T, "$t") expect(expect) } @@ -38,8 +38,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler(3)) { - rxObservable(Dispatchers.Unconfined) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { + rxObservable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw LinkageError() }.subscribe({ @@ -47,7 +47,7 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test @@ -66,7 +66,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxObservable(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -75,21 +75,21 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is not reported in onError + expect(2) // Fatal exceptions are not treated in a special manner }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Unreached because fatal errors are rethrown - finish(5) + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + finish(4) } @Test @@ -100,7 +100,7 @@ class ObservableExceptionHandlingTest : TestBase() { }.subscribe({ expect(2) throw TestException() - }, { expect(3) }) // not reported to onError because came from the subscribe itself + }, { expect(3) }) finish(4) } @@ -119,7 +119,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) @@ -128,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) - finish(5) + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + finish(4) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt index 88137675d8..47cc6ad3ab 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt @@ -50,11 +50,12 @@ private class RxCompletableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt index 1c10266470..12d0197bf2 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt @@ -51,11 +51,12 @@ private class RxMaybeCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 55794f9adf..a77bf2d879 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* +import java.lang.RuntimeException import kotlin.coroutines.* /** @@ -54,13 +55,13 @@ private const val OPEN = 0 // open channel, still working private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError -private class RxObservableCoroutine( +private class RxObservableCoroutine( parentContext: CoroutineContext, private val subscriber: ObservableEmitter ) : AbstractCoroutine(parentContext, false, true), ProducerScope, SelectClause2> { override val channel: SendChannel get() = this - // Mutex is locked when while subscriber.onXXX is being invoked + // Mutex is locked while subscriber.onXXX is being invoked private val mutex = Mutex() private val _signal = atomic(OPEN) @@ -70,22 +71,19 @@ private class RxObservableCoroutine( override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun trySend(element: T): ChannelResult { - if (!mutex.tryLock()) return ChannelResult.failure() - doLockedNext(element) - return ChannelResult.success(Unit) - } + override fun trySend(element: T): ChannelResult = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (trySend(element).isSuccess) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2> @@ -93,30 +91,39 @@ private class RxObservableCoroutine( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun registerSelectClause2(select: SelectInstance, element: T, block: suspend (SendChannel) -> R) { + override fun registerSelectClause2( + select: SelectInstance, + element: T, + block: suspend (SendChannel) -> R + ) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { + private fun doLockedNext(elem: T): Throwable? { // check if already closed for send if (!isActive) { doLockedSignalCompleted(completionCause, completionCauseHandled) - throw getCancellationException() + return getCancellationException() } // notify subscriber try { subscriber.onNext(elem) } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) - mutex.unlock() - throw e + val cause = UndeliverableException(e) + val causeDelivered = close(cause) + unlockAndCheckCompleted() + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + handleUndeliverableException(cause, context) + getCancellationException() + } } /* * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might @@ -125,6 +132,7 @@ private class RxObservableCoroutine( * We have to recheck `isCompleted` after `unlock` anyway. */ unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -138,33 +146,29 @@ private class RxObservableCoroutine( private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { // cancellation failures try { - if (_signal.value >= CLOSED) { - _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (_signal.value == SIGNALLED) + return + _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (cause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { - handleUndeliverableException(cause, context) - } - } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - // Unhandled exception (cannot handle in other way, since we are already complete) + subscriber.onComplete() + } catch (e: Exception) { handleUndeliverableException(e, context) } + } else if (cause is UndeliverableException) { + /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, + * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already + * cancelled. */ + handleUndeliverableException(cause, context) + } else if (cause !== getCancellationException() || !subscriber.isDisposed) { + try { + /** If the subscriber is already in a terminal state, the error will be signalled to + * `RxJavaPlugins.onError`. */ + subscriber.onError(cause) + } catch (e: Exception) { + cause.addSuppressed(e) + handleUndeliverableException(cause, context) + } } } finally { mutex.unlock() @@ -186,9 +190,3 @@ private class RxObservableCoroutine( } } -internal fun Throwable.isFatal() = try { - Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode - false -} catch (e: Throwable) { - true -} diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt index fb6020eab3..e7678f0d10 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt @@ -50,11 +50,12 @@ private class RxSingleCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt index 8cbd7ee89f..c144bcbf37 100644 --- a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt @@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler(3)) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is reported to both onError and CEH + expect(2) // Fatal exception are not treated as special }) - finish(4) + finish(3) } @Test @@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxFlowable(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { rxFlowable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Fatal exception is reported to both onError and CEH - finish(5) + }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled + finish(4) } @Test diff --git a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt index 395672cee9..1302124f50 100644 --- a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt @@ -125,6 +125,21 @@ class IntegrationTest( finish(3) } + @Test + fun testObservableWithTimeout() = runTest { + val observable = rxObservable { + expect(2) + withTimeout(1) { delay(100) } + } + try { + expect(1) + observable.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + private suspend fun checkNumbers(n: Int, observable: Observable) { var last = 0 observable.collect { diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt index 1183b2ae21..6a17a5edbb 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt @@ -18,7 +18,7 @@ class ObservableExceptionHandlingTest : TestBase() { } private inline fun handler(expect: Int) = { t: Throwable -> - assertTrue(t is UndeliverableException && t.cause is T) + assertTrue(t is UndeliverableException && t.cause is T, "$t") expect(expect) } @@ -38,8 +38,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler(3)) { - rxObservable(Dispatchers.Unconfined) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { + rxObservable(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw LinkageError() }.subscribe({ @@ -47,7 +47,7 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test @@ -66,7 +66,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxObservable(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -75,21 +75,21 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is not reported in onError + expect(2) // Fatal exceptions are not treated in a special manner }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Unreached because fatal errors are rethrown - finish(5) + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + finish(4) } @Test @@ -100,7 +100,7 @@ class ObservableExceptionHandlingTest : TestBase() { }.subscribe({ expect(2) throw TestException() - }, { expect(3) }) // not reported to onError because came from the subscribe itself + }, { expect(3) }) finish(4) } @@ -119,7 +119,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler(4)) { + fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) @@ -128,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) - finish(5) + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + finish(4) } } From 884c51b3665abf35eb6f3a598e880dbff5a3b91d Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 14 Apr 2021 12:07:36 +0300 Subject: [PATCH 2/7] Fixes --- reactive/kotlinx-coroutines-reactive/src/Publish.kt | 3 +-- reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt | 1 + reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index a5bc980a94..df61e04943 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -80,7 +80,6 @@ public class PublisherCoroutine( override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing = throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose") - // TODO: will throw if `null` is passed -- is throwing this kind of programmer-induced errors okay? override fun trySend(element: T): ChannelResult = if (!mutex.tryLock()) { ChannelResult.failure() @@ -134,7 +133,7 @@ public class PublisherCoroutine( */ private fun doLockedNext(elem: T): Throwable? { if (elem == null) { - throw NullPointerException("Can not emit null") + throw NullPointerException("Attempted to emit `null` inside a reactive publisher") } /** This guards against the case when the caller of this function managed to lock the mutex not because some * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed. diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt index 0fe43f1cee..3e39033ed7 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt @@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo try { RxJavaPlugins.onError(cause) } catch (e: Throwable) { + cause.addSuppressed(e) handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt index 2995159850..1017b112f1 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt @@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo try { RxJavaPlugins.onError(cause) } catch (e: Throwable) { + cause.addSuppressed(e) handleCoroutineException(context, cause) } } From 8f9c70df0775554f67dd9bb78e421a2a5d5b0695 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 16 Apr 2021 16:52:12 +0300 Subject: [PATCH 3/7] Fixes --- .../test/FlowAsPublisherTest.kt | 22 +++++++++++-- .../src/ReactiveFlow.kt | 32 +++++++++++++------ .../test/FlowAsPublisherTest.kt | 24 +++++++++++--- .../test/PublisherRequestStressTest.kt | 8 +++-- .../src/RxObservable.kt | 2 +- .../src/RxObservable.kt | 3 +- 6 files changed, 69 insertions(+), 22 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt index 488695dea2..bb38d1789d 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.junit.Test +import java.util.concurrent.CancellationException import java.util.concurrent.Flow as JFlow import kotlin.test.* @@ -15,7 +16,7 @@ class FlowAsPublisherTest : TestBase() { @Test fun testErrorOnCancellationIsReported() { expect(1) - flow { + flow { try { emit(2) } finally { @@ -50,13 +51,13 @@ class FlowAsPublisherTest : TestBase() { @Test fun testCancellationIsNotReported() { expect(1) - flow { + flow { emit(2) }.asPublisher().subscribe(object : JFlow.Subscriber { private lateinit var subscription: JFlow.Subscription override fun onComplete() { - expect(3) + expectUnreached() // we stop signalling after cancellation } override fun onSubscribe(s: JFlow.Subscription?) { @@ -73,6 +74,21 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) + finish(3) + } + + @Test + fun testFlowWithTimeout() = runTest { + val publisher = flow { + expect(2) + withTimeout(1) { delay(Long.MAX_VALUE) } + }.asPublisher() + try { + expect(1) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } finish(4) } } diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index f0245388bf..965790017d 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -195,6 +195,8 @@ public class FlowSubscription( */ private val requested = atomic(0L) private val producer = atomic?>(createInitialContinuation()) + @Volatile + private var cancellationRequested = false // This code wraps startCoroutineCancellable into continuation private fun createInitialContinuation(): Continuation = Continuation(coroutineContext) { @@ -202,18 +204,29 @@ public class FlowSubscription( } private suspend fun flowProcessing() { - try { + val consumeSucceeded = try { consumeFlow() - subscriber.onComplete() - } catch (e: Throwable) { - try { - if (e is CancellationException) { - subscriber.onComplete() - } else { - subscriber.onError(e) + true + } catch (cause: Throwable) { + if (cancellationRequested && cause === getCancellationException()) { + return + } else { + // TODO: this branch gets entered even when `cause` looks identical to `getCancellationException()`. + // Is stack sanitization to blame? + try { + subscriber.onError(cause) + } catch (e: Throwable) { + // Last ditch report + cause.addSuppressed(e) + handleCoroutineException(coroutineContext, cause) } + } + false + } + if (consumeSucceeded) { + try { + subscriber.onComplete() } catch (e: Throwable) { - // Last ditch report handleCoroutineException(coroutineContext, e) } } @@ -239,6 +252,7 @@ public class FlowSubscription( } override fun cancel() { + cancellationRequested = true cancel(null) } diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index e7b8cb17ae..0b0080c671 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -9,13 +9,14 @@ import kotlinx.coroutines.flow.* import org.junit.Test import org.reactivestreams.* import java.util.concurrent.* +import java.util.concurrent.CancellationException import kotlin.test.* class FlowAsPublisherTest : TestBase() { @Test fun testErrorOnCancellationIsReported() { expect(1) - flow { + flow { try { emit(2) } finally { @@ -50,13 +51,13 @@ class FlowAsPublisherTest : TestBase() { @Test fun testCancellationIsNotReported() { expect(1) - flow { + flow { emit(2) }.asPublisher().subscribe(object : Subscriber { private lateinit var subscription: Subscription override fun onComplete() { - expect(3) + expectUnreached() // we stop signalling after cancellation } override fun onSubscribe(s: Subscription?) { @@ -73,7 +74,7 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(4) + finish(3) } @Test @@ -149,4 +150,19 @@ class FlowAsPublisherTest : TestBase() { } finish(5) } + + @Test + fun testFlowWithTimeout() = runTest { + val publisher = flow { + expect(2) + withTimeout(1) { delay(Long.MAX_VALUE) } + }.asPublisher() + try { + expect(1) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt index 736a66404f..8bb75e887c 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -29,13 +29,14 @@ import kotlin.random.* */ @Suppress("ReactiveStreamsSubscriberImplementation") class PublisherRequestStressTest : TestBase() { + private val testDurationSec = 3 * stressTestMultiplier // Original code in Amazon SDK uses 4 and 16 as low/high watermarks. - // There constants were chosen so that problem reproduces asap with particular this code. + // These constants were chosen so that problem reproduces asap with particular this code. private val minDemand = 8L private val maxDemand = 16L - + private val nEmitThreads = 4 private val emitThreadNo = AtomicInteger() @@ -47,7 +48,7 @@ class PublisherRequestStressTest : TestBase() { private val reqPool = Executors.newSingleThreadExecutor { r -> Thread(r, "PublisherRequestStressTest-req") } - + private val nextValue = AtomicLong(0) @After @@ -61,6 +62,7 @@ class PublisherRequestStressTest : TestBase() { private lateinit var subscription: Subscription @Test + @Ignore // for now, given that it fails for strange reasons fun testRequestStress() { val expectedValue = AtomicLong(0) val requestedTill = AtomicLong(0) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 0f76faa514..7d4224e250 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -155,7 +155,7 @@ private class RxObservableCoroutine( } catch (e: Exception) { handleUndeliverableException(e, context) } - } else if (cause is UndeliverableException) { + } else if (cause is UndeliverableException && !handled) { /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already * cancelled. */ diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index a77bf2d879..b8915b9c90 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -11,7 +11,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* -import java.lang.RuntimeException import kotlin.coroutines.* /** @@ -155,7 +154,7 @@ private class RxObservableCoroutine( } catch (e: Exception) { handleUndeliverableException(e, context) } - } else if (cause is UndeliverableException) { + } else if (cause is UndeliverableException && !handled) { /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already * cancelled. */ From e06faa9a161bebe7156787ebcd331d0af24460f6 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 16 Apr 2021 17:36:32 +0300 Subject: [PATCH 4/7] Kludges --- .../kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt | 4 ++-- reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt | 6 ++---- .../kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt | 4 ++-- .../test/PublisherRequestStressTest.kt | 1 - 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt index bb38d1789d..7889d5071d 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -57,7 +57,7 @@ class FlowAsPublisherTest : TestBase() { private lateinit var subscription: JFlow.Subscription override fun onComplete() { - expectUnreached() // we stop signalling after cancellation + expect(3) } override fun onSubscribe(s: JFlow.Subscription?) { @@ -74,7 +74,7 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(3) + finish(4) } @Test diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 965790017d..b8c043bc33 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -208,11 +208,9 @@ public class FlowSubscription( consumeFlow() true } catch (cause: Throwable) { - if (cancellationRequested && cause === getCancellationException()) { - return + if (cancellationRequested && !isActive && (cause === getCancellationException() || cause.cause === getCancellationException() && cause.message == getCancellationException().message)) { + subscriber.onComplete() } else { - // TODO: this branch gets entered even when `cause` looks identical to `getCancellationException()`. - // Is stack sanitization to blame? try { subscriber.onError(cause) } catch (e: Throwable) { diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index 0b0080c671..80fa39bdbc 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -57,7 +57,7 @@ class FlowAsPublisherTest : TestBase() { private lateinit var subscription: Subscription override fun onComplete() { - expectUnreached() // we stop signalling after cancellation + expect(3) } override fun onSubscribe(s: Subscription?) { @@ -74,7 +74,7 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(3) + finish(4) } @Test diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt index 8bb75e887c..f4050180e9 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -62,7 +62,6 @@ class PublisherRequestStressTest : TestBase() { private lateinit var subscription: Subscription @Test - @Ignore // for now, given that it fails for strange reasons fun testRequestStress() { val expectedValue = AtomicLong(0) val requestedTill = AtomicLong(0) From 89fd2c179efcd5a80d5dd2f12ada5a75734e9326 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 19 Apr 2021 15:36:46 +0300 Subject: [PATCH 5/7] Add some tests and fixes * Fixed `doLockedNext` not releasing the lock in `PublisherCoroutine` if `null` is emitted * Fixed `flux`, `publish`, `rxObservable` and `rxFlowable` incorrectly reporting `isClosedForSend == true` just after closing the channel --- .../test/IntegrationTest.kt | 27 ++--- .../test/PublishTest.kt | 106 +++++++++++++++++- .../src/Publish.kt | 5 +- .../src/ReactiveFlow.kt | 4 + .../test/IntegrationTest.kt | 22 ++-- .../test/PublishTest.kt | 105 ++++++++++++++++- .../test/FluxTest.kt | 39 ++++++- .../src/RxObservable.kt | 2 +- .../test/ObservableExceptionHandlingTest.kt | 13 ++- .../src/RxObservable.kt | 2 +- .../test/ObservableExceptionHandlingTest.kt | 13 ++- 11 files changed, 298 insertions(+), 40 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt index f6dfb591dc..5593547963 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* +import kotlinx.coroutines.exceptions.* import org.junit.Test import kotlinx.coroutines.flow.flowOn import org.junit.runner.* @@ -132,21 +133,13 @@ class IntegrationTest( } internal suspend inline fun assertCallsExceptionHandlerWith( - crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E -{ - val caughtExceptions = mutableListOf() - val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler), - CoroutineExceptionHandler - { - override fun handleException(context: CoroutineContext, exception: Throwable) { - caughtExceptions += exception - } - } - return withContext(exceptionHandler) { - operation(exceptionHandler) - caughtExceptions.single().let { - assertTrue(it is E, it.toString()) - it + crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E = + CapturingHandler().let { handler -> + withContext(handler) { + operation(handler) + handler.getException().let { + assertTrue(it is E, it.toString()) + it + } } - } -} + } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt index 04a0f1cb8a..90babf79b2 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt @@ -5,7 +5,11 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import org.junit.Test +import java.lang.NullPointerException +import java.util.concurrent.* +import java.util.concurrent.CancellationException import java.util.concurrent.Flow as JFlow import kotlin.test.* @@ -121,6 +125,25 @@ class PublishTest : TestBase() { finish(7) } + /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */ + @Test + fun testChannelClosing() = runTest { + expect(1) + val publisher = flowPublish(Dispatchers.Unconfined) { + expect(3) + close() + assert(isClosedForSend) + expect(4) + } + try { + expect(2) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(5) + } + finish(6) + } + @Test fun testOnNextError() = runTest { val latch = CompletableDeferred() @@ -130,9 +153,10 @@ class PublishTest : TestBase() { expect(4) try { send("OK") - } catch(e: Throwable) { + } catch (e: Throwable) { expect(6) assert(e is TestException) + assert(isClosedForSend) latch.complete(Unit) } } @@ -162,6 +186,51 @@ class PublishTest : TestBase() { finish(7) } + /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */ + @Test + fun testOnNextErrorAfterCancellation() = runTest { + assertCallsExceptionHandlerWith { handler -> + var producerScope: ProducerScope? = null + CompletableDeferred() + expect(1) + var job: Job? = null + val publisher = flowPublish(handler + Dispatchers.Unconfined) { + producerScope = this + expect(4) + job = launch { + delay(Long.MAX_VALUE) + } + } + expect(2) + publisher.subscribe(object: JFlow.Subscriber { + override fun onSubscribe(s: JFlow.Subscription) { + expect(3) + s.request(Long.MAX_VALUE) + } + override fun onNext(t: Int) { + expect(6) + assertEquals(1, t) + job!!.cancel() + throw TestException() + } + override fun onError(t: Throwable?) { + /* Correct changes to the implementation could lead to us entering or not entering this method, but + it only matters that if we do, it is the "correct" exception that was validly used to cancel the + coroutine that gets passed here and not `TestException`. */ + assertTrue(t is CancellationException) + } + override fun onComplete() { expectUnreached() } + }) + expect(5) + val result: ChannelResult = producerScope!!.trySend(1) + val e = result.exceptionOrNull()!! + assertTrue(e is CancellationException, "The actual error: $e") + assertTrue(producerScope!!.isClosedForSend) + assertTrue(result.isFailure) + } + finish(7) + } + @Test fun testFailingConsumer() = runTest { val pub = flowPublish(currentDispatcher()) { @@ -183,4 +252,39 @@ class PublishTest : TestBase() { fun testIllegalArgumentException() { assertFailsWith { flowPublish(Job()) { } } } + + /** Tests that `trySend` doesn't throw in `flowPublish`. */ + @Test + fun testTrySendNotThrowing() = runTest { + var producerScope: ProducerScope? = null + expect(1) + val publisher = flowPublish(Dispatchers.Unconfined) { + producerScope = this + expect(3) + delay(Long.MAX_VALUE) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + publisher.awaitFirstOrNull() + expectUnreached() + } + job.cancel() + expect(4) + val result = producerScope!!.trySend(1) + assertTrue(result.isFailure) + finish(5) + } + + /** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */ + @Test + fun testEmittingNull() = runTest { + val publisher = flowPublish { + assertFailsWith { send(null) } + assertFailsWith { trySend(null) } + @Suppress("DEPRECATION") + assertFailsWith { offer(null) } + send("OK") + } + assertEquals("OK", publisher.awaitFirstOrNull()) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index df61e04943..37113849ab 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -75,7 +75,7 @@ public class PublisherCoroutine( @Volatile private var cancelled = false // true after Subscription.cancel() is invoked - override val isClosedForSend: Boolean get() = isCompleted + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing = throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose") @@ -133,6 +133,7 @@ public class PublisherCoroutine( */ private fun doLockedNext(elem: T): Throwable? { if (elem == null) { + unlockAndCheckCompleted() throw NullPointerException("Attempted to emit `null` inside a reactive publisher") } /** This guards against the case when the caller of this function managed to lock the mutex not because some @@ -212,7 +213,7 @@ public class PublisherCoroutine( _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed) // Specification requires that after the cancellation is requested we eventually stop calling onXXX if (cancelled) { - // If the parent had failed to handle our exception, then we must not lose this exception + // If the parent failed to handle this exception, then we must not lose the exception if (cause != null && !handled) exceptionOnCancelHandler(cause, context) return } diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index b8c043bc33..7a68123918 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -208,7 +208,11 @@ public class FlowSubscription( consumeFlow() true } catch (cause: Throwable) { + /* TODO: The part after "||" is a hack needed due to [cause] having travelled over a coroutine boundary and + being changed from the result of [getCancellationException()]. */ if (cancellationRequested && !isActive && (cause === getCancellationException() || cause.cause === getCancellationException() && cause.message == getCancellationException().message)) { + /* TODO: This is incorrect, as [Subscriber.onComplete] denotes the end of the stream and not just any + non-erroneous terminal state. */ subscriber.onComplete() } else { try { diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 920de57c02..efe7ec7e45 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -5,12 +5,14 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.exceptions.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* import org.reactivestreams.* import java.lang.IllegalStateException import java.lang.RuntimeException +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.test.* @@ -235,20 +237,16 @@ class IntegrationTest( } +@OptIn(ExperimentalContracts::class) internal suspend inline fun assertCallsExceptionHandlerWith( - crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E -{ - val caughtExceptions = mutableListOf() - val exceptionHandler = object: AbstractCoroutineContextElement(CoroutineExceptionHandler), - CoroutineExceptionHandler - { - override fun handleException(context: CoroutineContext, exception: Throwable) { - caughtExceptions += exception - } + crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E { + contract { + callsInPlace(operation, InvocationKind.EXACTLY_ONCE) } - return withContext(exceptionHandler) { - operation(exceptionHandler) - caughtExceptions.single().let { + val handler = CapturingHandler() + return withContext(handler) { + operation(handler) + handler.getException().let { assertTrue(it is E, it.toString()) it } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 8510ee28e2..17719abdd1 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -5,8 +5,11 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import org.junit.Test import org.reactivestreams.* +import java.lang.NullPointerException +import kotlin.coroutines.cancellation.CancellationException import kotlin.test.* class PublishTest : TestBase() { @@ -121,6 +124,25 @@ class PublishTest : TestBase() { finish(7) } + /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */ + @Test + fun testChannelClosing() = runTest { + expect(1) + val publisher = publish(Dispatchers.Unconfined) { + expect(3) + close() + assert(isClosedForSend) + expect(4) + } + try { + expect(2) + publisher.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(5) + } + finish(6) + } + @Test fun testOnNextError() = runTest { val latch = CompletableDeferred() @@ -130,9 +152,10 @@ class PublishTest : TestBase() { expect(4) try { send("OK") - } catch(e: Throwable) { + } catch (e: Throwable) { expect(6) assert(e is TestException) + assert(isClosedForSend) latch.complete(Unit) } } @@ -162,6 +185,51 @@ class PublishTest : TestBase() { finish(7) } + /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */ + @Test + fun testOnNextErrorAfterCancellation() = runTest { + assertCallsExceptionHandlerWith { handler -> + var producerScope: ProducerScope? = null + CompletableDeferred() + expect(1) + var job: Job? = null + val publisher = publish(handler + Dispatchers.Unconfined) { + producerScope = this + expect(4) + job = launch { + delay(Long.MAX_VALUE) + } + } + expect(2) + publisher.subscribe(object: Subscriber { + override fun onSubscribe(s: Subscription) { + expect(3) + s.request(Long.MAX_VALUE) + } + override fun onNext(t: Int) { + expect(6) + assertEquals(1, t) + job!!.cancel() + throw TestException() + } + override fun onError(t: Throwable?) { + /* Correct changes to the implementation could lead to us entering or not entering this method, but + it only matters that if we do, it is the "correct" exception that was validly used to cancel the + coroutine that gets passed here and not `TestException`. */ + assertTrue(t is CancellationException) + } + override fun onComplete() { expectUnreached() } + }) + expect(5) + val result: ChannelResult = producerScope!!.trySend(1) + val e = result.exceptionOrNull()!! + assertTrue(e is CancellationException, "The actual error: $e") + assertTrue(producerScope!!.isClosedForSend) + assertTrue(result.isFailure) + } + finish(7) + } + @Test fun testFailingConsumer() = runTest { val pub = publish(currentDispatcher()) { @@ -183,4 +251,39 @@ class PublishTest : TestBase() { fun testIllegalArgumentException() { assertFailsWith { publish(Job()) { } } } + + /** Tests that `trySend` doesn't throw in `publish`. */ + @Test + fun testTrySendNotThrowing() = runTest { + var producerScope: ProducerScope? = null + expect(1) + val publisher = publish(Dispatchers.Unconfined) { + producerScope = this + expect(3) + delay(Long.MAX_VALUE) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + publisher.awaitFirstOrNull() + expectUnreached() + } + job.cancel() + expect(4) + val result = producerScope!!.trySend(1) + assertTrue(result.isFailure) + finish(5) + } + + /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */ + @Test + fun testEmittingNull() = runTest { + val publisher = publish { + assertFailsWith { send(null) } + assertFailsWith { trySend(null) } + @Suppress("DEPRECATION") + assertFailsWith { offer(null) } + send("OK") + } + assertEquals("OK", publisher.awaitFirstOrNull()) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt index 31f5f5d979..4bee0247cf 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt @@ -5,10 +5,12 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* -import org.junit.* import org.junit.Test +import java.lang.NullPointerException import kotlin.test.* class FluxTest : TestBase() { @@ -141,4 +143,39 @@ class FluxTest : TestBase() { .collect { } } } + + /** Tests that `trySend` doesn't throw in `flux`. */ + @Test + fun testTrySendNotThrowing() = runTest { + var producerScope: ProducerScope? = null + expect(1) + val flux = flux(Dispatchers.Unconfined) { + producerScope = this + expect(3) + delay(Long.MAX_VALUE) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + flux.awaitFirstOrNull() + expectUnreached() + } + job.cancel() + expect(4) + val result = producerScope!!.trySend(1) + assertTrue(result.isFailure) + finish(5) + } + + /** Tests that all methods on `flux` fail without closing the channel when attempting to emit `null`. */ + @Test + fun testEmittingNull() = runTest { + val flux = flux { + assertFailsWith { send(null) } + assertFailsWith { trySend(null) } + @Suppress("DEPRECATION") + assertFailsWith { offer(null) } + send("OK") + } + assertEquals("OK", flux.awaitFirstOrNull()) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 7d4224e250..1aecdf0c24 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -66,7 +66,7 @@ private class RxObservableCoroutine( private val _signal = atomic(OPEN) - override val isClosedForSend: Boolean get() = isCompleted + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index 67f68d53c4..fb3d0f69fc 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -8,6 +8,7 @@ import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class ObservableExceptionHandlingTest : TestBase() { @@ -82,14 +83,22 @@ class ObservableExceptionHandlingTest : TestBase() { @Test fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { + val latch = CountDownLatch(1) rxObservable(Dispatchers.Unconfined) { expect(1) - send(Unit) + val result = trySend(Unit) + val exception = result.exceptionOrNull() + assertTrue(exception is UndeliverableException) + assertTrue(exception.cause is LinkageError) + assertTrue(isClosedForSend) + expect(4) + latch.countDown() }.subscribe({ expect(2) throw LinkageError() }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. - finish(4) + latch.await() + finish(5) } @Test diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index b8915b9c90..00196be4ab 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -65,7 +65,7 @@ private class RxObservableCoroutine( private val _signal = atomic(OPEN) - override val isClosedForSend: Boolean get() = isCompleted + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt index 6a17a5edbb..5ddb36ed07 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt @@ -8,6 +8,7 @@ import io.reactivex.rxjava3.exceptions.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class ObservableExceptionHandlingTest : TestBase() { @@ -82,14 +83,22 @@ class ObservableExceptionHandlingTest : TestBase() { @Test fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler(3)) { + val latch = CountDownLatch(1) rxObservable(Dispatchers.Unconfined) { expect(1) - send(Unit) + val result = trySend(Unit) + val exception = result.exceptionOrNull() + assertTrue(exception is UndeliverableException) + assertTrue(exception.cause is LinkageError) + assertTrue(isClosedForSend) + expect(4) + latch.countDown() }.subscribe({ expect(2) throw LinkageError() }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. - finish(4) + latch.await() + finish(5) } @Test From 27650e25f95b8d9895ba19939f67cfdee13cd736 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 19 Apr 2021 16:54:12 +0300 Subject: [PATCH 6/7] Remove the kludges --- .../test/FlowAsPublisherTest.kt | 4 +-- .../src/ReactiveFlow.kt | 27 ++++++++----------- .../test/FlowAsPublisherTest.kt | 4 +-- .../test/PublisherRequestStressTest.kt | 7 ++--- .../kotlinx-coroutines-reactor/src/Mono.kt | 4 ++- .../src/RxObservable.kt | 9 ++++--- .../src/RxObservable.kt | 9 ++++--- 7 files changed, 34 insertions(+), 30 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt index 7889d5071d..109129ab16 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -57,7 +57,7 @@ class FlowAsPublisherTest : TestBase() { private lateinit var subscription: JFlow.Subscription override fun onComplete() { - expect(3) + expectUnreached() } override fun onSubscribe(s: JFlow.Subscription?) { @@ -74,7 +74,7 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(4) + finish(3) } @Test diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 7a68123918..1f197f94ed 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.intrinsics.* import org.reactivestreams.* import java.util.* import kotlin.coroutines.* +import kotlinx.coroutines.internal.* /** * Transforms the given reactive [Publisher] into [Flow]. @@ -204,17 +205,12 @@ public class FlowSubscription( } private suspend fun flowProcessing() { - val consumeSucceeded = try { + try { consumeFlow() - true } catch (cause: Throwable) { - /* TODO: The part after "||" is a hack needed due to [cause] having travelled over a coroutine boundary and - being changed from the result of [getCancellationException()]. */ - if (cancellationRequested && !isActive && (cause === getCancellationException() || cause.cause === getCancellationException() && cause.message == getCancellationException().message)) { - /* TODO: This is incorrect, as [Subscriber.onComplete] denotes the end of the stream and not just any - non-erroneous terminal state. */ - subscriber.onComplete() - } else { + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = unwrap(cause) + if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) { try { subscriber.onError(cause) } catch (e: Throwable) { @@ -223,14 +219,13 @@ public class FlowSubscription( handleCoroutineException(coroutineContext, cause) } } - false + return } - if (consumeSucceeded) { - try { - subscriber.onComplete() - } catch (e: Throwable) { - handleCoroutineException(coroutineContext, e) - } + // We only call this if `consumeFlow()` finished successfully + try { + subscriber.onComplete() + } catch (e: Throwable) { + handleCoroutineException(coroutineContext, e) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index 80fa39bdbc..426aec657e 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -57,7 +57,7 @@ class FlowAsPublisherTest : TestBase() { private lateinit var subscription: Subscription override fun onComplete() { - expect(3) + expectUnreached() } override fun onSubscribe(s: Subscription?) { @@ -74,7 +74,7 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(4) + finish(3) } @Test diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt index f4050180e9..9b069dcaec 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -65,7 +65,6 @@ class PublisherRequestStressTest : TestBase() { fun testRequestStress() { val expectedValue = AtomicLong(0) val requestedTill = AtomicLong(0) - val completionLatch = CountDownLatch(1) val callingOnNext = AtomicInteger() val publisher = mtFlow().asPublisher() @@ -75,7 +74,7 @@ class PublisherRequestStressTest : TestBase() { private var demand = 0L // only updated from reqPool override fun onComplete() { - completionLatch.countDown() + expectUnreached() } override fun onSubscribe(sub: Subscription) { @@ -124,7 +123,9 @@ class PublisherRequestStressTest : TestBase() { } if (!error) { subscription.cancel() - completionLatch.await() + runBlocking { + (subscription as AbstractCoroutine<*>).join() + } } } diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 5f1d6e7674..6e7b95ba6e 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -13,6 +13,7 @@ import reactor.core.* import reactor.core.publisher.* import kotlin.coroutines.* import kotlin.internal.* +import kotlinx.coroutines.internal.* /** * Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result. @@ -59,7 +60,8 @@ private class MonoCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { /** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */ - if (getCancellationException() !== cause || !disposed) { + val unwrappedCause = unwrap(cause) + if (getCancellationException() !== unwrappedCause || !disposed) { try { /** If [sink] turns out to already be in a terminal state, this exception will be passed through the * [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */ diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 1aecdf0c24..3de09d3211 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -9,6 +9,7 @@ import io.reactivex.exceptions.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import java.lang.RuntimeException @@ -149,18 +150,20 @@ private class RxObservableCoroutine( if (_signal.value == SIGNALLED) return _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) - if (cause == null) { + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = cause?.let { unwrap(it) } + if (unwrappedCause == null) { try { subscriber.onComplete() } catch (e: Exception) { handleUndeliverableException(e, context) } - } else if (cause is UndeliverableException && !handled) { + } else if (unwrappedCause is UndeliverableException && !handled) { /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already * cancelled. */ handleUndeliverableException(cause, context) - } else if (cause !== getCancellationException() || !subscriber.isDisposed) { + } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { try { /** If the subscriber is already in a terminal state, the error will be signalled to * `RxJavaPlugins.onError`. */ diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 00196be4ab..5c810c498d 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* +import kotlinx.coroutines.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -148,18 +149,20 @@ private class RxObservableCoroutine( if (_signal.value == SIGNALLED) return _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) - if (cause == null) { + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = cause?.let { unwrap(it) } + if (unwrappedCause == null) { try { subscriber.onComplete() } catch (e: Exception) { handleUndeliverableException(e, context) } - } else if (cause is UndeliverableException && !handled) { + } else if (unwrappedCause is UndeliverableException && !handled) { /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already * cancelled. */ handleUndeliverableException(cause, context) - } else if (cause !== getCancellationException() || !subscriber.isDisposed) { + } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { try { /** If the subscriber is already in a terminal state, the error will be signalled to * `RxJavaPlugins.onError`. */ From de15a4bc6d9165668e0b15b94c04e5e42f1f3524 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 20 Apr 2021 10:41:08 +0300 Subject: [PATCH 7/7] Style --- .../test/FlowAsPublisherTest.kt | 1 - .../test/IntegrationTest.kt | 23 +++++++++++-------- .../test/PublishTest.kt | 3 --- .../test/FlowAsPublisherTest.kt | 2 +- .../test/PublishTest.kt | 2 -- .../test/FluxTest.kt | 2 -- .../src/RxObservable.kt | 1 - .../test/FlowableExceptionHandlingTest.kt | 2 +- .../test/FlowableExceptionHandlingTest.kt | 2 +- 9 files changed, 17 insertions(+), 21 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt index 109129ab16..b860e16209 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -7,7 +7,6 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.junit.Test -import java.util.concurrent.CancellationException import java.util.concurrent.Flow as JFlow import kotlin.test.* diff --git a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt index 5593547963..5b3542ad89 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt @@ -10,6 +10,7 @@ import org.junit.Test import kotlinx.coroutines.flow.flowOn import org.junit.runner.* import org.junit.runners.* +import kotlin.contracts.* import java.util.concurrent.Flow as JFlow import kotlin.coroutines.* import kotlin.test.* @@ -132,14 +133,18 @@ class IntegrationTest( } +@OptIn(ExperimentalContracts::class) internal suspend inline fun assertCallsExceptionHandlerWith( - crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E = - CapturingHandler().let { handler -> - withContext(handler) { - operation(handler) - handler.getException().let { - assertTrue(it is E, it.toString()) - it - } + crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E { + contract { + callsInPlace(operation, InvocationKind.EXACTLY_ONCE) + } + val handler = CapturingHandler() + return withContext(handler) { + operation(handler) + handler.getException().let { + assertTrue(it is E, it.toString()) + it } - } \ No newline at end of file + } +} diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt index 90babf79b2..3682d5e318 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt @@ -7,9 +7,6 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.junit.Test -import java.lang.NullPointerException -import java.util.concurrent.* -import java.util.concurrent.CancellationException import java.util.concurrent.Flow as JFlow import kotlin.test.* diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index 426aec657e..02c9e242e9 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -5,11 +5,11 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.* import org.junit.Test import org.reactivestreams.* import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.test.* class FlowAsPublisherTest : TestBase() { diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 17719abdd1..095b724d40 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -8,8 +8,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.junit.Test import org.reactivestreams.* -import java.lang.NullPointerException -import kotlin.coroutines.cancellation.CancellationException import kotlin.test.* class PublishTest : TestBase() { diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt index 4bee0247cf..d059eb6622 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt @@ -5,12 +5,10 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test -import java.lang.NullPointerException import kotlin.test.* class FluxTest : TestBase() { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 3de09d3211..c096c0d254 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -12,7 +12,6 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* -import java.lang.RuntimeException import kotlin.coroutines.* /** diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt index f83a08a155..316439293f 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -45,7 +45,7 @@ class FlowableExceptionHandlingTest : TestBase() { }.subscribe({ expectUnreached() }, { - expect(2) // Fatal exception are not treated as special + expect(2) // Fatal exceptions are not treated as special }) finish(3) } diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt index c144bcbf37..126cb81826 100644 --- a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt @@ -45,7 +45,7 @@ class FlowableExceptionHandlingTest : TestBase() { }.subscribe({ expectUnreached() }, { - expect(2) // Fatal exception are not treated as special + expect(2) // Fatal exceptions are not treated as special }) finish(3) }