Skip to content

Commit

Permalink
Fix potential crash in Rx2 and Rx3 asFlow extension
Browse files Browse the repository at this point in the history
Fixes #2104
Fixes #2299
  • Loading branch information
LouisCAD authored and qwwdfsad committed Oct 23, 2020
1 parent 9eaa9c6 commit 6dfa2b6
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 2 deletions.
8 changes: 7 additions & 1 deletion reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
Expand Up @@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) { sendBlocking(t) }
override fun onNext(t: T) {
try {
sendBlocking(t)
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
// Is handled by the downstream flow
}
}
override fun onError(e: Throwable) { close(e) }
}

Expand Down
@@ -0,0 +1,35 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2

import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.junit.*
import java.util.concurrent.*

class ObservableSourceAsFlowStressTest : TestBase() {

private val iterations = 100 * stressTestMultiplierSqrt

@Before
fun setup() {
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
}

@Test
fun testAsFlowCancellation() = runTest {
repeat(iterations) {
val latch = Channel<Unit>(1)
var i = 0
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
.doOnNext { if (++i > 100) latch.offer(Unit) }
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
latch.receive()
job.cancelAndJoin()
}
}
}
8 changes: 7 additions & 1 deletion reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
Expand Up @@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) { sendBlocking(t) }
override fun onNext(t: T) {
try {
sendBlocking(t)
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
// Is handled by the downstream flow
}
}
override fun onError(e: Throwable) { close(e) }
}

Expand Down
@@ -0,0 +1,36 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.junit.*
import java.util.concurrent.*

class ObservableSourceAsFlowStressTest : TestBase() {

private val iterations = 100 * stressTestMultiplierSqrt

@Before
fun setup() {
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
}

@Test
fun testAsFlowCancellation() = runTest {
repeat(iterations) {
val latch = Channel<Unit>(1)
var i = 0
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
.doOnNext { if (++i > 100) latch.offer(Unit) }
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
latch.receive()
job.cancelAndJoin()
}
}
}

0 comments on commit 6dfa2b6

Please sign in to comment.