From e0c29020de39e45addbd1b3b561c2876182c9289 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 20 Oct 2021 11:40:08 +0300 Subject: [PATCH] Mutex.onLock deprecation (#2850) * Actually test 'onLock' and the corresponding concurrency and cancellation of Reactive's onSend * Update benchmarks * Non-linearizable implementation of PublisherCoroutine.onSend that isn't using Mutex.onLock * Deprecate Mutex.onLock Fixes #2794 Co-authored-by: Dmitry Khalanskiy --- benchmarks/build.gradle.kts | 13 ++++--- kotlinx-coroutines-core/README.md | 3 -- kotlinx-coroutines-core/common/README.md | 2 - .../common/src/channels/Channel.kt | 1 - .../common/src/selects/Select.kt | 1 - .../common/src/sync/Mutex.kt | 9 ++--- ...Test.kt => RendezvousChannelStressTest.kt} | 2 +- .../src/Publish.kt | 14 ++++++- .../test/PublishTest.kt | 38 ++++++++++++++++++- .../test/PublisherMultiTest.kt | 25 +++++++++++- .../src/RxObservable.kt | 15 +++++++- .../test/ObservableCompletionStressTest.kt | 4 +- .../test/ObservableMultiTest.kt | 26 ++++++++++++- .../src/RxObservable.kt | 15 +++++++- .../test/ObservableMultiTest.kt | 26 ++++++++++++- 15 files changed, 166 insertions(+), 28 deletions(-) rename kotlinx-coroutines-core/jvm/test/channels/{RandevouzChannelStressTest.kt => RendezvousChannelStressTest.kt} (92%) diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts index ce0bff1cdf..27ab59b5f5 100644 --- a/benchmarks/build.gradle.kts +++ b/benchmarks/build.gradle.kts @@ -54,13 +54,14 @@ tasks.named("jmhJar") { } dependencies { - compile("org.openjdk.jmh:jmh-core:1.26") - compile("io.projectreactor:reactor-core:${version("reactor")}") - compile("io.reactivex.rxjava2:rxjava:2.1.9") - compile("com.github.akarnokd:rxjava2-extensions:0.20.8") + implementation("org.openjdk.jmh:jmh-core:1.26") + implementation("io.projectreactor:reactor-core:${version("reactor")}") + implementation("io.reactivex.rxjava2:rxjava:2.1.9") + implementation("com.github.akarnokd:rxjava2-extensions:0.20.8") - compile("com.typesafe.akka:akka-actor_2.12:2.5.0") - compile(project(":kotlinx-coroutines-core")) + implementation("com.typesafe.akka:akka-actor_2.12:2.5.0") + implementation(project(":kotlinx-coroutines-core")) + implementation(project(":kotlinx-coroutines-reactive")) // add jmh dependency on main "jmhImplementation"(sourceSets.main.get().runtimeClasspath) diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md index c21e5048f6..c06cd358ad 100644 --- a/kotlinx-coroutines-core/README.md +++ b/kotlinx-coroutines-core/README.md @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] -| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay][kotlinx.coroutines.delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none # Package kotlinx.coroutines @@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout [kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html [kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html -[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html -[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html diff --git a/kotlinx-coroutines-core/common/README.md b/kotlinx-coroutines-core/common/README.md index fcfe334c62..b00921bbcd 100644 --- a/kotlinx-coroutines-core/common/README.md +++ b/kotlinx-coroutines-core/common/README.md @@ -60,7 +60,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] -| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none This module provides debugging facilities for coroutines (run JVM with `-ea` or `-Dkotlinx.coroutines.debug` options) @@ -131,7 +130,6 @@ Low-level primitives for finer-grained control of coroutines. [kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html [kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html -[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html [kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index b15c4262ef..b24658a4c4 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -64,7 +64,6 @@ public interface SendChannel { */ public val onSend: SelectClause2> - /** * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, * and returns the successful result. Otherwise, returns failed or closed result. diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index a7172707e2..a313de3d5d 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -186,7 +186,6 @@ public interface SelectInstance { * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching] - * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 19584e0981..ebebcc8d1c 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -52,8 +52,7 @@ public interface Mutex { * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * - * This function can be used in [select] invocation with [onLock] clause. - * Use [tryLock] to try acquire lock without waiting. + * Use [tryLock] to try acquiring a lock without waiting. * * This function is fair; suspended callers are resumed in first-in-first-out order. * @@ -63,10 +62,10 @@ public interface Mutex { public suspend fun lock(owner: Any? = null) /** - * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked. - * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected - * the reference to this mutex is passed into the corresponding block. + * Deprecated for removal without built-in replacement. */ + @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " + + "For additional details please refer to #2794") public val onLock: SelectClause2 /** diff --git a/kotlinx-coroutines-core/jvm/test/channels/RandevouzChannelStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt similarity index 92% rename from kotlinx-coroutines-core/jvm/test/channels/RandevouzChannelStressTest.kt rename to kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt index a054175412..eb086cd204 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/RandevouzChannelStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt @@ -7,7 +7,7 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* import org.junit.* -class RandevouzChannelStressTest : TestBase() { +class RendezvousChannelStressTest : TestBase() { @Test fun testStress() = runTest { diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 4928a7439e..1b8683ce64 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import org.reactivestreams.* @@ -104,10 +105,21 @@ public class PublisherCoroutine( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") override fun registerSelectClause2(select: SelectInstance, element: T, block: suspend (SendChannel) -> R) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } /* diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 095b724d40..267953f5a5 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -5,9 +5,13 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.* import org.junit.Test import org.reactivestreams.* +import java.util.concurrent.* import kotlin.test.* class PublishTest : TestBase() { @@ -284,4 +288,36 @@ class PublishTest : TestBase() { } assertEquals("OK", publisher.awaitFirstOrNull()) } -} \ No newline at end of file + + @Test + fun testOnSendCancelled() = runTest { + val latch = CountDownLatch(1) + val published = publish(Dispatchers.Default) { + expect(2) + // Collector is ready + send(1) + try { + send(2) + expectUnreached() + } catch (e: CancellationException) { + // publisher cancellation is async + latch.countDown() + throw e + } + } + + expect(1) + val collectorLatch = Mutex(true) + val job = launch { + published.asFlow().buffer(0).collect { + collectorLatch.unlock() + hang { expect(4) } + } + } + collectorLatch.lock() + expect(3) + job.cancelAndJoin() + latch.await() + finish(5) + } +} diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt index e3b1d3b384..4a552b5f8d 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.selects.* import org.junit.Test import kotlin.test.* @@ -16,7 +17,7 @@ class PublisherMultiTest : TestBase() { // concurrent emitters (many coroutines) val jobs = List(n) { // launch - launch { + launch(Dispatchers.Default) { send(it) } } @@ -28,4 +29,26 @@ class PublisherMultiTest : TestBase() { } assertEquals(n, resultSet.size) } + + @Test + fun testConcurrentStressOnSend() = runBlocking { + val n = 10_000 * stressTestMultiplier + val observable = publish { + // concurrent emitters (many coroutines) + val jobs = List(n) { + // launch + launch(Dispatchers.Default) { + select { + onSend(it) {} + } + } + } + jobs.forEach { it.join() } + } + val resultSet = mutableSetOf() + observable.collect { + assertTrue(resultSet.add(it)) + } + assertEquals(n, resultSet.size) + } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 5f409815af..90e770bb4f 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -10,6 +10,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* @@ -95,10 +96,22 @@ private class RxObservableCoroutine( element: T, block: suspend (SendChannel) -> R ) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + // This is the default replacement proposed in onLock replacement + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } // assert: mutex.isLocked() diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt index 30266e3e50..7e1d335028 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt @@ -12,7 +12,7 @@ import kotlin.coroutines.* class ObservableCompletionStressTest : TestBase() { private val N_REPEATS = 10_000 * stressTestMultiplier - private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) { + private fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) { for (x in start until start + count) send(x) } @@ -33,4 +33,4 @@ class ObservableCompletionStressTest : TestBase() { } } } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt index 074fcf4900..7023211450 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* +import kotlinx.coroutines.selects.* import org.junit.Test import java.io.* import kotlin.test.* @@ -47,6 +48,29 @@ class ObservableMultiTest : TestBase() { } } + @Test + fun testConcurrentStressOnSend() { + val n = 10_000 * stressTestMultiplier + val observable = rxObservable { + newCoroutineContext(coroutineContext) + // concurrent emitters (many coroutines) + val jobs = List(n) { + // launch + launch(Dispatchers.Default) { + val i = it + select { + onSend(i) {} + } + } + } + jobs.forEach { it.join() } + } + checkSingleValue(observable.toList()) { list -> + assertEquals(n, list.size) + assertEquals((0 until n).toList(), list.sorted()) + } + } + @Test fun testIteratorResendUnconfined() { val n = 10_000 * stressTestMultiplier @@ -88,4 +112,4 @@ class ObservableMultiTest : TestBase() { assertEquals("OK", it) } } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 57007bbdd4..1c5f7c0a63 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.intrinsics.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -95,10 +96,22 @@ private class RxObservableCoroutine( element: T, block: suspend (SendChannel) -> R ) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + // This is the default replacement proposed in onLock replacement + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } // assert: mutex.isLocked() diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt index b4adf7af27..d7c799db1c 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* import kotlinx.coroutines.* +import kotlinx.coroutines.selects.* import org.junit.Test import java.io.* import kotlin.test.* @@ -34,7 +35,7 @@ class ObservableMultiTest : TestBase() { // concurrent emitters (many coroutines) val jobs = List(n) { // launch - launch { + launch(Dispatchers.Default) { val i = it send(i) } @@ -47,6 +48,29 @@ class ObservableMultiTest : TestBase() { } } + @Test + fun testConcurrentStressOnSend() { + val n = 10_000 * stressTestMultiplier + val observable = rxObservable { + newCoroutineContext(coroutineContext) + // concurrent emitters (many coroutines) + val jobs = List(n) { + // launch + launch(Dispatchers.Default) { + val i = it + select { + onSend(i) {} + } + } + } + jobs.forEach { it.join() } + } + checkSingleValue(observable.toList()) { list -> + assertEquals(n, list.size) + assertEquals((0 until n).toList(), list.sorted()) + } + } + @Test fun testIteratorResendUnconfined() { val n = 10_000 * stressTestMultiplier