diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md index c21e5048f6..c06cd358ad 100644 --- a/kotlinx-coroutines-core/README.md +++ b/kotlinx-coroutines-core/README.md @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] -| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay][kotlinx.coroutines.delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none # Package kotlinx.coroutines @@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout [kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html [kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html -[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html -[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index a7172707e2..a313de3d5d 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -186,7 +186,6 @@ public interface SelectInstance { * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching] - * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 19584e0981..ebebcc8d1c 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -52,8 +52,7 @@ public interface Mutex { * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * - * This function can be used in [select] invocation with [onLock] clause. - * Use [tryLock] to try acquire lock without waiting. + * Use [tryLock] to try acquiring a lock without waiting. * * This function is fair; suspended callers are resumed in first-in-first-out order. * @@ -63,10 +62,10 @@ public interface Mutex { public suspend fun lock(owner: Any? = null) /** - * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked. - * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected - * the reference to this mutex is passed into the corresponding block. + * Deprecated for removal without built-in replacement. */ + @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " + + "For additional details please refer to #2794") public val onLock: SelectClause2 /** diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 9785627493..1b8683ce64 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -110,7 +110,6 @@ public class PublisherCoroutine( block(this) } - // TODO discuss it launch(start = CoroutineStart.UNDISPATCHED) { mutex.lock() // Already selected -- bail out diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 5f409815af..90e770bb4f 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -10,6 +10,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* @@ -95,10 +96,22 @@ private class RxObservableCoroutine( element: T, block: suspend (SendChannel) -> R ) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + // This is the default replacement proposed in onLock replacement + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } // assert: mutex.isLocked() diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 57007bbdd4..1c5f7c0a63 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.intrinsics.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -95,10 +96,22 @@ private class RxObservableCoroutine( element: T, block: suspend (SendChannel) -> R ) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + // This is the default replacement proposed in onLock replacement + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } // assert: mutex.isLocked()