From 4869665977eeab5b1e67b8364a70e2234ae5bd52 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sat, 4 Jan 2020 13:37:30 +0100 Subject: [PATCH 01/16] Add Observable.asFlow experimental implementations --- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index bd369cad55..82ccbf326c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -5,6 +5,8 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.BackpressureStrategy.DROP +import io.reactivex.disposables.Disposable import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* @@ -77,6 +79,48 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): send(t) } +// TODO NOW: write tests for all three versions (same as they write for other converters)!!! + +// This version is worse than asFlow2 in every way, so I'll keep it here only for a while to experiment +// (compare) how it reacts in unit tests I'm going to write for all asFlowX implementations. +public fun Observable.asFlow1(): Flow = flow { + materialize().collect { + when { + it.isOnError -> throw it.error!! + it.isOnNext -> emit(it.value!!) + // it.isOnComplete -> Unit // unnecessary - the collect ends on complete anyway + } + } +} + +public fun ObservableSource.asFlow2(): Flow = flow { + collect { // TODO: test if it actually buffer source items when emit is slow + emit(it) + // FIXME: for sure this can emit on wrong "dispatcher" UPDATE: No! it should collect in right context + // TODO: test these issues (if it is guaranteed to collect/emit in right execution context) + } +} // TODO: show in tests that this version does not support Flow operator fusion, so asFlow3 is better + +public fun ObservableSource.asFlow3(): Flow = callbackFlow { + + var disposable: Disposable? = null + + val observer = object : Observer { + override fun onComplete() { close() } + override fun onSubscribe(d: Disposable) { disposable = d } + override fun onNext(t: T) { offer(t) } + override fun onError(e: Throwable) { close(e) } + } + subscribe(observer) + awaitClose { disposable?.dispose() } +} // TODO: test it also with Flow operator fusion (changing the channel buffer to CONFLATED and UNLIMITED) + +// NOTE: for other no-backpressure stream types (Maybe, Single, Completable) we can just use .toObservable(): +public fun Maybe.asFlow3() = toObservable().asFlow3() + +//TODO: run this implementation through the same unit tests (and benchmarks?) for comparison +public fun Observable.asFlow4(strategy: BackpressureStrategy = DROP) = toFlowable(strategy).asFlow() + /** * Converts the given flow to a cold observable. * The original flow is cancelled when the observable subscriber is disposed. From 617d84fb212655a515af0a9f2584e06d85ee43c0 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sat, 4 Jan 2020 20:54:02 +0100 Subject: [PATCH 02/16] Add some Observable.asFlow tests --- .../test/ObservableAsFlow3Test.kt | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt new file mode 100644 index 0000000000..762c560192 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt @@ -0,0 +1,149 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class ObservableAsFlow3Test : TestBase() { + @Test + fun testCancellation() = runTest { + var onNext = 0 + var onCancelled = 0 + var onError = 0 + + val observable = rxObservable(currentDispatcher()) { + coroutineContext[Job]?.invokeOnCompletion { + if (it is CancellationException) ++onCancelled + } + + repeat(100) { + send(it) + } + } + + observable.asFlow3().launchIn(CoroutineScope(Dispatchers.Unconfined)) { + onEach { + ++onNext + throw RuntimeException() + } + catch { + ++onError + } + }.join() + + + assertEquals(1, onNext) + assertEquals(1, onError) + assertEquals(1, onCancelled) + } + + @Test + fun testBufferUnlimited() = runTest { + val observable = rxObservable(currentDispatcher()) { + expect(1); send(10) + expect(2); send(11) + expect(3); send(12) + expect(4); send(13) + expect(5); send(14) + expect(6); send(15) + expect(7); send(16) + expect(8); send(17) + expect(9) + } + + observable + .asFlow3() + .buffer(Channel.UNLIMITED) + .collect { expect(it) } + + finish(18) + } +// +// @Test +// fun testBufferSize10() = runTest { +// val publisher = publish(currentDispatcher()) { +// expect(1) +// send(5) +// +// expect(2) +// send(6) +// +// expect(3) +// send(7) +// expect(4) +// } +// +// publisher.asFlow().buffer(10).collect { +// expect(it) +// } +// +// finish(8) +// } +// +// @Test +// fun testConflated() = runTest { +// val publisher = publish(currentDispatcher()) { +// for (i in 1..5) send(i) +// } +// val list = publisher.asFlow().conflate().toList() +// assertEquals(listOf(1, 5), list) +// } +// +// @Test +// fun testProduce() = runTest { +// val flow = publish(currentDispatcher()) { repeat(10) { send(it) } }.asFlow() +// check((0..9).toList(), flow.produceIn(this)) +// check((0..9).toList(), flow.buffer(2).produceIn(this)) +// check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) +// check(listOf(0, 9), flow.conflate().produceIn(this)) +// } +// +// private suspend fun check(expected: List, channel: ReceiveChannel) { +// val result = ArrayList(10) +// channel.consumeEach { result.add(it) } +// assertEquals(expected, result) +// } +// +// @Test +// fun testProduceCancellation() = runTest { +// expect(1) +// // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled +// val flow = publish(currentDispatcher()) { +// expect(3) +// repeat(10) { value -> +// when (value) { +// in 0..6 -> send(value) +// 7 -> try { +// send(value) +// } catch (e: CancellationException) { +// finish(6) +// throw e +// } +// else -> expectUnreached() +// } +// } +// }.asFlow() +// assertFailsWith { +// coroutineScope { +// expect(2) +// val channel = flow.produceIn(this) +// channel.consumeEach { value -> +// when (value) { +// in 0..4 -> {} +// 5 -> { +// expect(4) +// throw TestException() +// } +// else -> expectUnreached() +// } +// } +// } +// } +// expect(5) +// } +} \ No newline at end of file From 1545d0a8b2fdb8750e5d7974de578d8a8cea6897 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sun, 5 Jan 2020 11:51:57 +0100 Subject: [PATCH 03/16] Add more tests for Observable.asFlow --- .../test/ObservableAsFlow3Test.kt | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt index 762c560192..e8b8088784 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.rx2 +import io.reactivex.subjects.PublishSubject import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* @@ -42,6 +43,74 @@ class ObservableAsFlow3Test : TestBase() { assertEquals(1, onCancelled) } + @Test + fun testImmediateCollection() { + val source = PublishSubject.create() + val flow = source.asFlow3() + GlobalScope.launch(Dispatchers.Unconfined) { + expect(1) + flow.collect { expect(it) } + expect(6) + } + expect(2) + source.onNext(3) + expect(4) + source.onNext(5) + source.onComplete() + finish(7) + } + + @Test + fun testOnErrorCancellation() { + val source = PublishSubject.create() + val flow = source.asFlow3() + val exception = java.lang.RuntimeException("Some exception") + GlobalScope.launch(Dispatchers.Unconfined) { + try { + expect(1) + flow.collect { expect(it) } + expectUnreached() + } + catch (e: Exception) { + assertSame(exception, e.cause) + expect(5) + } + expect(6) + } + expect(2) + source.onNext(3) + expect(4) + source.onError(exception) + finish(7) + } + + @Test + fun testUnsubscribeOnCollectionException() { + val source = PublishSubject.create() + val flow = source.asFlow3() + val exception = java.lang.RuntimeException("Some exception") + GlobalScope.launch(Dispatchers.Unconfined) { + try { + expect(1) + flow.collect { + expect(it) + if (it == 3) throw exception + } + expectUnreached() + } + catch (e: Exception) { + assertSame(exception, e.cause) + expect(4) + } + expect(5) + } + expect(2) + assertTrue(source.hasObservers()) + source.onNext(3) + assertFalse(source.hasObservers()) + finish(6) + } + @Test fun testBufferUnlimited() = runTest { val observable = rxObservable(currentDispatcher()) { From 07d19209ce4d08eccd8b159d31fa0f1151a7b7c0 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sun, 19 Jan 2020 16:41:46 +0100 Subject: [PATCH 04/16] Add more tests for Observable.asFlow --- .../test/ObservableAsFlow3Test.kt | 123 +++++------------- 1 file changed, 30 insertions(+), 93 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt index e8b8088784..2b37568496 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.rx2 +import io.reactivex.Observable import io.reactivex.subjects.PublishSubject import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -17,7 +18,7 @@ class ObservableAsFlow3Test : TestBase() { var onCancelled = 0 var onError = 0 - val observable = rxObservable(currentDispatcher()) { + val source = rxObservable(currentDispatcher()) { coroutineContext[Job]?.invokeOnCompletion { if (it is CancellationException) ++onCancelled } @@ -27,7 +28,7 @@ class ObservableAsFlow3Test : TestBase() { } } - observable.asFlow3().launchIn(CoroutineScope(Dispatchers.Unconfined)) { + source.asFlow3().launchIn(CoroutineScope(Dispatchers.Unconfined)) { onEach { ++onNext throw RuntimeException() @@ -64,7 +65,7 @@ class ObservableAsFlow3Test : TestBase() { fun testOnErrorCancellation() { val source = PublishSubject.create() val flow = source.asFlow3() - val exception = java.lang.RuntimeException("Some exception") + val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { expect(1) @@ -88,7 +89,7 @@ class ObservableAsFlow3Test : TestBase() { fun testUnsubscribeOnCollectionException() { val source = PublishSubject.create() val flow = source.asFlow3() - val exception = java.lang.RuntimeException("Some exception") + val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { expect(1) @@ -113,7 +114,7 @@ class ObservableAsFlow3Test : TestBase() { @Test fun testBufferUnlimited() = runTest { - val observable = rxObservable(currentDispatcher()) { + val source = rxObservable(currentDispatcher()) { expect(1); send(10) expect(2); send(11) expect(3); send(12) @@ -124,95 +125,31 @@ class ObservableAsFlow3Test : TestBase() { expect(8); send(17) expect(9) } + source.asFlow3().buffer(Channel.UNLIMITED).collect { expect(it) } + finish(18) + } - observable - .asFlow3() - .buffer(Channel.UNLIMITED) - .collect { expect(it) } + @Test + fun testConflated() = runTest { + val source = Observable.range(1, 5) + val list = source.asFlow3().conflate().toList() + assertEquals(listOf(1, 5), list) + } - finish(18) + @Test + fun testProduce() = runTest { + val source = Observable.range(0, 10) + val flow = source.asFlow3() + check((0..9).toList(), flow.produceIn(this)) + check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) + check((0..2).toList(), flow.buffer(2).produceIn(this)) + // whole source is "offered" immediately (no back-pressure), so 3..9 is dropped + check(listOf(0, 9), flow.conflate().produceIn(this)) + } + + private suspend fun check(expected: List, channel: ReceiveChannel) { + val result = ArrayList(10) + channel.consumeEach { result.add(it) } + assertEquals(expected, result) } -// -// @Test -// fun testBufferSize10() = runTest { -// val publisher = publish(currentDispatcher()) { -// expect(1) -// send(5) -// -// expect(2) -// send(6) -// -// expect(3) -// send(7) -// expect(4) -// } -// -// publisher.asFlow().buffer(10).collect { -// expect(it) -// } -// -// finish(8) -// } -// -// @Test -// fun testConflated() = runTest { -// val publisher = publish(currentDispatcher()) { -// for (i in 1..5) send(i) -// } -// val list = publisher.asFlow().conflate().toList() -// assertEquals(listOf(1, 5), list) -// } -// -// @Test -// fun testProduce() = runTest { -// val flow = publish(currentDispatcher()) { repeat(10) { send(it) } }.asFlow() -// check((0..9).toList(), flow.produceIn(this)) -// check((0..9).toList(), flow.buffer(2).produceIn(this)) -// check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) -// check(listOf(0, 9), flow.conflate().produceIn(this)) -// } -// -// private suspend fun check(expected: List, channel: ReceiveChannel) { -// val result = ArrayList(10) -// channel.consumeEach { result.add(it) } -// assertEquals(expected, result) -// } -// -// @Test -// fun testProduceCancellation() = runTest { -// expect(1) -// // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled -// val flow = publish(currentDispatcher()) { -// expect(3) -// repeat(10) { value -> -// when (value) { -// in 0..6 -> send(value) -// 7 -> try { -// send(value) -// } catch (e: CancellationException) { -// finish(6) -// throw e -// } -// else -> expectUnreached() -// } -// } -// }.asFlow() -// assertFailsWith { -// coroutineScope { -// expect(2) -// val channel = flow.produceIn(this) -// channel.consumeEach { value -> -// when (value) { -// in 0..4 -> {} -// 5 -> { -// expect(4) -// throw TestException() -// } -// else -> expectUnreached() -// } -// } -// } -// } -// expect(5) -// } } \ No newline at end of file From 38558d00288aae9c59fb3b85f2f9c9aa32b939bc Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sun, 19 Jan 2020 17:00:02 +0100 Subject: [PATCH 05/16] Cleanup temporary versions of Observable.asFlow --- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 33 ++----------------- ...AsFlow3Test.kt => ObservableAsFlowTest.kt} | 16 ++++----- 2 files changed, 10 insertions(+), 39 deletions(-) rename reactive/kotlinx-coroutines-rx2/test/{ObservableAsFlow3Test.kt => ObservableAsFlowTest.kt} (90%) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 82ccbf326c..fab1a1e2bd 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -5,7 +5,6 @@ package kotlinx.coroutines.rx2 import io.reactivex.* -import io.reactivex.BackpressureStrategy.DROP import io.reactivex.disposables.Disposable import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -79,29 +78,7 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): send(t) } -// TODO NOW: write tests for all three versions (same as they write for other converters)!!! - -// This version is worse than asFlow2 in every way, so I'll keep it here only for a while to experiment -// (compare) how it reacts in unit tests I'm going to write for all asFlowX implementations. -public fun Observable.asFlow1(): Flow = flow { - materialize().collect { - when { - it.isOnError -> throw it.error!! - it.isOnNext -> emit(it.value!!) - // it.isOnComplete -> Unit // unnecessary - the collect ends on complete anyway - } - } -} - -public fun ObservableSource.asFlow2(): Flow = flow { - collect { // TODO: test if it actually buffer source items when emit is slow - emit(it) - // FIXME: for sure this can emit on wrong "dispatcher" UPDATE: No! it should collect in right context - // TODO: test these issues (if it is guaranteed to collect/emit in right execution context) - } -} // TODO: show in tests that this version does not support Flow operator fusion, so asFlow3 is better - -public fun ObservableSource.asFlow3(): Flow = callbackFlow { +public fun ObservableSource.asFlow(): Flow = callbackFlow { var disposable: Disposable? = null @@ -113,13 +90,7 @@ public fun ObservableSource.asFlow3(): Flow = callbackFlow { } subscribe(observer) awaitClose { disposable?.dispose() } -} // TODO: test it also with Flow operator fusion (changing the channel buffer to CONFLATED and UNLIMITED) - -// NOTE: for other no-backpressure stream types (Maybe, Single, Completable) we can just use .toObservable(): -public fun Maybe.asFlow3() = toObservable().asFlow3() - -//TODO: run this implementation through the same unit tests (and benchmarks?) for comparison -public fun Observable.asFlow4(strategy: BackpressureStrategy = DROP) = toFlowable(strategy).asFlow() +} /** * Converts the given flow to a cold observable. diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt similarity index 90% rename from reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt rename to reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt index 2b37568496..2f3b67eb41 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlow3Test.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlin.test.* -class ObservableAsFlow3Test : TestBase() { +class ObservableAsFlowTest : TestBase() { @Test fun testCancellation() = runTest { var onNext = 0 @@ -28,7 +28,7 @@ class ObservableAsFlow3Test : TestBase() { } } - source.asFlow3().launchIn(CoroutineScope(Dispatchers.Unconfined)) { + source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) { onEach { ++onNext throw RuntimeException() @@ -47,7 +47,7 @@ class ObservableAsFlow3Test : TestBase() { @Test fun testImmediateCollection() { val source = PublishSubject.create() - val flow = source.asFlow3() + val flow = source.asFlow() GlobalScope.launch(Dispatchers.Unconfined) { expect(1) flow.collect { expect(it) } @@ -64,7 +64,7 @@ class ObservableAsFlow3Test : TestBase() { @Test fun testOnErrorCancellation() { val source = PublishSubject.create() - val flow = source.asFlow3() + val flow = source.asFlow() val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { @@ -88,7 +88,7 @@ class ObservableAsFlow3Test : TestBase() { @Test fun testUnsubscribeOnCollectionException() { val source = PublishSubject.create() - val flow = source.asFlow3() + val flow = source.asFlow() val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { @@ -125,21 +125,21 @@ class ObservableAsFlow3Test : TestBase() { expect(8); send(17) expect(9) } - source.asFlow3().buffer(Channel.UNLIMITED).collect { expect(it) } + source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) } finish(18) } @Test fun testConflated() = runTest { val source = Observable.range(1, 5) - val list = source.asFlow3().conflate().toList() + val list = source.asFlow().conflate().toList() assertEquals(listOf(1, 5), list) } @Test fun testProduce() = runTest { val source = Observable.range(0, 10) - val flow = source.asFlow3() + val flow = source.asFlow() check((0..9).toList(), flow.produceIn(this)) check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) check((0..2).toList(), flow.buffer(2).produceIn(this)) From 5f686dcce1f47bcc35b09033e259c72255332e0b Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sun, 19 Jan 2020 22:22:48 +0100 Subject: [PATCH 06/16] Add docs --- reactive/kotlinx-coroutines-rx2/README.md | 9 +++++---- reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 10 ++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index b3874b05d3..4565e882ae 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -14,10 +14,11 @@ Coroutine builders: Integration with [Flow]: -| **Name** | **Result** | **Description** -| --------------- | -------------- | --------------- -| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. -| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. +| **Name** | **Result** | **Description** +| --------------- | -------------- | --------------- +| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. +| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. +| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow Suspending extension functions and suspending iteration: diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index fab1a1e2bd..613bbc60b4 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -78,6 +78,16 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): send(t) } +/** + * Transforms given cold [ObservableSource] into cold [Flow]. + * + * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator + * is applied to the resulting flow. + * + * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the + * resulting flow to specify a user-defined value and to control what happens when data is produced faster + * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. + */ public fun ObservableSource.asFlow(): Flow = callbackFlow { var disposable: Disposable? = null From 568fe445ce06bf5597a946628349c9e28a5f87bc Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sun, 19 Jan 2020 22:37:57 +0100 Subject: [PATCH 07/16] Update rx2 README with knit --- reactive/kotlinx-coroutines-rx2/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index 4565e882ae..8d55a30b21 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -66,6 +66,7 @@ Conversion functions: [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html [Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html [Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html +[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html [io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html [io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html [io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html From ab6b2a4f9b0a59474642eca1977879c348950db9 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Wed, 22 Jan 2020 23:17:51 +0100 Subject: [PATCH 08/16] Use more explicit name and explicit buffer size --- reactive/kotlinx-coroutines-rx2/README.md | 12 +++++----- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 16 +++++++++---- ...Test.kt => ObservableAsChannelFlowTest.kt} | 23 +++++++++---------- 3 files changed, 28 insertions(+), 23 deletions(-) rename reactive/kotlinx-coroutines-rx2/test/{ObservableAsFlowTest.kt => ObservableAsChannelFlowTest.kt} (82%) diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index 8d55a30b21..88e6559dee 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -14,11 +14,11 @@ Coroutine builders: Integration with [Flow]: -| **Name** | **Result** | **Description** -| --------------- | -------------- | --------------- -| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. -| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. -| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow +| **Name** | **Result** | **Description** +| --------------- | -------------- | --------------- +| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. +| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. +| [ObservableSource.asChannelFlow] | `Flow` | Converts the given cold ObservableSource to flow Suspending extension functions and suspending iteration: @@ -66,7 +66,7 @@ Conversion functions: [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html [Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html [Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html -[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html +[ObservableSource.asChannelFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-channel-flow.html [io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html [io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html [io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 613bbc60b4..962537331a 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -84,11 +84,17 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator * is applied to the resulting flow. * - * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the - * resulting flow to specify a user-defined value and to control what happens when data is produced faster - * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. + * A channel with the [capacity] buffer size is used. Use it to control what happens when data is produced + * faster than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. + * + * @param capacity type/capacity of the buffer. Allowed values are the same as in `Channel(...)` + * factory function: [BUFFERED][Channel.BUFFERED], [CONFLATED][Channel.CONFLATED], + * [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating + * an explicitly requested size. */ -public fun ObservableSource.asFlow(): Flow = callbackFlow { +@Suppress("NOTHING_TO_INLINE") +@ExperimentalCoroutinesApi +public inline fun ObservableSource.asChannelFlow(capacity: Int): Flow = channelFlow { var disposable: Disposable? = null @@ -100,7 +106,7 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { } subscribe(observer) awaitClose { disposable?.dispose() } -} +}.buffer(capacity) /** * Converts the given flow to a cold observable. diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsChannelFlowTest.kt similarity index 82% rename from reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt rename to reactive/kotlinx-coroutines-rx2/test/ObservableAsChannelFlowTest.kt index 2f3b67eb41..77e89856f4 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsChannelFlowTest.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlin.test.* -class ObservableAsFlowTest : TestBase() { +class ObservableAsChannelFlowTest : TestBase() { @Test fun testCancellation() = runTest { var onNext = 0 @@ -28,7 +28,7 @@ class ObservableAsFlowTest : TestBase() { } } - source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) { + source.asChannelFlow(Channel.RENDEZVOUS).launchIn(CoroutineScope(Dispatchers.Unconfined)) { onEach { ++onNext throw RuntimeException() @@ -47,7 +47,7 @@ class ObservableAsFlowTest : TestBase() { @Test fun testImmediateCollection() { val source = PublishSubject.create() - val flow = source.asFlow() + val flow = source.asChannelFlow(Channel.RENDEZVOUS) GlobalScope.launch(Dispatchers.Unconfined) { expect(1) flow.collect { expect(it) } @@ -64,7 +64,7 @@ class ObservableAsFlowTest : TestBase() { @Test fun testOnErrorCancellation() { val source = PublishSubject.create() - val flow = source.asFlow() + val flow = source.asChannelFlow(Channel.RENDEZVOUS) val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { @@ -88,7 +88,7 @@ class ObservableAsFlowTest : TestBase() { @Test fun testUnsubscribeOnCollectionException() { val source = PublishSubject.create() - val flow = source.asFlow() + val flow = source.asChannelFlow(Channel.RENDEZVOUS) val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { @@ -125,26 +125,25 @@ class ObservableAsFlowTest : TestBase() { expect(8); send(17) expect(9) } - source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) } + source.asChannelFlow(Channel.UNLIMITED).collect { expect(it) } finish(18) } @Test fun testConflated() = runTest { val source = Observable.range(1, 5) - val list = source.asFlow().conflate().toList() + val list = source.asChannelFlow(Channel.CONFLATED).toList() assertEquals(listOf(1, 5), list) } @Test fun testProduce() = runTest { val source = Observable.range(0, 10) - val flow = source.asFlow() - check((0..9).toList(), flow.produceIn(this)) - check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) - check((0..2).toList(), flow.buffer(2).produceIn(this)) + check((0..9).toList(), source.asChannelFlow(Channel.UNLIMITED).produceIn(this)) + check((0..9).toList(), source.asChannelFlow(10).produceIn(this)) + check((0..2).toList(), source.asChannelFlow(2).produceIn(this)) // whole source is "offered" immediately (no back-pressure), so 3..9 is dropped - check(listOf(0, 9), flow.conflate().produceIn(this)) + check(listOf(0, 9), source.asChannelFlow(Channel.CONFLATED).produceIn(this)) } private suspend fun check(expected: List, channel: ReceiveChannel) { From 3975ed4bb9a7dce22095779ac6f8725ed1fc671d Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Thu, 23 Jan 2020 21:32:09 +0100 Subject: [PATCH 09/16] Revert last change; use sendBlocking to never drop emitted items. --- reactive/kotlinx-coroutines-rx2/README.md | 12 +++---- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 16 ++++------ ...nelFlowTest.kt => ObservableAsFlowTest.kt} | 32 ++++++++++++------- 3 files changed, 32 insertions(+), 28 deletions(-) rename reactive/kotlinx-coroutines-rx2/test/{ObservableAsChannelFlowTest.kt => ObservableAsFlowTest.kt} (80%) diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index 88e6559dee..8d55a30b21 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -14,11 +14,11 @@ Coroutine builders: Integration with [Flow]: -| **Name** | **Result** | **Description** -| --------------- | -------------- | --------------- -| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. -| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. -| [ObservableSource.asChannelFlow] | `Flow` | Converts the given cold ObservableSource to flow +| **Name** | **Result** | **Description** +| --------------- | -------------- | --------------- +| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. +| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. +| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow Suspending extension functions and suspending iteration: @@ -66,7 +66,7 @@ Conversion functions: [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html [Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html [Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html -[ObservableSource.asChannelFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-channel-flow.html +[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html [io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html [io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html [io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 962537331a..6ebcd5ec2d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -84,29 +84,25 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator * is applied to the resulting flow. * - * A channel with the [capacity] buffer size is used. Use it to control what happens when data is produced - * faster than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. - * - * @param capacity type/capacity of the buffer. Allowed values are the same as in `Channel(...)` - * factory function: [BUFFERED][Channel.BUFFERED], [CONFLATED][Channel.CONFLATED], - * [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating - * an explicitly requested size. + * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the + * resulting flow to specify a user-defined value and to control what happens when data is produced faster + * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ @Suppress("NOTHING_TO_INLINE") @ExperimentalCoroutinesApi -public inline fun ObservableSource.asChannelFlow(capacity: Int): Flow = channelFlow { +public fun ObservableSource.asFlow(): Flow = callbackFlow { var disposable: Disposable? = null val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { disposable = d } - override fun onNext(t: T) { offer(t) } + override fun onNext(t: T) { sendBlocking(t) } override fun onError(e: Throwable) { close(e) } } subscribe(observer) awaitClose { disposable?.dispose() } -}.buffer(capacity) +} /** * Converts the given flow to a cold observable. diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsChannelFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt similarity index 80% rename from reactive/kotlinx-coroutines-rx2/test/ObservableAsChannelFlowTest.kt rename to reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt index 77e89856f4..5569d295f7 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsChannelFlowTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlin.test.* -class ObservableAsChannelFlowTest : TestBase() { +class ObservableAsFlowTest : TestBase() { @Test fun testCancellation() = runTest { var onNext = 0 @@ -28,7 +28,7 @@ class ObservableAsChannelFlowTest : TestBase() { } } - source.asChannelFlow(Channel.RENDEZVOUS).launchIn(CoroutineScope(Dispatchers.Unconfined)) { + source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) { onEach { ++onNext throw RuntimeException() @@ -47,7 +47,7 @@ class ObservableAsChannelFlowTest : TestBase() { @Test fun testImmediateCollection() { val source = PublishSubject.create() - val flow = source.asChannelFlow(Channel.RENDEZVOUS) + val flow = source.asFlow() GlobalScope.launch(Dispatchers.Unconfined) { expect(1) flow.collect { expect(it) } @@ -64,7 +64,7 @@ class ObservableAsChannelFlowTest : TestBase() { @Test fun testOnErrorCancellation() { val source = PublishSubject.create() - val flow = source.asChannelFlow(Channel.RENDEZVOUS) + val flow = source.asFlow() val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { @@ -88,7 +88,7 @@ class ObservableAsChannelFlowTest : TestBase() { @Test fun testUnsubscribeOnCollectionException() { val source = PublishSubject.create() - val flow = source.asChannelFlow(Channel.RENDEZVOUS) + val flow = source.asFlow() val exception = RuntimeException() GlobalScope.launch(Dispatchers.Unconfined) { try { @@ -125,25 +125,33 @@ class ObservableAsChannelFlowTest : TestBase() { expect(8); send(17) expect(9) } - source.asChannelFlow(Channel.UNLIMITED).collect { expect(it) } + source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) } finish(18) } @Test fun testConflated() = runTest { val source = Observable.range(1, 5) - val list = source.asChannelFlow(Channel.CONFLATED).toList() + val list = source.asFlow().conflate().toList() assertEquals(listOf(1, 5), list) } + @Test + fun testLongRange() = runTest { + val source = Observable.range(1, 10_000) + val count = source.asFlow().count() + assertEquals(10_000, count) + } + @Test fun testProduce() = runTest { val source = Observable.range(0, 10) - check((0..9).toList(), source.asChannelFlow(Channel.UNLIMITED).produceIn(this)) - check((0..9).toList(), source.asChannelFlow(10).produceIn(this)) - check((0..2).toList(), source.asChannelFlow(2).produceIn(this)) - // whole source is "offered" immediately (no back-pressure), so 3..9 is dropped - check(listOf(0, 9), source.asChannelFlow(Channel.CONFLATED).produceIn(this)) + val flow = source.asFlow() + check((0..9).toList(), flow.produceIn(this)) + check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) + check((0..9).toList(), flow.buffer(2).produceIn(this)) + check((0..9).toList(), flow.buffer(0).produceIn(this)) + check(listOf(0, 9), flow.conflate().produceIn(this)) } private suspend fun check(expected: List, channel: ReceiveChannel) { From d520395b7b8f6fcd61d3fa06128c446008427cdd Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Thu, 23 Jan 2020 22:32:15 +0100 Subject: [PATCH 10/16] Add opt-in dropping behavior to ObservableSource.asFlow --- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 8 +++++--- .../test/ObservableAsFlowTest.kt | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 6ebcd5ec2d..f85bb4b2cf 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -87,17 +87,19 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. + * + * @param dropWhenFull Set to true to drop items when buffer is full instead of blocking the [ObservableSource]. */ -@Suppress("NOTHING_TO_INLINE") +@Suppress("BlockingMethodInNonBlockingContext") @ExperimentalCoroutinesApi -public fun ObservableSource.asFlow(): Flow = callbackFlow { +public fun ObservableSource.asFlow(dropWhenFull: Boolean = false): Flow = callbackFlow { var disposable: Disposable? = null val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { disposable = d } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { if(!offer(t) && !dropWhenFull) runBlocking { send(t) } } override fun onError(e: Throwable) { close(e) } } subscribe(observer) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt index 5569d295f7..c98156e638 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt @@ -143,6 +143,13 @@ class ObservableAsFlowTest : TestBase() { assertEquals(10_000, count) } + @Test + fun testLongRangeDroppingWhenFull() = runTest { + val source = Observable.range(1, 10_000) + val count = source.asFlow(dropWhenFull = true).buffer(10).count() + assertEquals(11, count) + } + @Test fun testProduce() = runTest { val source = Observable.range(0, 10) @@ -154,6 +161,17 @@ class ObservableAsFlowTest : TestBase() { check(listOf(0, 9), flow.conflate().produceIn(this)) } + @Test + fun testProduceDroppingWhenFull() = runTest { + val source = Observable.range(0, 10) + val flow = source.asFlow(dropWhenFull = true) + check((0..9).toList(), flow.produceIn(this)) + check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) + check((0..2).toList(), flow.buffer(2).produceIn(this)) + check(listOf(0), flow.buffer(0).produceIn(this)) + check(listOf(0, 9), flow.conflate().produceIn(this)) + } + private suspend fun check(expected: List, channel: ReceiveChannel) { val result = ArrayList(10) channel.consumeEach { result.add(it) } From f1c5aafc60714e1ae4bdf6f091721636dd9822e0 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sun, 26 Jan 2020 07:35:21 +0100 Subject: [PATCH 11/16] Revert "Add opt-in dropping behavior to ObservableSource.asFlow" This reverts commit d520395b --- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 7 ++----- .../test/ObservableAsFlowTest.kt | 18 ------------------ 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index f85bb4b2cf..09f57edc4d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -87,19 +87,16 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. - * - * @param dropWhenFull Set to true to drop items when buffer is full instead of blocking the [ObservableSource]. */ -@Suppress("BlockingMethodInNonBlockingContext") @ExperimentalCoroutinesApi -public fun ObservableSource.asFlow(dropWhenFull: Boolean = false): Flow = callbackFlow { +public fun ObservableSource.asFlow(): Flow = callbackFlow { var disposable: Disposable? = null val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { disposable = d } - override fun onNext(t: T) { if(!offer(t) && !dropWhenFull) runBlocking { send(t) } } + override fun onNext(t: T) { sendBlocking(t) } override fun onError(e: Throwable) { close(e) } } subscribe(observer) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt index c98156e638..5569d295f7 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt @@ -143,13 +143,6 @@ class ObservableAsFlowTest : TestBase() { assertEquals(10_000, count) } - @Test - fun testLongRangeDroppingWhenFull() = runTest { - val source = Observable.range(1, 10_000) - val count = source.asFlow(dropWhenFull = true).buffer(10).count() - assertEquals(11, count) - } - @Test fun testProduce() = runTest { val source = Observable.range(0, 10) @@ -161,17 +154,6 @@ class ObservableAsFlowTest : TestBase() { check(listOf(0, 9), flow.conflate().produceIn(this)) } - @Test - fun testProduceDroppingWhenFull() = runTest { - val source = Observable.range(0, 10) - val flow = source.asFlow(dropWhenFull = true) - check((0..9).toList(), flow.produceIn(this)) - check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) - check((0..2).toList(), flow.buffer(2).produceIn(this)) - check(listOf(0), flow.buffer(0).produceIn(this)) - check(listOf(0, 9), flow.conflate().produceIn(this)) - } - private suspend fun check(expected: List, channel: ReceiveChannel) { val result = ArrayList(10) channel.consumeEach { result.add(it) } From cf000c0758d19120b8a84d6f8d55138c3a5bee13 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Thu, 30 Jan 2020 01:37:30 +0100 Subject: [PATCH 12/16] WIP: this almost solves issue with late .onSubscribe atomicfu on disposable is probably needed to make it 100% correct --- reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 09f57edc4d..37af442e46 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -6,10 +6,13 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.Disposable +import io.reactivex.disposables.Disposables +import kotlinx.atomicfu.atomic import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* +import kotlinx.coroutines.sync.Mutex import kotlin.coroutines.* /** @@ -91,16 +94,19 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): @ExperimentalCoroutinesApi public fun ObservableSource.asFlow(): Flow = callbackFlow { - var disposable: Disposable? = null + var disposable = Disposables.empty() val observer = object : Observer { override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) { disposable = d } + override fun onSubscribe(d: Disposable) = if (disposable.isDisposed) d.dispose() else disposable = d override fun onNext(t: T) { sendBlocking(t) } override fun onError(e: Throwable) { close(e) } } + subscribe(observer) - awaitClose { disposable?.dispose() } + awaitClose { disposable.dispose() } + // in case when the source did not actually subscribe yet at all (observer.onSubscribe was not called): + // then this .dispose() just "marks" that we want to unsubscribe synchronously when we get .onSubscribe } /** From 441b59284b5f87dc8f9661de9c3938332e6cd648 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sat, 1 Feb 2020 12:54:23 +0100 Subject: [PATCH 13/16] Add a few asFlow_... potential implementations to analyze --- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 89 ++++++++++++++++++- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 37af442e46..b05d08234c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -7,7 +7,8 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.Disposable import io.reactivex.disposables.Disposables -import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.reentrantLock +import kotlinx.atomicfu.locks.withLock import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* @@ -92,23 +93,103 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ @ExperimentalCoroutinesApi -public fun ObservableSource.asFlow(): Flow = callbackFlow { +public fun ObservableSource.asFlow(): Flow = asFlow_ReentrantLock() + + +// This version doesn't even compile - I don't know much about atomicfu yet. +/* +@ExperimentalCoroutinesApi +public fun ObservableSource.asFlow_AtomicFu(): Flow = callbackFlow { + + val disposable = atomic(Disposables.empty()) + + val observer = object : Observer { + override fun onComplete() { close() } + override fun onSubscribe(d: Disposable) = + if (disposable.value.isDisposed) d.dispose() else disposable.value = d + // we should probably do some compareAndSet above instead + override fun onNext(t: T) { sendBlocking(t) } + override fun onError(e: Throwable) { close(e) } + } + + subscribe(observer) + awaitClose { disposable.value.dispose() } + // in case when the source did not actually subscribe yet at all (observer.onSubscribe was not called): + // then this .dispose() just "marks" that we want to unsubscribe synchronously when we get .onSubscribe +} +*/ + +@ExperimentalCoroutinesApi +public fun ObservableSource.asFlow_ReentrantLock(): Flow = callbackFlow { var disposable = Disposables.empty() + val lock = reentrantLock() val observer = object : Observer { override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) = if (disposable.isDisposed) d.dispose() else disposable = d + override fun onSubscribe(d: Disposable) = + lock.withLock { if (disposable.isDisposed) d.dispose() else disposable = d } override fun onNext(t: T) { sendBlocking(t) } override fun onError(e: Throwable) { close(e) } } subscribe(observer) - awaitClose { disposable.dispose() } + awaitClose { lock.withLock { disposable.dispose() } } // in case when the source did not actually subscribe yet at all (observer.onSubscribe was not called): // then this .dispose() just "marks" that we want to unsubscribe synchronously when we get .onSubscribe } +// This version tries to leverage Job.isActive and generally Job and Channel happens-before guaranties. +// (but I think it's incorrect) +@ExperimentalCoroutinesApi +public fun ObservableSource.asFlow_JobIsActive(): Flow = callbackFlow { + + var disposable: Disposable? = null + + val observer = object : Observer { + override fun onComplete() { close() } + override fun onSubscribe(d: Disposable) = if (isActive) disposable = d else d.dispose() + // Is this version correct due to Job and Channel being thread-safe?? (and rx signaling being "serial") + override fun onNext(t: T) { sendBlocking(t) } + override fun onError(e: Throwable) { close(e) } + } + subscribe(observer) + awaitClose { disposable?.dispose() } +} + +@ExperimentalCoroutinesApi +public fun ObservableSource.asFlow_Mutex(): Flow = callbackFlow { + + lateinit var disposable: Disposable + val mutex = Mutex(locked = true) + + val observer = object : Observer { + override fun onComplete() { close() } + override fun onSubscribe(d: Disposable) { disposable = d; mutex.unlock() } + override fun onNext(t: T) { sendBlocking(t) } + override fun onError(e: Throwable) { close(e) } + } + subscribe(observer) + if (mutex.isLocked) withContext(NonCancellable) { mutex.lock() } + awaitClose { disposable.dispose() } +} + +@ExperimentalCoroutinesApi +public fun ObservableSource.asFlow_Deferred(): Flow = callbackFlow { + + val deferred = CompletableDeferred() + + val observer = object : Observer { + override fun onComplete() { close() } + override fun onSubscribe(d: Disposable) { deferred.complete(d) } + override fun onNext(t: T) { sendBlocking(t) } + override fun onError(e: Throwable) { close(e) } + } + subscribe(observer) + val disposable = withContext(NonCancellable) { deferred.await() } + awaitClose { disposable.dispose() } +} + /** * Converts the given flow to a cold observable. * The original flow is cancelled when the observable subscriber is disposed. From c4f3a15456d2f78be882003d8d7bc9625a39eca5 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Sun, 16 Feb 2020 14:33:55 +0100 Subject: [PATCH 14/16] Use DisposableHelper to implement asFlow without locking/waiting --- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 96 ++----------------- 1 file changed, 6 insertions(+), 90 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index b05d08234c..fe17d79202 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -6,14 +6,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.Disposable -import io.reactivex.disposables.Disposables -import kotlinx.atomicfu.locks.reentrantLock -import kotlinx.atomicfu.locks.withLock +import io.reactivex.internal.disposables.DisposableHelper import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* -import kotlinx.coroutines.sync.Mutex +import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.* /** @@ -93,101 +91,19 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ @ExperimentalCoroutinesApi -public fun ObservableSource.asFlow(): Flow = asFlow_ReentrantLock() +public fun ObservableSource.asFlow(): Flow = callbackFlow { - -// This version doesn't even compile - I don't know much about atomicfu yet. -/* -@ExperimentalCoroutinesApi -public fun ObservableSource.asFlow_AtomicFu(): Flow = callbackFlow { - - val disposable = atomic(Disposables.empty()) + val disposableRef = AtomicReference(null) val observer = object : Observer { override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) = - if (disposable.value.isDisposed) d.dispose() else disposable.value = d - // we should probably do some compareAndSet above instead + override fun onSubscribe(d: Disposable) { DisposableHelper.setOnce(disposableRef, d) } override fun onNext(t: T) { sendBlocking(t) } override fun onError(e: Throwable) { close(e) } } subscribe(observer) - awaitClose { disposable.value.dispose() } - // in case when the source did not actually subscribe yet at all (observer.onSubscribe was not called): - // then this .dispose() just "marks" that we want to unsubscribe synchronously when we get .onSubscribe -} -*/ - -@ExperimentalCoroutinesApi -public fun ObservableSource.asFlow_ReentrantLock(): Flow = callbackFlow { - - var disposable = Disposables.empty() - val lock = reentrantLock() - - val observer = object : Observer { - override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) = - lock.withLock { if (disposable.isDisposed) d.dispose() else disposable = d } - override fun onNext(t: T) { sendBlocking(t) } - override fun onError(e: Throwable) { close(e) } - } - - subscribe(observer) - awaitClose { lock.withLock { disposable.dispose() } } - // in case when the source did not actually subscribe yet at all (observer.onSubscribe was not called): - // then this .dispose() just "marks" that we want to unsubscribe synchronously when we get .onSubscribe -} - -// This version tries to leverage Job.isActive and generally Job and Channel happens-before guaranties. -// (but I think it's incorrect) -@ExperimentalCoroutinesApi -public fun ObservableSource.asFlow_JobIsActive(): Flow = callbackFlow { - - var disposable: Disposable? = null - - val observer = object : Observer { - override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) = if (isActive) disposable = d else d.dispose() - // Is this version correct due to Job and Channel being thread-safe?? (and rx signaling being "serial") - override fun onNext(t: T) { sendBlocking(t) } - override fun onError(e: Throwable) { close(e) } - } - subscribe(observer) - awaitClose { disposable?.dispose() } -} - -@ExperimentalCoroutinesApi -public fun ObservableSource.asFlow_Mutex(): Flow = callbackFlow { - - lateinit var disposable: Disposable - val mutex = Mutex(locked = true) - - val observer = object : Observer { - override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) { disposable = d; mutex.unlock() } - override fun onNext(t: T) { sendBlocking(t) } - override fun onError(e: Throwable) { close(e) } - } - subscribe(observer) - if (mutex.isLocked) withContext(NonCancellable) { mutex.lock() } - awaitClose { disposable.dispose() } -} - -@ExperimentalCoroutinesApi -public fun ObservableSource.asFlow_Deferred(): Flow = callbackFlow { - - val deferred = CompletableDeferred() - - val observer = object : Observer { - override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) { deferred.complete(d) } - override fun onNext(t: T) { sendBlocking(t) } - override fun onError(e: Throwable) { close(e) } - } - subscribe(observer) - val disposable = withContext(NonCancellable) { deferred.await() } - awaitClose { disposable.dispose() } + awaitClose { DisposableHelper.dispose(disposableRef) } } /** From c38d7696e260e53e08742e5c028345d5526a801a Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Tue, 18 Feb 2020 22:22:39 +0100 Subject: [PATCH 15/16] Add test for late onSubscribe call --- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 4 ++-- .../test/ObservableAsFlowTest.kt | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index fe17d79202..40760b7895 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -90,10 +90,10 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ -@ExperimentalCoroutinesApi +@ExperimentalCoroutinesApi // TODO: Use logic from DisposableHelper directly instead of using the (internal?) helper public fun ObservableSource.asFlow(): Flow = callbackFlow { - val disposableRef = AtomicReference(null) + val disposableRef = AtomicReference() val observer = object : Observer { override fun onComplete() { close() } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt index 5569d295f7..c14c3cc4e8 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt @@ -5,6 +5,9 @@ package kotlinx.coroutines.rx2 import io.reactivex.Observable +import io.reactivex.ObservableSource +import io.reactivex.Observer +import io.reactivex.disposables.Disposables import io.reactivex.subjects.PublishSubject import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -112,6 +115,26 @@ class ObservableAsFlowTest : TestBase() { finish(6) } + @Test + fun testLateOnSubscribe() { + var observer: Observer? = null + val source = ObservableSource { observer = it } + val flow = source.asFlow() + assertNull(observer) + val job = GlobalScope.launch(Dispatchers.Unconfined) { + expect(1) + flow.collect { expectUnreached() } + expectUnreached() + } + expect(2) + assertNotNull(observer) + job.cancel() + val disposable = Disposables.empty() + observer!!.onSubscribe(disposable) + assertTrue(disposable.isDisposed) + finish(3) + } + @Test fun testBufferUnlimited() = runTest { val source = rxObservable(currentDispatcher()) { From aeda8931336e37adb18db12ed30955f6c83a8761 Mon Sep 17 00:00:00 2001 From: Marek Langiewicz Date: Thu, 20 Feb 2020 08:52:03 +0100 Subject: [PATCH 16/16] Use AtomicReference directly instead of rx internal DisposableHelper --- reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 40760b7895..d5d072b734 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -6,7 +6,6 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.Disposable -import io.reactivex.internal.disposables.DisposableHelper import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* @@ -90,20 +89,25 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ -@ExperimentalCoroutinesApi // TODO: Use logic from DisposableHelper directly instead of using the (internal?) helper +@ExperimentalCoroutinesApi public fun ObservableSource.asFlow(): Flow = callbackFlow { val disposableRef = AtomicReference() val observer = object : Observer { override fun onComplete() { close() } - override fun onSubscribe(d: Disposable) { DisposableHelper.setOnce(disposableRef, d) } + override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { sendBlocking(t) } override fun onError(e: Throwable) { close(e) } } subscribe(observer) - awaitClose { DisposableHelper.dispose(disposableRef) } + awaitClose { disposableRef.getAndSet(DISPOSED)?.dispose() } +} + +private object DISPOSED : Disposable { + override fun isDisposed() = true + override fun dispose() = Unit } /**