diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md index c21e5048f6..c06cd358ad 100644 --- a/kotlinx-coroutines-core/README.md +++ b/kotlinx-coroutines-core/README.md @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] -| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay][kotlinx.coroutines.delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none # Package kotlinx.coroutines @@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout [kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html [kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html -[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html -[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index a7172707e2..a313de3d5d 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -186,7 +186,6 @@ public interface SelectInstance { * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching] - * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 19584e0981..ebebcc8d1c 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -52,8 +52,7 @@ public interface Mutex { * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * - * This function can be used in [select] invocation with [onLock] clause. - * Use [tryLock] to try acquire lock without waiting. + * Use [tryLock] to try acquiring a lock without waiting. * * This function is fair; suspended callers are resumed in first-in-first-out order. * @@ -63,10 +62,10 @@ public interface Mutex { public suspend fun lock(owner: Any? = null) /** - * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked. - * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected - * the reference to this mutex is passed into the corresponding block. + * Deprecated for removal without built-in replacement. */ + @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " + + "For additional details please refer to #2794") public val onLock: SelectClause2 /** diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 9785627493..1b8683ce64 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -110,7 +110,6 @@ public class PublisherCoroutine( block(this) } - // TODO discuss it launch(start = CoroutineStart.UNDISPATCHED) { mutex.lock() // Already selected -- bail out diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 84b406d2ac..267953f5a5 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -296,12 +296,10 @@ class PublishTest : TestBase() { expect(2) // Collector is ready send(1) - expect(3) try { send(2) expectUnreached() } catch (e: CancellationException) { - expect(7) // publisher cancellation is async latch.countDown() throw e @@ -312,15 +310,14 @@ class PublishTest : TestBase() { val collectorLatch = Mutex(true) val job = launch { published.asFlow().buffer(0).collect { - expect(4) collectorLatch.unlock() - hang { expect(6) } + hang { expect(4) } } } collectorLatch.lock() - expect(5) + expect(3) job.cancelAndJoin() latch.await() - finish(8) + finish(5) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt index 5e2ef041ac..4a552b5f8d 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt @@ -33,7 +33,7 @@ class PublisherMultiTest : TestBase() { @Test fun testConcurrentStressOnSend() = runBlocking { val n = 10_000 * stressTestMultiplier - val observable = publish { + val observable = publish { // concurrent emitters (many coroutines) val jobs = List(n) { // launch diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 5f409815af..90e770bb4f 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -10,6 +10,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* @@ -95,10 +96,22 @@ private class RxObservableCoroutine( element: T, block: suspend (SendChannel) -> R ) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + // This is the default replacement proposed in onLock replacement + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } // assert: mutex.isLocked() diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt index 5ff5e046e7..7023211450 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() { @Test fun testConcurrentStressOnSend() { val n = 10_000 * stressTestMultiplier - val observable = rxObservable { + val observable = rxObservable { newCoroutineContext(coroutineContext) // concurrent emitters (many coroutines) val jobs = List(n) { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 57007bbdd4..1c5f7c0a63 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.intrinsics.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -95,10 +96,22 @@ private class RxObservableCoroutine( element: T, block: suspend (SendChannel) -> R ) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + // This is the default replacement proposed in onLock replacement + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } // assert: mutex.isLocked() diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt index e19be8d130..d7c799db1c 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() { @Test fun testConcurrentStressOnSend() { val n = 10_000 * stressTestMultiplier - val observable = rxObservable { + val observable = rxObservable { newCoroutineContext(coroutineContext) // concurrent emitters (many coroutines) val jobs = List(n) {