Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
JDK9 Flow integration is implemented as thin wrappers around the Reactive Streams integration.
- Loading branch information
1 parent
8aaf2f7
commit a25bf36
Showing
18 changed files
with
933 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# Module kotlinx-coroutines-jdk9 | ||
|
||
Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html). | ||
|
||
Implemented as a collection of thin wrappers over [kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive), | ||
an equivalent package for the Reactive Streams. | ||
|
||
# Package kotlinx.coroutines.jdk9 | ||
|
||
Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html). |
20 changes: 20 additions & 0 deletions
20
reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
public final class kotlinx/coroutines/jdk9/AwaitKt { | ||
public static final fun awaitFirst (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; | ||
public static final fun awaitFirstOrDefault (Ljava/util/concurrent/Flow$Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; | ||
public static final fun awaitFirstOrElse (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; | ||
public static final fun awaitFirstOrNull (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; | ||
public static final fun awaitLast (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; | ||
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; | ||
} | ||
|
||
public final class kotlinx/coroutines/jdk9/PublishKt { | ||
public static final fun flowPublish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher; | ||
public static synthetic fun flowPublish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher; | ||
} | ||
|
||
public final class kotlinx/coroutines/jdk9/ReactiveFlowKt { | ||
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow; | ||
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher; | ||
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
targetCompatibility = 9 | ||
|
||
dependencies { | ||
compile project(":kotlinx-coroutines-reactive") | ||
compile "org.reactivestreams:reactive-streams-flow-adapters:$reactive_streams_version" | ||
} | ||
|
||
compileTestKotlin { | ||
kotlinOptions.jvmTarget = "9" | ||
} | ||
|
||
compileKotlin { | ||
kotlinOptions.jvmTarget = "9" | ||
} | ||
|
||
tasks.withType(dokka.getClass()) { | ||
externalDocumentationLink { | ||
url = new URL("https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html") | ||
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
java.util.concurrent.Flow |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines.jdk9 | ||
|
||
import java.util.concurrent.* | ||
import org.reactivestreams.FlowAdapters | ||
import kotlinx.coroutines.reactive.* | ||
|
||
/** | ||
* Awaits for the first value from the given publisher without blocking a thread and | ||
* returns the resulting value or throws the corresponding exception if this publisher had produced error. | ||
* | ||
* This suspending function is cancellable. | ||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function | ||
* immediately resumes with [CancellationException]. | ||
* | ||
* @throws NoSuchElementException if publisher does not emit any value | ||
*/ | ||
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst() | ||
|
||
/** | ||
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a | ||
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error. | ||
* | ||
* This suspending function is cancellable. | ||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function | ||
* immediately resumes with [CancellationException]. | ||
*/ | ||
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T = | ||
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default) | ||
|
||
/** | ||
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a | ||
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error. | ||
* | ||
* This suspending function is cancellable. | ||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function | ||
* immediately resumes with [CancellationException]. | ||
*/ | ||
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? = | ||
FlowAdapters.toPublisher(this).awaitFirstOrNull() | ||
|
||
/** | ||
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a | ||
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error. | ||
* | ||
* This suspending function is cancellable. | ||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function | ||
* immediately resumes with [CancellationException]. | ||
*/ | ||
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = | ||
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue) | ||
|
||
/** | ||
* Awaits for the last value from the given publisher without blocking a thread and | ||
* returns the resulting value or throws the corresponding exception if this publisher had produced error. | ||
* | ||
* This suspending function is cancellable. | ||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function | ||
* immediately resumes with [CancellationException]. | ||
* | ||
* @throws NoSuchElementException if publisher does not emit any value | ||
*/ | ||
public suspend fun <T> Flow.Publisher<T>.awaitLast(): T = | ||
FlowAdapters.toPublisher(this).awaitLast() | ||
|
||
/** | ||
* Awaits for the single value from the given publisher without blocking a thread and | ||
* returns the resulting value or throws the corresponding exception if this publisher had produced error. | ||
* | ||
* This suspending function is cancellable. | ||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function | ||
* immediately resumes with [CancellationException]. | ||
* | ||
* @throws NoSuchElementException if publisher does not emit any value | ||
* @throws IllegalArgumentException if publisher emits more than one value | ||
*/ | ||
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T = | ||
FlowAdapters.toPublisher(this).awaitSingle() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines.jdk9 | ||
|
||
import kotlinx.coroutines.* | ||
import kotlinx.coroutines.channels.* | ||
import java.util.concurrent.* | ||
import kotlin.coroutines.* | ||
import org.reactivestreams.FlowAdapters | ||
|
||
/** | ||
* Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine. | ||
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. | ||
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) | ||
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) | ||
* if coroutine throws an exception or closes channel with a cause. | ||
* Unsubscribing cancels running coroutine. | ||
* | ||
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that | ||
* `onNext` is not invoked concurrently. | ||
* | ||
* Coroutine context can be specified with [context] argument. | ||
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. | ||
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. | ||
* | ||
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect | ||
* to cancellation and error handling may change in the future. | ||
*/ | ||
@ExperimentalCoroutinesApi | ||
public fun <T> flowPublish( | ||
context: CoroutineContext = EmptyCoroutineContext, | ||
@BuilderInference block: suspend ProducerScope<T>.() -> Unit | ||
): Flow.Publisher<T> { | ||
val reactivePublisher : org.reactivestreams.Publisher<T> = kotlinx.coroutines.reactive.publish<T>(context, block) | ||
return FlowAdapters.toFlowPublisher(reactivePublisher) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines.jdk9 | ||
|
||
import kotlinx.coroutines.flow.* | ||
import kotlinx.coroutines.reactive.asFlow | ||
import kotlinx.coroutines.reactive.asPublisher | ||
import kotlinx.coroutines.reactive.collect | ||
import java.util.concurrent.Flow as JFlow | ||
import org.reactivestreams.FlowAdapters | ||
|
||
/** | ||
* Transforms the given reactive [Publisher] into [Flow]. | ||
* Use [buffer] operator on the resulting flow to specify the size of the backpressure. | ||
* More precisely, it specifies the value of the subscription's [request][Subscription.request]. | ||
* [buffer] default capacity is used by default. | ||
* | ||
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements | ||
* are discarded. | ||
*/ | ||
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> = | ||
FlowAdapters.toPublisher(this).asFlow() | ||
|
||
/** | ||
* Transforms the given flow to a reactive specification compliant [Publisher]. | ||
*/ | ||
public fun <T : Any> Flow<T>.asPublisher(): JFlow.Publisher<T> { | ||
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>() | ||
return FlowAdapters.toFlowPublisher(reactivePublisher) | ||
} | ||
|
||
/** | ||
* Subscribes to this [Publisher] and performs the specified action for each received element. | ||
* Cancels subscription if any exception happens during collect. | ||
*/ | ||
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit) = | ||
FlowAdapters.toPublisher(this).collect(action) |
79 changes: 79 additions & 0 deletions
79
reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines.jdk9 | ||
|
||
import kotlinx.coroutines.* | ||
import kotlinx.coroutines.flow.* | ||
import org.junit.Test | ||
import java.util.concurrent.Flow as JFlow | ||
import kotlin.test.* | ||
|
||
class FlowAsPublisherTest : TestBase() { | ||
|
||
@Test | ||
fun testErrorOnCancellationIsReported() { | ||
expect(1) | ||
flow<Int> { | ||
emit(2) | ||
try { | ||
hang { expect(3) } | ||
} finally { | ||
throw TestException() | ||
} | ||
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> { | ||
private lateinit var subscription: JFlow.Subscription | ||
|
||
override fun onComplete() { | ||
expectUnreached() | ||
} | ||
|
||
override fun onSubscribe(s: JFlow.Subscription?) { | ||
subscription = s!! | ||
subscription.request(2) | ||
} | ||
|
||
override fun onNext(t: Int) { | ||
expect(t) | ||
subscription.cancel() | ||
} | ||
|
||
override fun onError(t: Throwable?) { | ||
assertTrue(t is TestException) | ||
expect(4) | ||
} | ||
}) | ||
finish(5) | ||
} | ||
|
||
@Test | ||
fun testCancellationIsNotReported() { | ||
expect(1) | ||
flow<Int> { | ||
emit(2) | ||
hang { expect(3) } | ||
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> { | ||
private lateinit var subscription: JFlow.Subscription | ||
|
||
override fun onComplete() { | ||
expect(4) | ||
} | ||
|
||
override fun onSubscribe(s: JFlow.Subscription?) { | ||
subscription = s!! | ||
subscription.request(2) | ||
} | ||
|
||
override fun onNext(t: Int) { | ||
expect(t) | ||
subscription.cancel() | ||
} | ||
|
||
override fun onError(t: Throwable?) { | ||
expectUnreached() | ||
} | ||
}) | ||
finish(5) | ||
} | ||
} |
Oops, something went wrong.