Skip to content

Commit

Permalink
Channel.receiveOrNull becomes extension, internal receiveOrClosed added
Browse files Browse the repository at this point in the history
* The corresponding ReceiveChannel methods are deprecated.
* Introduced corresponding extensions with the same semantic and generic
  Any bound.
* Introduce internal ReceiveChannel.[on]receiveOrClosed
  * Using internal inline class ValueOrClosed.
  * To be stabilized and made public in the future when inline classes
    ABI stabilizes.
  * It is related to #330 but does not resolve it yet.
* Includes todos for future public ValueOrClose design.
* Simplify AbstractChannel select implementations.
* AbstractChannel implementation is optimized to avoid code
  duplication in suspension of different receive methods:
  receive, receiveOrNull, receiveOrClosed.
  • Loading branch information
qwwdfsad authored and elizarov committed Jul 18, 2019
1 parent db0ef0c commit f9e14dd
Show file tree
Hide file tree
Showing 15 changed files with 760 additions and 95 deletions.
Expand Up @@ -670,7 +670,9 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun minWith (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun onReceiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/selects/SelectClause1;
public static final fun partition (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun receiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduce (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduceIndexed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun requireNoNulls (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down Expand Up @@ -743,12 +745,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
public abstract fun cancel (Ljava/util/concurrent/CancellationException;)V
public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
public abstract fun isEmpty ()Z
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -784,6 +788,23 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum {
public static fun values ()[Lkotlinx/coroutines/channels/TickerMode;
}

public final class kotlinx/coroutines/channels/ValueOrClosed {
public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun hashCode ()I
public static fun hashCode-impl (Ljava/lang/Object;)I
public static final fun isClosed-impl (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow {
public fun <init> ()V
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -123,7 +123,6 @@ allprojects {
def platform = platformOf(it)
apply from: rootProject.file("gradle/compile-${platform}.gradle")


dependencies {
// See comment below for rationale, it will be replaced with "project" dependency
compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version"
Expand All @@ -135,6 +134,7 @@ allprojects {
tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all {
kotlinOptions.freeCompilerArgs += experimentalAnnotations.collect { "-Xuse-experimental=" + it }
kotlinOptions.freeCompilerArgs += "-progressive"
kotlinOptions.freeCompilerArgs += "-XXLanguage:+InlineClasses"
// Binary compatibility support
kotlinOptions.freeCompilerArgs += ["-Xdump-declarations-to=${buildDir}/visibilities.json"]
}
Expand Down
@@ -0,0 +1,141 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlin.test.*

class ChannelReceiveOrClosedTest : TestBase() {
@Test
fun testChannelOfThrowables() = runTest {
val channel = Channel<Throwable>()
launch {
channel.send(TestException1())
channel.close(TestException2())
}

val element = channel.receiveOrClosed()
assertTrue(element.isValue)
assertTrue(element.value is TestException1)
assertTrue(element.valueOrNull is TestException1)

val closed = channel.receiveOrClosed()
assertTrue(closed.isClosed)
assertTrue(closed.closeCause is TestException2)
}

@Test
@Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test
fun testNullableIntChanel() = runTest {
val channel = Channel<Int?>()
launch {
expect(2)
channel.send(1)
expect(3)
channel.send(null)

expect(6)
channel.close()
}

expect(1)
val element = channel.receiveOrClosed()
assertTrue(element.isValue)
assertEquals(1, element.value)
assertEquals(1, element.valueOrNull)
assertEquals("Value(1)", element.toString())
assertTrue(ValueOrClosed.value(1) == element) // Don't box

expect(4)
val nullElement = channel.receiveOrClosed()
assertTrue(nullElement.isValue)
assertNull(nullElement.value)
assertNull(nullElement.valueOrNull)
assertEquals("Value(null)", nullElement.toString())
assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box

expect(5)
val closed = channel.receiveOrClosed()
assertTrue(closed.isClosed)

val closed2 = channel.receiveOrClosed()
assertTrue(closed2.isClosed)
assertTrue(closed2.closeCause is ClosedReceiveChannelException)
finish(7)
}

@Test
@ExperimentalUnsignedTypes
fun testUIntChannel() = runTest {
val channel = Channel<UInt>()
launch {
expect(2)
channel.send(1u)
yield()
expect(4)
channel.send((Long.MAX_VALUE - 1).toUInt())
expect(5)
}

expect(1)
val element = channel.receiveOrClosed()
assertEquals(1u, element.value)

expect(3)
val element2 = channel.receiveOrClosed()
assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value)
finish(6)
}

@Test
fun testCancelChannel() = runTest {
val channel = Channel<Boolean>()
launch {
expect(2)
channel.cancel()
}

expect(1)
val closed = channel.receiveOrClosed()
assertTrue(closed.isClosed)
assertTrue(closed.closeCause is ClosedReceiveChannelException)
finish(3)
}

@Test
@ExperimentalUnsignedTypes
fun testReceiveResultChannel() = runTest {
val channel = Channel<ValueOrClosed<UInt>>()
launch {
channel.send(ValueOrClosed.value(1u))
channel.send(ValueOrClosed.closed(TestException1()))
channel.close(TestException2())
}

val intResult = channel.receiveOrClosed()
assertTrue(intResult.isValue)
assertEquals(1u, intResult.value.value)

val closeCauseResult = channel.receiveOrClosed()
assertTrue(closeCauseResult.isValue)
assertTrue(closeCauseResult.value.closeCause is TestException1)

val closeCause = channel.receiveOrClosed()
assertTrue(closeCause.isClosed)
assertTrue(closeCause.closeCause is TestException2)
assertFailsWith<TestException2> { closeCause.valueOrThrow }
}

@Test
fun testToString() = runTest {
val channel = Channel<String>(1)
channel.send("message")
channel.close(TestException1())
assertEquals("Value(message)", channel.receiveOrClosed().toString())
// toString implementation for exception differs on every platform
val str = channel.receiveOrClosed().toString()
assertTrue(str.matches("Closed\\(.*TestException1\\)".toRegex()))
}
}
12 changes: 8 additions & 4 deletions docs/select-expression.md
Expand Up @@ -163,7 +163,7 @@ buzz -> 'Buzz!'
### Selecting on close

The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding
`select` to throw an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] clause to perform a
`select` to throw an exception. We can use [onReceiveOrNull][onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

Expand All @@ -189,6 +189,10 @@ suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): St

</div>

Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only
for channels with non-nullable elements so that there is no accidental confusion between a closed channel
and a null value.

Let's use it with channel `a` that produces "Hello" string four times and
channel `b` that produces "World" four times:

Expand Down Expand Up @@ -259,7 +263,7 @@ the first one among them gets selected. Here, both channels are constantly produ
being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.

The second observation, is that [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] gets immediately selected when the
The second observation, is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send
Expand Down Expand Up @@ -433,7 +437,7 @@ Deferred 4 produced answer 'Waited for 128 ms'

Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
[onReceiveOrNull][ReceiveChannel.onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

Expand Down Expand Up @@ -556,7 +560,7 @@ Channel was closed
<!--- INDEX kotlinx.coroutines.channels -->
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
<!--- INDEX kotlinx.coroutines.selects -->
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/README.md
Expand Up @@ -56,7 +56,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

Expand Down Expand Up @@ -131,8 +131,8 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout
[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html
[kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-or-null.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html
[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
<!--- INDEX kotlinx.coroutines.selects -->
[kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/README.md
Expand Up @@ -59,7 +59,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

Expand Down Expand Up @@ -143,8 +143,8 @@ Low-level primitives for finer-grained control of coroutines.
[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html
[kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-or-null.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html
[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
<!--- INDEX kotlinx.coroutines.selects -->
[kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html
Expand Down

0 comments on commit f9e14dd

Please sign in to comment.