Skip to content

Commit

Permalink
Add ChannelResult.onClosed (Kotlin#2665)
Browse files Browse the repository at this point in the history
* Establish clear contract on ChannelResult.isClosed
* This method provides a **clear** migration from correct 'offer' usages to 'trySend'
  • Loading branch information
qwwdfsad authored and pablobaxter committed Sep 14, 2022
1 parent 5b6bb21 commit 5c3b008
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ public final class kotlinx/coroutines/channels/ChannelKt {
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static final fun getOrElse-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onClosed-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onFailure-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onSuccess-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
}
Expand Down
36 changes: 34 additions & 2 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,22 @@ public interface SendChannel<in E> {
* oversee such error during code review.
* * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
*
* **NB** Automatic migration provides best-effort for the user experience, but requires removal
* or adjusting of the code that relied on the exception handling.
* The complete replacement has a more verbose form:
* ```
* channel.trySend(element)
* .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
* .isSuccess
* ```
*
* See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
*/
@Deprecated(
level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'trySend' method",
replaceWith = ReplaceWith("trySend(element).isSuccess")
) // Since 1.5.0
) // Warning since 1.5.0
public fun offer(element: E): Boolean {
val result = trySend(element)
if (result.isSuccess) return true
Expand Down Expand Up @@ -297,7 +306,7 @@ public interface ReceiveChannel<out E> {
@Deprecated(level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'tryReceive'",
replaceWith = ReplaceWith("tryReceive().getOrNull()")
) // Since 1.5.0
) // Warning since 1.5.0
public fun poll(): E? {
val result = tryReceive()
if (result.isSuccess) return result.getOrThrow()
Expand Down Expand Up @@ -362,6 +371,8 @@ public interface ReceiveChannel<out E> {
* E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
*
* The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
* It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend]
* or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving.
*/
@JvmInline
public value class ChannelResult<out T>
Expand Down Expand Up @@ -399,12 +410,14 @@ public value class ChannelResult<out T>
/**
* Returns the encapsulated value if this instance represents success or `null` if it represents failed result.
*/
@Suppress("UNCHECKED_CAST")
public fun getOrNull(): T? = if (holder !is Failed) holder as T else null

/**
* Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed.
*/
public fun getOrThrow(): T {
@Suppress("UNCHECKED_CAST")
if (holder !is Failed) return holder as T
if (holder is Closed && holder.cause != null) throw holder.cause
error("Trying to call 'getOrThrow' on a failed channel result: $holder")
Expand Down Expand Up @@ -495,6 +508,25 @@ public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?)
return this
}

/**
* Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]
* due to channel being [closed][Channel.close].
* The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
* It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend]
* or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation.
*
* Returns the original `ChannelResult` unchanged.
*/
@OptIn(ExperimentalContracts::class)
public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
contract {
callsInPlace(action, InvocationKind.AT_MOST_ONCE)
}
@Suppress("UNCHECKED_CAST")
if (holder is ChannelResult.Closed) action(exceptionOrNull())
return this
}

/**
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
* from concurrent coroutines.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class BasicOperationsTest : TestBase() {
TestChannelKind.values().forEach { kind -> testSendReceive(kind, 20) }
}

@Test
fun testTrySendToFullChannel() = runTest {
TestChannelKind.values().forEach { kind -> testTrySendToFullChannel(kind) }
}

@Test
fun testTrySendAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testTrySend(kind) }
Expand Down Expand Up @@ -118,7 +123,7 @@ class BasicOperationsTest : TestBase() {
assertTrue(channel.isClosedForSend)
channel.trySend(2)
.onSuccess { expectUnreached() }
.onFailure {
.onClosed {
assertTrue { it is ClosedSendChannelException}
if (!kind.isConflated) {
assertEquals(42, channel.receive())
Expand All @@ -127,6 +132,21 @@ class BasicOperationsTest : TestBase() {
d.await()
}

private suspend fun testTrySendToFullChannel(kind: TestChannelKind) = coroutineScope {
if (kind.isConflated || kind.capacity == Int.MAX_VALUE) return@coroutineScope
val channel = kind.create<Int>()
// Make it full
repeat(11) {
channel.trySend(42)
}
channel.trySend(1)
.onSuccess { expectUnreached() }
.onFailure { assertNull(it) }
.onClosed {
expectUnreached()
}
}

/**
* [ClosedSendChannelException] should not be eaten.
* See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]
Expand Down

0 comments on commit 5c3b008

Please sign in to comment.