From f0bbeb8a6363b564935bd60fd3c1bafe984fddb8 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 23 Oct 2018 19:01:22 +0300 Subject: [PATCH] Introduce ReceiveChannel.receiveOrClosed and ReceiveChannel.onReceiveOrClosed Fixes #330 --- build.gradle | 3 +- .../src/channels/AbstractChannel.kt | 145 ++++++++++++++---- .../src/channels/Channel.kt | 104 ++++++++++++- .../test/channels/BasicOperationsTest.kt | 34 ++++ .../channels/ChannelReceiveOrClosedTest.kt | 144 +++++++++++++++++ .../test/channels/TestChannelKind.kt | 3 + .../test/selects/SelectArrayChannelTest.kt | 103 +++++++++++++ .../selects/SelectRendezvousChannelTest.kt | 100 ++++++++++++ gradle.properties | 2 +- 9 files changed, 608 insertions(+), 30 deletions(-) create mode 100644 common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt diff --git a/build.gradle b/build.gradle index 24145df881..90e34114f6 100644 --- a/build.gradle +++ b/build.gradle @@ -193,7 +193,8 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) { "-Xuse-experimental=kotlin.experimental.ExperimentalTypeInference", "-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi", "-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi", - "-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi"] + "-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi", + "-XXLanguage:+InlineClasses"] } } diff --git a/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt index f031692c04..836233786c 100644 --- a/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt @@ -14,7 +14,6 @@ import kotlin.jvm.* /** * Abstract send channel. It is a base class for all send channel implementations. - * */ internal abstract class AbstractSendChannel : SendChannel { /** @suppress **This is unstable API and it is subject to change.** */ @@ -580,7 +579,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel - val receive = ReceiveElement(cont as CancellableContinuation, nullOnClose = false) + val receive = ReceiveElement(cont as CancellableContinuation, ResumeMode.THROW_ON_CLOSE) while (true) { if (enqueueReceive(receive)) { cont.initCancellability() // make it properly cancellable @@ -628,7 +627,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel - val receive = ReceiveElement(cont, nullOnClose = true) + val receive = ReceiveElement(cont as CancellableContinuation, ResumeMode.NULL_ON_CLOSE) while (true) { if (enqueueReceive(receive)) { cont.initCancellability() // make it properly cancellable @@ -651,13 +650,42 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel { + // fast path -- try poll non-blocking + val result = pollInternal() + if (result !== POLL_FAILED) { + return result.toResult() + } + // slow-path does suspend + return receiveOrClosedSuspend() + } + + @Suppress("UNCHECKED_CAST") + private suspend fun receiveOrClosedSuspend(): ReceiveResult = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont -> + val receive = ReceiveElement(cont as CancellableContinuation, ResumeMode.RECEIVE_RESULT) + while (true) { + if (enqueueReceive(receive)) { + cont.initCancellability() + removeReceiveOnCancel(cont, receive) + return@sc + } + + val result = pollInternal() + if (result is Closed<*> || result !== POLL_FAILED) { + cont.resume(result.toResult()) + return@sc + } + } + } + @Suppress("UNCHECKED_CAST") public final override fun poll(): E? { val result = pollInternal() return if (result === POLL_FAILED) null else receiveOrNullResult(result) } - override fun cancel(): Unit { + override fun cancel() { cancel(null) } @@ -712,9 +740,9 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( select: SelectInstance, - block: suspend (E?) -> R, - nullOnClose: Boolean - ) : AddLastDesc>(queue, ReceiveSelect(select, block, nullOnClose)) { + block: suspend (Any?) -> R, + mode: ResumeMode + ) : AddLastDesc>(queue, ReceiveSelect(select, block, mode)) { override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? { if (affected is Send) return ENQUEUE_FAILED return null @@ -746,7 +774,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R), nullOnClose = false) + val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (Any?) -> R), ResumeMode.THROW_ON_CLOSE) val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return when { enqueueResult === ALREADY_SELECTED -> return @@ -780,7 +808,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel(select, block as suspend (Any?) -> R, ResumeMode.NULL_ON_CLOSE) val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return when { enqueueResult === ALREADY_SELECTED -> return @@ -810,6 +838,43 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel> + get() = object : SelectClause1> { + override fun registerSelectClause1(select: SelectInstance, block: suspend (ReceiveResult) -> R) { + registerSelectReceiveOrClosed(select, block) + } + } + + @Suppress("UNCHECKED_CAST") + private fun registerSelectReceiveOrClosed(select: SelectInstance, block: suspend (ReceiveResult) -> R) { + while (true) { + if (select.isSelected) return + if (isEmpty) { + val enqueueOp = TryEnqueueReceiveDesc(select, block as suspend (Any?) -> R, ResumeMode.RECEIVE_RESULT) + val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return + when { + enqueueResult === ALREADY_SELECTED -> return + enqueueResult === ENQUEUE_FAILED -> {} // retry + else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult") + } + } else { + val pollResult = pollSelectInternal(select) + when { + pollResult === ALREADY_SELECTED -> return + pollResult === POLL_FAILED -> {} // retry + pollResult is Closed<*> -> { + block.startCoroutineUnintercepted(ReceiveResult.closed(pollResult.receiveException), select.completion) + } + else -> { + // selected successfully + block.startCoroutineUnintercepted(ReceiveResult.value(pollResult as E), select.completion) + return + } + } + } + } + } + // ------ protected ------ override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed? = @@ -902,18 +967,31 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( - @JvmField val cont: CancellableContinuation, - @JvmField val nullOnClose: Boolean + @JvmField val cont: CancellableContinuation, + @JvmField val resumeMode: ResumeMode ) : Receive() { - override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent) + @Suppress("IMPLICIT_CAST_TO_ANY") + override fun tryResumeReceive(value: E, idempotent: Any?): Any? { + val resumeValue = when (resumeMode) { + ResumeMode.RECEIVE_RESULT -> ReceiveResult.value(value) + else -> value + } + return cont.tryResume(resumeValue, idempotent) + } + override fun completeResumeReceive(token: Any) = cont.completeResume(token) override fun resumeReceiveClosed(closed: Closed<*>) { - if (closed.closeCause == null && nullOnClose) - cont.resume(null) - else - cont.resumeWithException(closed.receiveException) + when (resumeMode) { + ResumeMode.THROW_ON_CLOSE -> cont.resumeWithException(closed.receiveException) + ResumeMode.RECEIVE_RESULT -> cont.resume(closed.toResult()) + ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) { + cont.resume(null) + } else { + cont.resumeWithException(closed.receiveException) + } + } } - override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]" + override fun toString(): String = "ReceiveElement[$cont,mode=$resumeMode]" } private class ReceiveHasNext( @@ -957,8 +1035,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( @JvmField val select: SelectInstance, - @JvmField val block: suspend (E?) -> R, - @JvmField val nullOnClose: Boolean + @JvmField val block: suspend (Any?) -> R, + @JvmField val mode: ResumeMode ) : Receive(), DisposableHandle { override fun tryResumeReceive(value: E, idempotent: Any?): Any? = if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null @@ -966,17 +1044,18 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) { - if (select.trySelect(null)) { - if (closed.closeCause == null && nullOnClose) { + if (!select.trySelect(null)) return + + when (mode) { + ResumeMode.THROW_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException) + ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ReceiveResult.closed(closed.receiveException), select.completion) + ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) { block.startCoroutine(null, select.completion) } else { - // even though we are dispatching coroutine to process channel close on receive, - // which is an atomically cancellable suspending function, - // close is a final state, so we can use a cancellable resume mode select.resumeSelectCancellableWithException(closed.receiveException) } } @@ -991,7 +1070,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( @@ -1083,3 +1162,17 @@ private abstract class Receive : LockFreeLinkedListNode(), ReceiveOrClosed override val offerResult get() = OFFER_SUCCESS abstract fun resumeReceiveClosed(closed: Closed<*>) } + +@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") +private inline fun Any?.toResult(): ReceiveResult = + if (this is Closed<*>) ReceiveResult.closed(receiveException) else ReceiveResult.value(this as E) + +@Suppress("NOTHING_TO_INLINE") +private inline fun Closed<*>.toResult(): ReceiveResult = ReceiveResult.closed(receiveException) + +// Marker for receive, receiveOrNull and receiveOrClosed +private enum class ResumeMode { + THROW_ON_CLOSE, + NULL_ON_CLOSE, + RECEIVE_RESULT +} diff --git a/common/kotlinx-coroutines-core-common/src/channels/Channel.kt b/common/kotlinx-coroutines-core-common/src/channels/Channel.kt index 8cf8572aba..26573dc45b 100644 --- a/common/kotlinx-coroutines-core-common/src/channels/Channel.kt +++ b/common/kotlinx-coroutines-core-common/src/channels/Channel.kt @@ -217,7 +217,7 @@ public interface ReceiveChannel { /** * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that - * is received from the channel or selects with `null` if if the channel + * is received from the channel or selects with `null` if the channel * [isClosedForReceive] without cause. The [select] invocation fails with * the original [close][SendChannel.close] cause exception if the channel has _failed_. * @@ -226,6 +226,37 @@ public interface ReceiveChannel { @ExperimentalCoroutinesApi public val onReceiveOrNull: SelectClause1 + /** + * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]. + * This method returns [ReceiveResult] with a value if element was successfully retrieved from the channel + * or [ReceiveResult] with close cause if channel was closed. + * + * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this + * function is suspended, this function immediately resumes with [CancellationException]. + * + * *Cancellation of suspended receive is atomic* -- when this function + * throws [CancellationException] it means that the element was not retrieved from this channel. + * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may + * continue to execute even after it was cancelled from the same thread in the case when this receive operation + * was already resumed and the continuation was posted for execution to the thread's queue. + * + * Note, that this function does not check for cancellation when it is not suspended. + * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * + * This function can be used in [select] invocation with [onReceiveOrClosed] clause. + * Use [poll] to try receiving from this channel without waiting. + */ + @ExperimentalCoroutinesApi + public suspend fun receiveOrClosed(): ReceiveResult + + /** + * Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ReceiveResult] with a value + * that is received from the channel or selects with [ReceiveResult] with a close cause if the channel + * [isClosedForReceive]. + */ + @ExperimentalCoroutinesApi + public val onReceiveOrClosed: SelectClause1> + /** * Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty] * or is [isClosedForReceive] without cause. @@ -250,7 +281,7 @@ public interface ReceiveChannel { * afterwards will throw [ClosedSendChannelException], while attempts to receive will throw * [ClosedReceiveChannelException]. */ - public fun cancel(): Unit + public fun cancel() /** * @suppress @@ -268,6 +299,75 @@ public interface ReceiveChannel { public fun cancel(cause: Throwable? = null): Boolean } +/** + * A discriminated union of [ReceiveChannel.receiveOrClosed] result, + * that encapsulates either successfully received element of type [T] from the channel or a close cause. + */ +@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS") +public inline class ReceiveResult +internal constructor(private val holder: Any?) { + /** + * Returns `true` if this instance represents received element. + * In this case [isClosed] returns `false`. + */ + public val isValue: Boolean get() = holder !is Closed + + /** + * Returns `true` if this instance represents close cause. + * In this case [isValue] returns `false`. + */ + public val isClosed: Boolean get() = holder is Closed + + /** + * Returns received value if this instance represents received value or throws [IllegalStateException] otherwise. + */ + @Suppress("UNCHECKED_CAST") + public val value: T + get() = if (isClosed) throw IllegalStateException() else holder as T + + /** + * Returns received value if this element represents received value or `null` otherwise. + */ + @Suppress("UNCHECKED_CAST") + public val valueOrNull: T? + get() = if (isClosed) null else holder as T + + @Suppress("UNCHECKED_CAST") + public val valueOrThrow: T + get() = if (isClosed) throw closeCause else holder as T + + /** + * Returns close cause of the channel if this instance represents close cause or throws [IllegalStateException] otherwise. + */ + @Suppress("UNCHECKED_CAST") + public val closeCause: Throwable get() = if (isClosed) (holder as Closed).exception else error("ReceiveResult is not closed") + + /** + * @suppress + */ + public override fun toString(): String = + when (holder) { + is Closed -> holder.toString() + else -> "Value($holder)" + } + + internal class Closed(@JvmField val exception: Throwable) { + override fun equals(other: Any?): Boolean = other is Closed && exception == other.exception + override fun hashCode(): Int = exception.hashCode() + override fun toString(): String = "Closed($exception)" + } + + internal companion object { + @Suppress("NOTHING_TO_INLINE") + internal inline fun value(value: E): ReceiveResult = + ReceiveResult(value) + + @Suppress("NOTHING_TO_INLINE") + internal inline fun closed(cause: Throwable): ReceiveResult = + ReceiveResult(Closed(cause)) + } +} + /** * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used * from concurrent coroutines. diff --git a/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt b/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt index a5e180aeee..f25bd109b6 100644 --- a/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt +++ b/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt @@ -30,6 +30,11 @@ class BasicOperationsTest : TestBase() { TestChannelKind.values().forEach { kind -> testReceiveOrNullException(kind) } } + @Test + fun testReceiveOrClosed() = runTest { + TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) } + } + @Test fun testInvokeOnClose() = TestChannelKind.values().forEach { kind -> reset() @@ -108,6 +113,35 @@ class BasicOperationsTest : TestBase() { assertTrue(d.getCancellationException().cause is TestException) } + @Suppress("ReplaceAssertBooleanWithAssertEquality") + private suspend fun testReceiveOrClosed(kind: TestChannelKind) = coroutineScope { + reset() + val channel = kind.create() + launch { + expect(2) + channel.send(1) + } + + expect(1) + val result = channel.receiveOrClosed() + assertEquals(1, result.value) + assertEquals(1, result.valueOrNull) + assertTrue(ReceiveResult.value(1) == result) + + expect(3) + launch { + expect(4) + channel.close() + } + val closed = channel.receiveOrClosed() + expect(5) + assertNull(closed.valueOrNull) + assertTrue(closed.isClosed) + assertTrue(closed.closeCause is ClosedReceiveChannelException) + assertFailsWith { closed.valueOrThrow } + assertTrue(ReceiveResult.closed(closed.closeCause) == closed) + finish(6) + } private suspend fun testOffer(kind: TestChannelKind) = coroutineScope { val channel = kind.create() diff --git a/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt new file mode 100644 index 0000000000..823d658d1f --- /dev/null +++ b/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt @@ -0,0 +1,144 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.channels + +import kotlinx.coroutines.* +import kotlin.test.* + +class ChannelReceiveOrClosedTest : TestBase() { + @Test + fun testChannelOfThrowables() = runTest { + val channel = Channel() + launch { + channel.send(TestException1()) + channel.close(TestException2()) + } + + val element = channel.receiveOrClosed() + assertTrue(element.isValue) + assertTrue(element.value is TestException1) + assertTrue(element.valueOrNull is TestException1) + + val closed = channel.receiveOrClosed() + assertTrue(closed.isClosed) + assertTrue(closed.closeCause is TestException2) + } + + @Test + @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test + fun testNullableIntChanel() = runTest { + val channel = Channel() + launch { + expect(2) + channel.send(1) + expect(3) + channel.send(null) + + expect(6) + channel.close() + } + + expect(1) + val element = channel.receiveOrClosed() + assertTrue(element.isValue) + assertEquals(1, element.value) + assertEquals(1, element.valueOrNull) + assertEquals("Value(1)", element.toString()) + assertTrue(ReceiveResult.value(1) == element) // Don't box + + expect(4) + val nullElement = channel.receiveOrClosed() + assertTrue(nullElement.isValue) + assertNull(nullElement.value) + assertNull(nullElement.valueOrNull) + assertEquals("Value(null)", nullElement.toString()) + assertTrue(ReceiveResult.value(null) == nullElement) // Don't box + + expect(5) + val closed = channel.receiveOrClosed() + assertTrue(closed.isClosed) + + val closed2 = channel.receiveOrClosed() + assertTrue(closed2.isClosed) + assertTrue(closed2.closeCause is ClosedReceiveChannelException) + finish(7) + } + + @Test + @ExperimentalUnsignedTypes + fun testUIntChannel() = runTest { + val channel = Channel() + launch { + expect(2) + channel.send(1u) + yield() + expect(4) + channel.send((Long.MAX_VALUE - 1).toUInt()) + expect(5) + } + + expect(1) + val element = channel.receiveOrClosed() + assertEquals(1u, element.value) + + expect(3) + val element2 = channel.receiveOrClosed() + assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value) + finish(6) + } + + @Test + fun testCancelChannel() = runTest { + val channel = Channel() + launch { + expect(2) + channel.cancel() + } + + expect(1) + val closed = channel.receiveOrClosed() + assertTrue(closed.isClosed) + assertTrue(closed.closeCause is ClosedReceiveChannelException) + finish(3) + } + + @Test + @ExperimentalUnsignedTypes + fun testReceiveResultChannel() = runTest { + val channel = Channel>() + launch { + channel.send(ReceiveResult.value(1u)) + channel.send(ReceiveResult.closed(TestException1())) + channel.close(TestException2()) + } + + val intResult = channel.receiveOrClosed() + assertTrue(intResult.isValue) + assertEquals(1u, intResult.value.value) + + val closeCauseResult = channel.receiveOrClosed() + assertTrue(closeCauseResult.isValue) + assertTrue(closeCauseResult.value.closeCause is TestException1) + + val closeCause = channel.receiveOrClosed() + assertTrue(closeCause.isClosed) + assertTrue(closeCause.closeCause is TestException2) + assertFailsWith { closeCause.valueOrThrow } + } + + @Test + fun testToString() = runTest { + val channel = Channel(1) + channel.send("message") + channel.close(TestException1()) + assertEquals("Value(message)", channel.receiveOrClosed().toString()) + // toString implementation for exception differs on every platform + val str = channel.receiveOrClosed().toString() + assertTrue(str.matches("Closed\\(.*TestException1\\)".toRegex())) + } + + class TestException1 : Throwable() + class TestException2 : Throwable() +} diff --git a/common/kotlinx-coroutines-core-common/test/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/test/channels/TestChannelKind.kt index 682058d3c0..314e549858 100644 --- a/common/kotlinx-coroutines-core-common/test/channels/TestChannelKind.kt +++ b/common/kotlinx-coroutines-core-common/test/channels/TestChannelKind.kt @@ -57,6 +57,7 @@ private class ChannelViaBroadcast( override suspend fun receive(): E = sub.receive() override suspend fun receiveOrNull(): E? = sub.receiveOrNull() + override suspend fun receiveOrClosed(): ReceiveResult = sub.receiveOrClosed() override fun poll(): E? = sub.poll() override fun iterator(): ChannelIterator = sub.iterator() override fun cancel(): Unit = sub.cancel() @@ -65,4 +66,6 @@ private class ChannelViaBroadcast( get() = sub.onReceive override val onReceiveOrNull: SelectClause1 get() = sub.onReceiveOrNull + override val onReceiveOrClosed: SelectClause1> + get() = sub.onReceiveOrClosed } diff --git a/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt index f8c3439b9e..27c6d0b317 100644 --- a/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt @@ -284,10 +284,113 @@ class SelectArrayChannelTest : TestBase() { finish(10) } + @Test + fun testSelectReceiveOrClosedWaitClosed() = runTest { + expect(1) + val channel = Channel(1) + launch { + expect(3) + channel.close() + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertTrue(it.closeCause is ClosedReceiveChannelException) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest { + expect(1) + val channel = Channel(1) + launch { + expect(3) + channel.close(TestException()) + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertTrue(it.closeCause is TestException) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosed() = runTest { + val c = Channel(1) + val iterations = 10 + expect(1) + val job = launch { + repeat(iterations) { + select { + c.onReceiveOrClosed { v -> + expect(4 + it * 2) + assertEquals(it, v.value) + } + } + } + } + + expect(2) + repeat(iterations) { + expect(3 + it * 2) + c.send(it) + yield() + } + + job.join() + finish(3 + iterations * 2) + } + + @Test + fun testSelectReceiveOrClosedDispatch() = runTest { + val c = Channel(1) + expect(1) + launch { + expect(3) + val res = select { + c.onReceiveOrClosed { v -> + expect(6) + assertEquals(42, v.value) + yield() // back to main + expect(8) + "OK" + } + } + expect(9) + assertEquals("OK", res) + } + expect(2) + yield() // to launch + expect(4) + c.send(42) // do not suspend + expect(5) + yield() // to receive + expect(7) + yield() // again + finish(10) + } + // only for debugging internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion if (!trySelect(null)) return block.startCoroutineUnintercepted(this) } + + class TestException : Throwable() } + + +fun foo(r: Result) {} \ No newline at end of file diff --git a/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt index 0c1f9f6bc7..ad2f4928f9 100644 --- a/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt @@ -305,10 +305,110 @@ class SelectRendezvousChannelTest : TestBase() { finish(10) } + @Test + fun testSelectReceiveOrClosedWaitClosed() = runTest { + expect(1) + val channel = Channel(Channel.RENDEZVOUS) + launch { + expect(3) + channel.close() + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertTrue(it.closeCause is ClosedReceiveChannelException) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest { + expect(1) + val channel = Channel(Channel.RENDEZVOUS) + launch { + expect(3) + channel.close(TestException()) + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertTrue(it.closeCause is TestException) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosed() = runTest { + val channel = Channel(Channel.RENDEZVOUS) + val iterations = 10 + expect(1) + val job = launch { + repeat(iterations) { + select { + channel.onReceiveOrClosed { v -> + expect(4 + it * 2) + assertEquals(it, v.value) + } + } + } + } + + expect(2) + repeat(iterations) { + expect(3 + it * 2) + channel.send(it) + yield() + } + + job.join() + finish(3 + iterations * 2) + } + + @Test + fun testSelectReceiveOrClosedDispatch() = runTest { + val c = Channel(Channel.RENDEZVOUS) + expect(1) + launch { + expect(3) + val res = select { + c.onReceiveOrClosed { v -> + expect(6) + assertEquals(42, v.value) + yield() // back to main + expect(8) + "OK" + } + } + expect(9) + assertEquals("OK", res) + } + expect(2) + yield() // to launch + expect(4) + c.send(42) // do not suspend + expect(5) + yield() // to receive + expect(7) + yield() // again + finish(10) + } + // only for debugging internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion if (!trySelect(null)) return block.startCoroutineUnintercepted(this) } + + class TestException : Throwable() } diff --git a/gradle.properties b/gradle.properties index 775676ae54..3207264b3e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,7 @@ # Kotlin version=1.0.0-RC1-SNAPSHOT group=org.jetbrains.kotlinx -kotlin_version=1.3.0-rc-146 +kotlin_version=1.3.0-rc-190 kotlin_native_version=1.3.0-rc-146 # Dependencies