diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index cf73ef2ea8..abd13b9ed4 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -81,7 +81,12 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Throwable) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + } + } override fun onError(e: Throwable) { close(e) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 0000000000..b187ab9b63 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import io.reactivex.exceptions.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val maxIterations = 25 + private val collectorJobCancelDelay = 1_000L + private val nextIterationDelay = collectorJobCancelDelay * 2 + + private var jobCancelledException: Throwable? = null + private val exceptionHandler = { throwable: Throwable -> + jobCancelledException = extractJobCancelledException(throwable) + } + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) { + val collectorThread = newSingleThreadContext("Collector Thread") + val cancellerThread = newSingleThreadContext("Canceller Thread") + val scope = CoroutineScope(Job()) + var iteration = 0 + + while (jobCancelledException == null && iteration < maxIterations) { + scope.runIteration(collectorThread, cancellerThread) + iteration += 1 + + Thread.sleep(nextIterationDelay) + + collectorThread.cancel() + cancellerThread.cancel() + } + + collectorThread.close() + cancellerThread.close() + scope.cancel() + + jobCancelledException?.also { + throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it) + } + } + + private fun CoroutineScope.runIteration( + collectorThread: ExecutorCoroutineDispatcher, + cancellerThread: ExecutorCoroutineDispatcher + ) { + val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + + val collectorJob = launch(collectorThread) { + outerObservable + .asFlow() + .flatMapLatest { + innerObservable.asFlow() + } + .collect { delay(100) } + } + + launch(cancellerThread) { + delay(collectorJobCancelDelay) + collectorJob.cancel() + } + } + + private fun extractJobCancelledException(throwable: Throwable): Throwable? { + if (throwable is UndeliverableException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + if (throwable is InterruptedException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + return throwable + } +} diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 9bb38c088f..f30146a704 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -81,7 +81,12 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Throwable) { //TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + } + } override fun onError(e: Throwable) { close(e) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 0000000000..b5e90286a4 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.exceptions.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val maxIterations = 25 + private val collectorJobCancelDelay = 1_000L + private val nextIterationDelay = collectorJobCancelDelay * 2 + + private var jobCancelledException: Throwable? = null + private val exceptionHandler = { throwable: Throwable -> + jobCancelledException = extractJobCancelledException(throwable) + } + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testObservableSourceAsFlow_doesntThrowJobCancelledException() = withExceptionHandler(exceptionHandler) { + val collectorThread = newSingleThreadContext("Collector Thread") + val cancellerThread = newSingleThreadContext("Canceller Thread") + val scope = CoroutineScope(Job()) + var iteration = 0 + + while (jobCancelledException == null && iteration < maxIterations) { + scope.runIteration(collectorThread, cancellerThread) + iteration += 1 + + Thread.sleep(nextIterationDelay) + + collectorThread.cancel() + cancellerThread.cancel() + } + + collectorThread.close() + cancellerThread.close() + scope.cancel() + + jobCancelledException?.also { + throw error("ObservableSource.asFlow() cancellation caused exception in iteration # $iteration", it) + } + } + + private fun CoroutineScope.runIteration( + collectorThread: ExecutorCoroutineDispatcher, + cancellerThread: ExecutorCoroutineDispatcher + ) { + val outerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + val innerObservable = Observable.interval(1L, TimeUnit.MILLISECONDS) + + val collectorJob = launch(collectorThread) { + outerObservable + .asFlow() + .flatMapLatest { + innerObservable.asFlow() + } + .collect { delay(100) } + } + + launch(cancellerThread) { + delay(collectorJobCancelDelay) + collectorJob.cancel() + } + } + + private fun extractJobCancelledException(throwable: Throwable): Throwable? { + if (throwable is UndeliverableException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + if (throwable is InterruptedException) { + if (throwable.cause !is InterruptedException) return throwable.cause + } + + return throwable + } +}