Skip to content

Commit

Permalink
Integration with RxJava3 (#1950)
Browse files Browse the repository at this point in the history
Fixes #1883

Co-authored-by: Zac Sweers <zsweers@slack-corp.com>
  • Loading branch information
qwwdfsad and ZacSweers committed Apr 24, 2020
1 parent f533125 commit fe64178
Show file tree
Hide file tree
Showing 41 changed files with 3,816 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.md
Expand Up @@ -47,6 +47,7 @@ suspend fun main() = coroutineScope {
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
* Flow (JDK 9) (the same interface as for Reactive Streams),
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
* RxJava 3.x ([rxFlowable], [rxSingle], etc), and
* Project Reactor ([flux], [mono], etc).
* [ui](ui/README.md) &mdash; modules that provide coroutine dispatchers for various single-threaded UI libraries:
* Android, JavaFX, and Swing.
Expand Down Expand Up @@ -288,6 +289,8 @@ The `develop` branch is pushed to `master` during release.
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
<!--- MODULE kotlinx-coroutines-reactor -->
<!--- INDEX kotlinx.coroutines.reactor -->
[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/flux.html
Expand Down
2 changes: 1 addition & 1 deletion docs/flow.md
Expand Up @@ -1786,7 +1786,7 @@ Indeed, its design was inspired by Reactive Streams and its various implementati
be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2` for RxJava2).
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.

<!-- stdlib references -->
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Expand Up @@ -18,6 +18,7 @@ byte_buddy_version=1.10.9
reactor_vesion=3.2.5.RELEASE
reactive_streams_version=1.0.2
rxjava2_version=2.2.8
rxjava3_version=3.0.2
javafx_version=11.0.2
javafx_plugin_version=0.0.8
binary_compatibility_validator_version=0.2.2
Expand Down
1 change: 1 addition & 0 deletions reactive/README.md
Expand Up @@ -8,3 +8,4 @@ Module name below corresponds to the artifact name in Maven/Gradle.
* [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive/README.md) -- utilities for [Reactive Streams](https://www.reactive-streams.org)
* [kotlinx-coroutines-reactor](kotlinx-coroutines-reactor/README.md) -- utilities for [Reactor](https://projectreactor.io)
* [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2/README.md) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava)
* [kotlinx-coroutines-rx3](kotlinx-coroutines-rx3/README.md) -- utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava)
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-rx2/build.gradle
Expand Up @@ -11,7 +11,7 @@ dependencies {

tasks.withType(dokka.getClass()) {
externalDocumentationLink {
url = new URL('http://reactivex.io/RxJava/javadoc/')
url = new URL('http://reactivex.io/RxJava/2.x/javadoc/')
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
}
Expand Down
84 changes: 84 additions & 0 deletions reactive/kotlinx-coroutines-rx3/README.md
@@ -0,0 +1,84 @@
# Module kotlinx-coroutines-rx3

Utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava).

Coroutine builders:

| **Name** | **Result** | **Scope** | **Description**
| --------------- | --------------------------------------- | ---------------- | ---------------
| [rxCompletable] | `Completable` | [CoroutineScope] | Cold completable that starts coroutine on subscribe
| [rxMaybe] | `Maybe` | [CoroutineScope] | Cold maybe that starts coroutine on subscribe
| [rxSingle] | `Single` | [CoroutineScope] | Cold single that starts coroutine on subscribe
| [rxObservable] | `Observable` | [ProducerScope] | Cold observable that starts coroutine on subscribe
| [rxFlowable] | `Flowable` | [ProducerScope] | Cold observable that starts coroutine on subscribe with **backpressure** support

Integration with [Flow]:

| **Name** | **Result** | **Description**
| --------------- | -------------- | ---------------
| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow

Suspending extension functions and suspending iteration:

| **Name** | **Description**
| -------- | ---------------
| [CompletableSource.await][io.reactivex.rxjava3.core.CompletableSource.await] | Awaits for completion of the completable value
| [MaybeSource.await][io.reactivex.rxjava3.core.MaybeSource.await] | Awaits for the value of the maybe and returns it or null
| [MaybeSource.awaitOrDefault][io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default
| [SingleSource.await][io.reactivex.rxjava3.core.SingleSource.await] | Awaits for completion of the single value and returns it
| [ObservableSource.awaitFirst][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
| [ObservableSource.awaitFirstOrDefault][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
| [ObservableSource.awaitFirstOrElse][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse] | Awaits for the first value from the given observable or default from a function
| [ObservableSource.awaitFirstOrNull][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null
| [ObservableSource.awaitLast][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
| [ObservableSource.awaitSingle][io.reactivex.rxjava3.core.ObservableSource.awaitSingle] | Awaits for the single value from the given observable

Note that `Flowable` is a subclass of [Reactive Streams](https://www.reactive-streams.org)
`Publisher` and extensions for it are covered by
[kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive) module.

Conversion functions:

| **Name** | **Description**
| -------- | ---------------
| [Job.asCompletable][kotlinx.coroutines.Job.asCompletable] | Converts job to hot completable
| [Deferred.asSingle][kotlinx.coroutines.Deferred.asSingle] | Converts deferred value to hot single
| [Scheduler.asCoroutineDispatcher][io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher]

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
<!--- INDEX kotlinx.coroutines.channels -->
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-producer-scope/index.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
<!--- MODULE kotlinx-coroutines-rx3 -->
<!--- INDEX kotlinx.coroutines.rx3 -->
[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-completable.html
[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-maybe.html
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-single.html
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-observable.html
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-flowable.html
[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-flowable.html
[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-observable.html
[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/as-flow.html
[io.reactivex.rxjava3.core.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-completable-source/await.html
[io.reactivex.rxjava3.core.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await.html
[io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await-or-default.html
[io.reactivex.rxjava3.core.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-single-source/await.html
[io.reactivex.rxjava3.core.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first.html
[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-default.html
[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-else.html
[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-null.html
[io.reactivex.rxjava3.core.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-single.html
[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-job/as-completable.html
[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-deferred/as-single.html
[io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-scheduler/as-coroutine-dispatcher.html
<!--- END -->

# Package kotlinx.coroutines.rx3

Utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava).
70 changes: 70 additions & 0 deletions reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
@@ -0,0 +1,70 @@
public final class kotlinx/coroutines/rx3/RxAwaitKt {
public static final fun await (Lio/reactivex/rxjava3/core/CompletableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun await (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun await (Lio/reactivex/rxjava3/core/SingleSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirst (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrDefault (Lio/reactivex/rxjava3/core/ObservableSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrElse (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/rx3/RxChannelKt {
public static final fun collect (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun openSubscription (Lio/reactivex/rxjava3/core/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun openSubscription (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
}

public final class kotlinx/coroutines/rx3/RxCompletableKt {
public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Completable;
public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Completable;
}

public final class kotlinx/coroutines/rx3/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
}

public final class kotlinx/coroutines/rx3/RxFlowableKt {
public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Flowable;
public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
}

public final class kotlinx/coroutines/rx3/RxMaybeKt {
public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Maybe;
public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Maybe;
}

public final class kotlinx/coroutines/rx3/RxObservableKt {
public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Observable;
public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
}

public final class kotlinx/coroutines/rx3/RxSchedulerKt {
public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher;
}

public final class kotlinx/coroutines/rx3/RxSingleKt {
public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Single;
public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Single;
}

public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
public fun <init> (Lio/reactivex/rxjava3/core/Scheduler;)V
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun equals (Ljava/lang/Object;)Z
public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler;
public fun hashCode ()I
public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
public fun toString ()Ljava/lang/String;
}

45 changes: 45 additions & 0 deletions reactive/kotlinx-coroutines-rx3/build.gradle
@@ -0,0 +1,45 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
targetCompatibility = JavaVersion.VERSION_1_8

dependencies {
compile project(':kotlinx-coroutines-reactive')
testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
compile "io.reactivex.rxjava3:rxjava:$rxjava3_version"
}

compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}

compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}

tasks.withType(dokka.getClass()) {
externalDocumentationLink {
url = new URL('http://reactivex.io/RxJava/3.x/javadoc/')
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
}

task testNG(type: Test) {
useTestNG()
reports.html.destination = file("$buildDir/reports/testng")
include '**/*ReactiveStreamTckTest.*'
// Skip testNG when tests are filtered with --tests, otherwise it simply fails
onlyIf {
filter.includePatterns.isEmpty()
}
doFirst {
// Classic gradle, nothing works without doFirst
println "TestNG tests: ($includes)"
}
}

test {
dependsOn(testNG)
reports.html.destination = file("$buildDir/reports/junit")
}
14 changes: 14 additions & 0 deletions reactive/kotlinx-coroutines-rx3/package.list
@@ -0,0 +1,14 @@
io.reactivex.rxjava3.core
io.reactivex.rxjava3.annotations
io.reactivex.rxjava3.disposables
io.reactivex.rxjava3.exceptions
io.reactivex.rxjava3.flowables
io.reactivex.rxjava3.functions
io.reactivex.rxjava3.observables
io.reactivex.rxjava3.observers
io.reactivex.rxjava3.parallel
io.reactivex.rxjava3.plugins
io.reactivex.rxjava3.processors
io.reactivex.rxjava3.schedulers
io.reactivex.rxjava3.subjects
io.reactivex.rxjava3.subscribers

0 comments on commit fe64178

Please sign in to comment.