Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Rx Observable as Flow #1768

Closed
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4869665
Add Observable.asFlow experimental implementations
mareklangiewicz Jan 4, 2020
617d84f
Add some Observable.asFlow tests
mareklangiewicz Jan 4, 2020
1545d0a
Add more tests for Observable.asFlow
mareklangiewicz Jan 5, 2020
c4262ba
Merge branch 'langara-develop' into rx-observable-as-flow
mareklangiewicz Jan 17, 2020
07d1920
Add more tests for Observable.asFlow
mareklangiewicz Jan 19, 2020
38558d0
Cleanup temporary versions of Observable.asFlow
mareklangiewicz Jan 19, 2020
5f686dc
Add docs
mareklangiewicz Jan 19, 2020
568fe44
Update rx2 README with knit
mareklangiewicz Jan 19, 2020
ab6b2a4
Use more explicit name and explicit buffer size
mareklangiewicz Jan 22, 2020
3975ed4
Revert last change; use sendBlocking to never drop emitted items.
mareklangiewicz Jan 23, 2020
d520395
Add opt-in dropping behavior to ObservableSource.asFlow
mareklangiewicz Jan 23, 2020
f1c5aaf
Revert "Add opt-in dropping behavior to ObservableSource.asFlow"
mareklangiewicz Jan 26, 2020
cf000c0
WIP: this almost solves issue with late .onSubscribe
mareklangiewicz Jan 30, 2020
441b592
Add a few asFlow_... potential implementations to analyze
mareklangiewicz Feb 1, 2020
2ad146b
Merge branch 'develop' into rx-observable-as-flow
mareklangiewicz Feb 16, 2020
c4f3a15
Use DisposableHelper to implement asFlow without locking/waiting
mareklangiewicz Feb 16, 2020
c38d769
Add test for late onSubscribe call
mareklangiewicz Feb 18, 2020
aeda893
Use AtomicReference directly instead of rx internal DisposableHelper
mareklangiewicz Feb 20, 2020
463aae9
Merge branch 'develop' into rx-observable-as-flow
mareklangiewicz Feb 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 6 additions & 4 deletions reactive/kotlinx-coroutines-rx2/README.md
Expand Up @@ -14,10 +14,11 @@ Coroutine builders:

Integration with [Flow]:

| **Name** | **Result** | **Description**
| --------------- | -------------- | ---------------
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
| **Name** | **Result** | **Description**
| --------------- | -------------- | ---------------
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow

Suspending extension functions and suspending iteration:

Expand Down Expand Up @@ -65,6 +66,7 @@ Conversion functions:
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html
[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html
[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html
Expand Down
26 changes: 26 additions & 0 deletions reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
Expand Down Expand Up @@ -77,6 +78,31 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
send(t)
}

/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
* The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
* is applied to the resulting flow.
*
* A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {

var disposable: Disposable? = null

val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { disposable = d }
override fun onNext(t: T) { sendBlocking(t) }
override fun onError(e: Throwable) { close(e) }
}
subscribe(observer)
awaitClose { disposable?.dispose() }
elizarov marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Converts the given flow to a cold observable.
* The original flow is cancelled when the observable subscriber is disposed.
Expand Down
162 changes: 162 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt
@@ -0,0 +1,162 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2

import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class ObservableAsFlowTest : TestBase() {
@Test
fun testCancellation() = runTest {
var onNext = 0
var onCancelled = 0
var onError = 0

val source = rxObservable(currentDispatcher()) {
coroutineContext[Job]?.invokeOnCompletion {
if (it is CancellationException) ++onCancelled
}

repeat(100) {
send(it)
}
}

source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
onEach {
++onNext
throw RuntimeException()
}
catch<Throwable> {
++onError
}
}.join()


assertEquals(1, onNext)
assertEquals(1, onError)
assertEquals(1, onCancelled)
}

@Test
fun testImmediateCollection() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
GlobalScope.launch(Dispatchers.Unconfined) {
expect(1)
flow.collect { expect(it) }
expect(6)
}
expect(2)
source.onNext(3)
expect(4)
source.onNext(5)
source.onComplete()
finish(7)
}

@Test
fun testOnErrorCancellation() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
val exception = RuntimeException()
GlobalScope.launch(Dispatchers.Unconfined) {
try {
expect(1)
flow.collect { expect(it) }
expectUnreached()
}
catch (e: Exception) {
assertSame(exception, e.cause)
expect(5)
}
expect(6)
}
expect(2)
source.onNext(3)
expect(4)
source.onError(exception)
finish(7)
}

@Test
fun testUnsubscribeOnCollectionException() {
val source = PublishSubject.create<Int>()
val flow = source.asFlow()
val exception = RuntimeException()
GlobalScope.launch(Dispatchers.Unconfined) {
try {
expect(1)
flow.collect {
expect(it)
if (it == 3) throw exception
}
expectUnreached()
}
catch (e: Exception) {
assertSame(exception, e.cause)
expect(4)
}
expect(5)
}
expect(2)
assertTrue(source.hasObservers())
source.onNext(3)
assertFalse(source.hasObservers())
finish(6)
}

@Test
fun testBufferUnlimited() = runTest {
val source = rxObservable(currentDispatcher()) {
expect(1); send(10)
expect(2); send(11)
expect(3); send(12)
expect(4); send(13)
expect(5); send(14)
expect(6); send(15)
expect(7); send(16)
expect(8); send(17)
expect(9)
}
source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
finish(18)
}

@Test
fun testConflated() = runTest {
val source = Observable.range(1, 5)
val list = source.asFlow().conflate().toList()
assertEquals(listOf(1, 5), list)
}

@Test
fun testLongRange() = runTest {
val source = Observable.range(1, 10_000)
val count = source.asFlow().count()
assertEquals(10_000, count)
}

@Test
fun testProduce() = runTest {
val source = Observable.range(0, 10)
val flow = source.asFlow()
check((0..9).toList(), flow.produceIn(this))
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
check((0..9).toList(), flow.buffer(2).produceIn(this))
check((0..9).toList(), flow.buffer(0).produceIn(this))
check(listOf(0, 9), flow.conflate().produceIn(this))
}

private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
val result = ArrayList<Int>(10)
channel.consumeEach { result.add(it) }
assertEquals(expected, result)
}
}