Skip to content

Commit

Permalink
Provide asFlowable and asObservable by their names in binary instead … (
Browse files Browse the repository at this point in the history
Kotlin#2285)

* Provide asFlowable and asObservable by their names in binary instead of 'from' function to prevent naming clash for Java users.
* Do not provide @jvmoverloads for convenience of Java interop
* Deprecate ReceiveChannel.asObservable by the way

Fixes Kotlin#2182
  • Loading branch information
qwwdfsad authored and recheej committed Dec 28, 2020
1 parent 1500fe7 commit c27145d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 40 deletions.
12 changes: 8 additions & 4 deletions reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
Expand Up @@ -30,13 +30,17 @@ public final class kotlinx/coroutines/rx2/RxCompletableKt {
public final class kotlinx/coroutines/rx2/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
}
Expand Down
46 changes: 25 additions & 21 deletions reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
Expand Up @@ -16,7 +16,7 @@ import kotlin.coroutines.*

/**
* Converts this job to the hot reactive completable that signals
* with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
* with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
Expand Down Expand Up @@ -50,7 +50,7 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay

/**
* Converts this deferred value to the hot reactive single that signals either
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
* [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
Expand All @@ -65,21 +65,6 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T>
this@asSingle.await()
}

/**
* Converts a stream of elements received from the channel to the hot reactive observable.
*
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
* they'll receive values in round-robin way.
*/
@Deprecated(
message = "Deprecated in the favour of Flow",
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
)
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
for (t in this@asObservable)
send(t)
}

/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
Expand Down Expand Up @@ -113,8 +98,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
Expand Down Expand Up @@ -148,8 +131,29 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))

@Deprecated(
message = "Deprecated in the favour of Flow",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
) // Deprecated since 1.4.0
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
for (t in this@asObservable)
send(t)
}

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)
7 changes: 4 additions & 3 deletions reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.junit.Assert
import org.junit.Test
import kotlin.test.*
Expand Down Expand Up @@ -126,7 +127,7 @@ class ConvertTest : TestBase() {
delay(50)
send("K")
}
val observable = c.asObservable(Dispatchers.Unconfined)
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
assertEquals("OK", it)
}
Expand All @@ -140,7 +141,7 @@ class ConvertTest : TestBase() {
delay(50)
throw TestException("K")
}
val observable = c.asObservable(Dispatchers.Unconfined)
val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
val single = rxSingle(Dispatchers.Unconfined) {
var result = ""
try {
Expand All @@ -155,4 +156,4 @@ class ConvertTest : TestBase() {
assertEquals("OK", it)
}
}
}
}
5 changes: 3 additions & 2 deletions reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2

import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
Expand Down Expand Up @@ -92,7 +93,7 @@ class IntegrationTest(
assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
checkNumbers(n, observable)
val channel = observable.openSubscription()
checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
channel.cancel()
}

Expand Down Expand Up @@ -131,4 +132,4 @@ class IntegrationTest(
assertEquals(n, last)
}

}
}
12 changes: 8 additions & 4 deletions reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
Expand Up @@ -26,12 +26,16 @@ public final class kotlinx/coroutines/rx3/RxCompletableKt {
public final class kotlinx/coroutines/rx3/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
}
Expand Down
21 changes: 15 additions & 6 deletions reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
Expand Up @@ -16,7 +16,7 @@ import kotlin.coroutines.*

/**
* Converts this job to the hot reactive completable that signals
* with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
* with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
Expand Down Expand Up @@ -50,7 +50,7 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay

/**
* Converts this deferred value to the hot reactive single that signals either
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
* [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
Expand Down Expand Up @@ -98,8 +98,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
Expand Down Expand Up @@ -133,8 +131,19 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)

@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)

0 comments on commit c27145d

Please sign in to comment.