diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index cf73ef2ea8..41c82ed0e8 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -81,7 +81,13 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + // Is handled by the downstream flow + } + } override fun onError(e: Throwable) { close(e) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 0000000000..159f3729c8 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val iterations = 100 * stressTestMultiplierSqrt + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testAsFlowCancellation() = runTest { + repeat(iterations) { + val latch = Channel(1) + var i = 0 + val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) + .doOnNext { if (++i > 100) latch.offer(Unit) } + val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) + latch.receive() + job.cancelAndJoin() + } + } +} diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 9bb38c088f..0978423ac9 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -81,7 +81,13 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + // Is handled by the downstream flow + } + } override fun onError(e: Throwable) { close(e) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 0000000000..431a7a789e --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.exceptions.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val iterations = 100 * stressTestMultiplierSqrt + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testAsFlowCancellation() = runTest { + repeat(iterations) { + val latch = Channel(1) + var i = 0 + val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) + .doOnNext { if (++i > 100) latch.offer(Unit) } + val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) + latch.receive() + job.cancelAndJoin() + } + } +}