Skip to content

Commit

Permalink
Add ObservableSource.asFlow operator (#1768)
Browse files Browse the repository at this point in the history
  • Loading branch information
mareklangiewicz authored and qwwdfsad committed Mar 4, 2020
1 parent 12e96cd commit baad34f
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 4 deletions.
10 changes: 6 additions & 4 deletions reactive/kotlinx-coroutines-rx2/README.md
Expand Up @@ -14,10 +14,11 @@ Coroutine builders:

Integration with [Flow]:

| **Name** | **Result** | **Description**
| --------------- | -------------- | ---------------
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
| **Name** | **Result** | **Description**
| --------------- | -------------- | ---------------
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow

Suspending extension functions and suspending iteration:

Expand Down Expand Up @@ -67,6 +68,7 @@ Conversion functions:
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html
[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html
[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html
Expand Down
Expand Up @@ -29,6 +29,7 @@ public final class kotlinx/coroutines/rx2/RxCompletableKt {

public final class kotlinx/coroutines/rx2/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
Expand Down
31 changes: 31 additions & 0 deletions reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
Expand Up @@ -5,10 +5,12 @@
package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -77,6 +79,35 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
send(t)
}

/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
* The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
* is applied to the resulting flow.
*
* A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) { sendBlocking(t) }
override fun onError(e: Throwable) { close(e) }
}

subscribe(observer)
awaitClose { disposableRef.getAndSet(Disposed)?.dispose() }
}

private object Disposed : Disposable {
override fun isDisposed() = true
override fun dispose() = Unit
}

/**
* Converts the given flow to a cold observable.
* The original flow is cancelled when the observable subscriber is disposed.
Expand Down
185 changes: 185 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt
@@ -0,0 +1,185 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2

import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.disposables.Disposables
import io.reactivex.subjects.PublishSubject
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class ObservableAsFlowTest : TestBase() {
@Test
fun testCancellation() = runTest {
var onNext = 0
var onCancelled = 0
var onError = 0

val source = rxObservable(currentDispatcher()) {
coroutineContext[Job]?.invokeOnCompletion {
if (it is CancellationException) ++onCancelled
}

repeat(100) {
send(it)
}
}

source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
onEach {
++onNext
throw RuntimeException()
}
catch<Throwable> {
++onError
}
}.join()


assertEquals(1, onNext)
assertEquals(1, onError)
assertEquals(1, onCancelled)
}

@Test
fun testImmediateCollection() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
GlobalScope.launch(Dispatchers.Unconfined) {
expect(1)
flow.collect { expect(it) }
expect(6)
}
expect(2)
source.onNext(3)
expect(4)
source.onNext(5)
source.onComplete()
finish(7)
}

@Test
fun testOnErrorCancellation() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
val exception = RuntimeException()
GlobalScope.launch(Dispatchers.Unconfined) {
try {
expect(1)
flow.collect { expect(it) }
expectUnreached()
}
catch (e: Exception) {
assertSame(exception, e.cause)
expect(5)
}
expect(6)
}
expect(2)
source.onNext(3)
expect(4)
source.onError(exception)
finish(7)
}

@Test
fun testUnsubscribeOnCollectionException() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
val exception = RuntimeException()
GlobalScope.launch(Dispatchers.Unconfined) {
try {
expect(1)
flow.collect {
expect(it)
if (it == 3) throw exception
}
expectUnreached()
}
catch (e: Exception) {
assertSame(exception, e.cause)
expect(4)
}
expect(5)
}
expect(2)
assertTrue(source.hasObservers())
source.onNext(3)
assertFalse(source.hasObservers())
finish(6)
}

@Test
fun testLateOnSubscribe() {
var observer: Observer<in Int>? = null
val source = ObservableSource<Int> { observer = it }
val flow = source.asFlow()
assertNull(observer)
val job = GlobalScope.launch(Dispatchers.Unconfined) {
expect(1)
flow.collect { expectUnreached() }
expectUnreached()
}
expect(2)
assertNotNull(observer)
job.cancel()
val disposable = Disposables.empty()
observer!!.onSubscribe(disposable)
assertTrue(disposable.isDisposed)
finish(3)
}

@Test
fun testBufferUnlimited() = runTest {
val source = rxObservable(currentDispatcher()) {
expect(1); send(10)
expect(2); send(11)
expect(3); send(12)
expect(4); send(13)
expect(5); send(14)
expect(6); send(15)
expect(7); send(16)
expect(8); send(17)
expect(9)
}
source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
finish(18)
}

@Test
fun testConflated() = runTest {
val source = Observable.range(1, 5)
val list = source.asFlow().conflate().toList()
assertEquals(listOf(1, 5), list)
}

@Test
fun testLongRange() = runTest {
val source = Observable.range(1, 10_000)
val count = source.asFlow().count()
assertEquals(10_000, count)
}

@Test
fun testProduce() = runTest {
val source = Observable.range(0, 10)
val flow = source.asFlow()
check((0..9).toList(), flow.produceIn(this))
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
check((0..9).toList(), flow.buffer(2).produceIn(this))
check((0..9).toList(), flow.buffer(0).produceIn(this))
check(listOf(0, 9), flow.conflate().produceIn(this))
}

private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
val result = ArrayList<Int>(10)
channel.consumeEach { result.add(it) }
assertEquals(expected, result)
}
}

0 comments on commit baad34f

Please sign in to comment.