diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index 7fbad95369..f0fbeb001e 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -14,10 +14,11 @@ Coroutine builders: Integration with [Flow]: -| **Name** | **Result** | **Description** -| --------------- | -------------- | --------------- -| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. -| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. +| **Name** | **Result** | **Description** +| --------------- | -------------- | --------------- +| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. +| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. +| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow Suspending extension functions and suspending iteration: @@ -67,6 +68,7 @@ Conversion functions: [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html [Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html [Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html +[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html [io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html [io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html [io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index a9d423a99b..930ab26124 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -5,10 +5,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.disposables.Disposable import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* +import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.* /** @@ -77,6 +79,35 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): send(t) } +/** + * Transforms given cold [ObservableSource] into cold [Flow]. + * + * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator + * is applied to the resulting flow. + * + * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the + * resulting flow to specify a user-defined value and to control what happens when data is produced faster + * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. + */ +@ExperimentalCoroutinesApi +public fun ObservableSource.asFlow(): Flow = callbackFlow { + val disposableRef = AtomicReference() + val observer = object : Observer { + override fun onComplete() { close() } + override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } + override fun onNext(t: T) { sendBlocking(t) } + override fun onError(e: Throwable) { close(e) } + } + + subscribe(observer) + awaitClose { disposableRef.getAndSet(Disposed)?.dispose() } +} + +private object Disposed : Disposable { + override fun isDisposed() = true + override fun dispose() = Unit +} + /** * Converts the given flow to a cold observable. * The original flow is cancelled when the observable subscriber is disposed. diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt new file mode 100644 index 0000000000..c14c3cc4e8 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt @@ -0,0 +1,185 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.Observable +import io.reactivex.ObservableSource +import io.reactivex.Observer +import io.reactivex.disposables.Disposables +import io.reactivex.subjects.PublishSubject +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class ObservableAsFlowTest : TestBase() { + @Test + fun testCancellation() = runTest { + var onNext = 0 + var onCancelled = 0 + var onError = 0 + + val source = rxObservable(currentDispatcher()) { + coroutineContext[Job]?.invokeOnCompletion { + if (it is CancellationException) ++onCancelled + } + + repeat(100) { + send(it) + } + } + + source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) { + onEach { + ++onNext + throw RuntimeException() + } + catch { + ++onError + } + }.join() + + + assertEquals(1, onNext) + assertEquals(1, onError) + assertEquals(1, onCancelled) + } + + @Test + fun testImmediateCollection() { + val source = PublishSubject.create() + val flow = source.asFlow() + GlobalScope.launch(Dispatchers.Unconfined) { + expect(1) + flow.collect { expect(it) } + expect(6) + } + expect(2) + source.onNext(3) + expect(4) + source.onNext(5) + source.onComplete() + finish(7) + } + + @Test + fun testOnErrorCancellation() { + val source = PublishSubject.create() + val flow = source.asFlow() + val exception = RuntimeException() + GlobalScope.launch(Dispatchers.Unconfined) { + try { + expect(1) + flow.collect { expect(it) } + expectUnreached() + } + catch (e: Exception) { + assertSame(exception, e.cause) + expect(5) + } + expect(6) + } + expect(2) + source.onNext(3) + expect(4) + source.onError(exception) + finish(7) + } + + @Test + fun testUnsubscribeOnCollectionException() { + val source = PublishSubject.create() + val flow = source.asFlow() + val exception = RuntimeException() + GlobalScope.launch(Dispatchers.Unconfined) { + try { + expect(1) + flow.collect { + expect(it) + if (it == 3) throw exception + } + expectUnreached() + } + catch (e: Exception) { + assertSame(exception, e.cause) + expect(4) + } + expect(5) + } + expect(2) + assertTrue(source.hasObservers()) + source.onNext(3) + assertFalse(source.hasObservers()) + finish(6) + } + + @Test + fun testLateOnSubscribe() { + var observer: Observer? = null + val source = ObservableSource { observer = it } + val flow = source.asFlow() + assertNull(observer) + val job = GlobalScope.launch(Dispatchers.Unconfined) { + expect(1) + flow.collect { expectUnreached() } + expectUnreached() + } + expect(2) + assertNotNull(observer) + job.cancel() + val disposable = Disposables.empty() + observer!!.onSubscribe(disposable) + assertTrue(disposable.isDisposed) + finish(3) + } + + @Test + fun testBufferUnlimited() = runTest { + val source = rxObservable(currentDispatcher()) { + expect(1); send(10) + expect(2); send(11) + expect(3); send(12) + expect(4); send(13) + expect(5); send(14) + expect(6); send(15) + expect(7); send(16) + expect(8); send(17) + expect(9) + } + source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) } + finish(18) + } + + @Test + fun testConflated() = runTest { + val source = Observable.range(1, 5) + val list = source.asFlow().conflate().toList() + assertEquals(listOf(1, 5), list) + } + + @Test + fun testLongRange() = runTest { + val source = Observable.range(1, 10_000) + val count = source.asFlow().count() + assertEquals(10_000, count) + } + + @Test + fun testProduce() = runTest { + val source = Observable.range(0, 10) + val flow = source.asFlow() + check((0..9).toList(), flow.produceIn(this)) + check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) + check((0..9).toList(), flow.buffer(2).produceIn(this)) + check((0..9).toList(), flow.buffer(0).produceIn(this)) + check(listOf(0, 9), flow.conflate().produceIn(this)) + } + + private suspend fun check(expected: List, channel: ReceiveChannel) { + val result = ArrayList(10) + channel.consumeEach { result.add(it) } + assertEquals(expected, result) + } +} \ No newline at end of file