Skip to content

Commit

Permalink
Lift out experimentality where it is applicable
Browse files Browse the repository at this point in the history
    * CoroutineDispatcher.invoke
    * ReceiveChannel.consume and ReceiveChannel.consumeEach
    * Flow core operators: onStart, onCompletion, onEmpty
    * CompletableDeferred.completeWith
    * awaitCancellation
    * Add experimentality notes where applicable
  • Loading branch information
qwwdfsad committed Oct 19, 2020
1 parent ea4ece3 commit 21d82f3
Show file tree
Hide file tree
Showing 16 changed files with 13 additions and 1,452 deletions.
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/Builders.common.kt
Expand Up @@ -175,7 +175,6 @@ public suspend fun <T> withContext(
*
* This inline function calls [withContext].
*/
@ExperimentalCoroutinesApi
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Expand Up @@ -57,7 +57,6 @@ public interface CompletableDeferred<T> : Deferred<T> {
* This function transitions this deferred in the same ways described by [CompletableDeferred.complete] and
* [CompletableDeferred.completeExceptionally].
*/
@ExperimentalCoroutinesApi // since 1.3.2, tentatively until 1.4.0
public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean =
result.fold({ complete(it) }, { completeExceptionally(it) })

Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/CoroutineStart.kt
Expand Up @@ -55,7 +55,7 @@ public enum class CoroutineStart {
* Cancellability of coroutine at suspension points depends on the particular implementation details of
* suspending functions as in [DEFAULT].
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
ATOMIC,

/**
Expand All @@ -71,7 +71,7 @@ public enum class CoroutineStart {
*
* **Note: This is an experimental api.** Execution semantics of coroutines may change in the future when this mode is used.
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
UNDISPATCHED;

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Debug.common.kt
Expand Up @@ -27,7 +27,7 @@ internal expect fun assert(value: () -> Boolean)
* Copy mechanism is used only on JVM, but it might be convenient to implement it in common exceptions,
* so on JVM their stacktraces will be properly recovered.
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // Since 1.2.0, no ETA on stability
public interface CopyableThrowable<T> where T : Throwable, T : CopyableThrowable<T> {

/**
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/Delay.kt
Expand Up @@ -95,7 +95,6 @@ public interface Delay {
* }
* ```
*/
@ExperimentalCoroutinesApi
public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Expand Up @@ -47,7 +47,7 @@ public fun <E> ReceiveChannel<E>.broadcast(
val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
// which passes all exceptions upstream to the source ReceiveChannel
return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
return scope.broadcast(capacity = capacity, start = start, onCompletion = { cancelConsumed(it) }) {
for (e in this@broadcast) {
send(e)
}
Expand Down
Expand Up @@ -150,7 +150,6 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
*
* The operation is _terminal_.
*/
@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
var cause: Throwable? = null
try {
Expand All @@ -171,7 +170,6 @@ public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.x
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (e in this) action(e)
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Expand Up @@ -204,8 +204,8 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
@FlowPreview
@Deprecated(
message = "Use channelFlow with awaitClose { } instead of flowViaChannel and invokeOnClose { }.",
level = DeprecationLevel.WARNING
)
level = DeprecationLevel.ERROR
) // To be removed in 1.4.x
@Suppress("DeprecatedCallableAddReplaceWith")
public fun <T> flowViaChannel(
bufferSize: Int = BUFFERED,
Expand Down
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Expand Up @@ -434,7 +434,6 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library",
replaceWith = ReplaceWith("runningReduce(operation)")
)
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = runningReduce(operation)

@Deprecated(
Expand Down Expand Up @@ -482,4 +481,4 @@ public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
)
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Expand Up @@ -71,7 +71,6 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
* .collect { println(it) } // prints Begin, a, b, c
* ```
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
Expand Down Expand Up @@ -142,7 +141,6 @@ public fun <T> Flow<T>.onStart(
* In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
* Use [catch] if you need to suppress failure and replace it with emission of elements.
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
Expand Down Expand Up @@ -173,7 +171,6 @@ public fun <T> Flow<T>.onCompletion(
* }.collect { println(it) } // prints 1, 2
* ```
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onEmpty(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
Expand Down

0 comments on commit 21d82f3

Please sign in to comment.