From c27145d57f7262485fe7ec9680f84f687fc3dd4d Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 8 Oct 2020 02:01:09 -0700 Subject: [PATCH] =?UTF-8?q?Provide=20asFlowable=20and=20asObservable=20by?= =?UTF-8?q?=20their=20names=20in=20binary=20instead=20=E2=80=A6=20(#2285)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Provide asFlowable and asObservable by their names in binary instead of 'from' function to prevent naming clash for Java users. * Do not provide @JvmOverloads for convenience of Java interop * Deprecate ReceiveChannel.asObservable by the way Fixes #2182 --- .../api/kotlinx-coroutines-rx2.api | 12 +++-- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 46 ++++++++++--------- .../test/ConvertTest.kt | 7 +-- .../test/IntegrationTest.kt | 5 +- .../api/kotlinx-coroutines-rx3.api | 12 +++-- .../kotlinx-coroutines-rx3/src/RxConvert.kt | 21 ++++++--- 6 files changed, 63 insertions(+), 40 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 06ddb68e9c..2cf7bc815b 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -30,13 +30,17 @@ public final class kotlinx/coroutines/rx2/RxCompletableKt { public final class kotlinx/coroutines/rx2/RxConvertKt { public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable; public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow; + public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable; + public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable; public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe; public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable; + public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable; + public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable; public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single; - public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable; - public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable; - public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable; - public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable; public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable; public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable; } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 264cdad43c..cf73ef2ea8 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -16,7 +16,7 @@ import kotlin.coroutines.* /** * Converts this job to the hot reactive completable that signals - * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes. + * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes. * * Every subscriber gets the signal at the same time. * Unsubscribing from the resulting completable **does not** affect the original job in any way. @@ -50,7 +50,7 @@ public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMay /** * Converts this deferred value to the hot reactive single that signals either - * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError]. + * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError]. * * Every subscriber gets the same completion value. * Unsubscribing from the resulting single **does not** affect the original deferred value in any way. @@ -65,21 +65,6 @@ public fun Deferred.asSingle(context: CoroutineContext): Single this@asSingle.await() } -/** - * Converts a stream of elements received from the channel to the hot reactive observable. - * - * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers, - * they'll receive values in round-robin way. - */ -@Deprecated( - message = "Deprecated in the favour of Flow", - level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()") -) -public fun ReceiveChannel.asObservable(context: CoroutineContext): Observable = rxObservable(context) { - for (t in this@asObservable) - send(t) -} - /** * Transforms given cold [ObservableSource] into cold [Flow]. * @@ -113,8 +98,6 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@JvmOverloads // binary compatibility -@JvmName("from") @ExperimentalCoroutinesApi public fun Flow.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = Observable.create { emitter -> /* @@ -148,8 +131,29 @@ public fun Flow.asObservable(context: CoroutineContext = EmptyCorout * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@JvmOverloads // binary compatibility -@JvmName("from") @ExperimentalCoroutinesApi public fun Flow.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = Flowable.fromPublisher(asPublisher(context)) + +@Deprecated( + message = "Deprecated in the favour of Flow", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow") +) // Deprecated since 1.4.0 +public fun ReceiveChannel.asObservable(context: CoroutineContext): Observable = rxObservable(context) { + for (t in this@asObservable) + send(t) +} + +@Suppress("UNUSED") // KT-42513 +@JvmOverloads // binary compatibility +@JvmName("from") +@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that +public fun Flow._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = + asFlowable(context) + +@Suppress("UNUSED") // KT-42513 +@JvmOverloads // binary compatibility +@JvmName("from") +@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that +public fun Flow._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = asObservable(context) diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt index a43366555e..cfc3240741 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* import org.junit.Assert import org.junit.Test import kotlin.test.* @@ -126,7 +127,7 @@ class ConvertTest : TestBase() { delay(50) send("K") } - val observable = c.asObservable(Dispatchers.Unconfined) + val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined) checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) { assertEquals("OK", it) } @@ -140,7 +141,7 @@ class ConvertTest : TestBase() { delay(50) throw TestException("K") } - val observable = c.asObservable(Dispatchers.Unconfined) + val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined) val single = rxSingle(Dispatchers.Unconfined) { var result = "" try { @@ -155,4 +156,4 @@ class ConvertTest : TestBase() { assertEquals("OK", it) } } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt index 22e0e72191..540fa76b7e 100644 --- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* @@ -92,7 +93,7 @@ class IntegrationTest( assertFailsWith { observable.awaitSingle() } checkNumbers(n, observable) val channel = observable.openSubscription() - checkNumbers(n, channel.asObservable(ctx(coroutineContext))) + checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext))) channel.cancel() } @@ -131,4 +132,4 @@ class IntegrationTest( assertEquals(n, last) } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index 4f15eda7d4..eb92fd3285 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -26,12 +26,16 @@ public final class kotlinx/coroutines/rx3/RxCompletableKt { public final class kotlinx/coroutines/rx3/RxConvertKt { public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable; public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow; + public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable; + public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable; public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe; + public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable; + public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable; public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single; - public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable; - public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable; - public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable; - public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable; + public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable; public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable; public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable; } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index c7ab237cea..9bb38c088f 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -16,7 +16,7 @@ import kotlin.coroutines.* /** * Converts this job to the hot reactive completable that signals - * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes. + * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes. * * Every subscriber gets the signal at the same time. * Unsubscribing from the resulting completable **does not** affect the original job in any way. @@ -50,7 +50,7 @@ public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMay /** * Converts this deferred value to the hot reactive single that signals either - * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError]. + * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError]. * * Every subscriber gets the same completion value. * Unsubscribing from the resulting single **does not** affect the original deferred value in any way. @@ -98,8 +98,6 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@JvmOverloads // binary compatibility -@JvmName("from") @ExperimentalCoroutinesApi public fun Flow.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = Observable.create { emitter -> /* @@ -133,8 +131,19 @@ public fun Flow.asObservable(context: CoroutineContext = EmptyCorout * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@JvmOverloads // binary compatibility -@JvmName("from") @ExperimentalCoroutinesApi public fun Flow.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = Flowable.fromPublisher(asPublisher(context)) + +@Suppress("UNUSED") // KT-42513 +@JvmOverloads // binary compatibility +@JvmName("from") +@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that +public fun Flow._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = + asFlowable(context) + +@Suppress("UNUSED") // KT-42513 +@JvmOverloads // binary compatibility +@JvmName("from") +@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that +public fun Flow._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = asObservable(context)