Skip to content

Commit

Permalink
Mutex.onLock deprecation (#2850)
Browse files Browse the repository at this point in the history
* Actually test 'onLock' and the corresponding concurrency and cancellation of Reactive's onSend
* Update benchmarks
* Non-linearizable implementation of PublisherCoroutine.onSend that isn't using Mutex.onLock
* Deprecate Mutex.onLock

Fixes #2794

Co-authored-by: Dmitry Khalanskiy <Dmitry.Khalanskiy@jetbrains.com>
  • Loading branch information
qwwdfsad and dkhalanskyjb committed Oct 20, 2021
1 parent dfc4821 commit 7755edb
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 28 deletions.
13 changes: 7 additions & 6 deletions benchmarks/build.gradle.kts
Expand Up @@ -54,13 +54,14 @@ tasks.named<Jar>("jmhJar") {
}

dependencies {
compile("org.openjdk.jmh:jmh-core:1.26")
compile("io.projectreactor:reactor-core:${version("reactor")}")
compile("io.reactivex.rxjava2:rxjava:2.1.9")
compile("com.github.akarnokd:rxjava2-extensions:0.20.8")
implementation("org.openjdk.jmh:jmh-core:1.26")
implementation("io.projectreactor:reactor-core:${version("reactor")}")
implementation("io.reactivex.rxjava2:rxjava:2.1.9")
implementation("com.github.akarnokd:rxjava2-extensions:0.20.8")

compile("com.typesafe.akka:akka-actor_2.12:2.5.0")
compile(project(":kotlinx-coroutines-core"))
implementation("com.typesafe.akka:akka-actor_2.12:2.5.0")
implementation(project(":kotlinx-coroutines-core"))
implementation(project(":kotlinx-coroutines-reactive"))

// add jmh dependency on main
"jmhImplementation"(sourceSets.main.get().runtimeClasspath)
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/README.md
Expand Up @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay][kotlinx.coroutines.delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

# Package kotlinx.coroutines
Expand Down Expand Up @@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout

[kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html

<!--- INDEX kotlinx.coroutines.channels -->

Expand Down
2 changes: 0 additions & 2 deletions kotlinx-coroutines-core/common/README.md
Expand Up @@ -60,7 +60,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

This module provides debugging facilities for coroutines (run JVM with `-ea` or `-Dkotlinx.coroutines.debug` options)
Expand Down Expand Up @@ -131,7 +130,6 @@ Low-level primitives for finer-grained control of coroutines.

[kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html

<!--- INDEX kotlinx.coroutines.channels -->
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/Channel.kt
Expand Up @@ -64,7 +64,6 @@ public interface SendChannel<in E> {
*/
public val onSend: SelectClause2<E, SendChannel<E>>


/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
* and returns the successful result. Otherwise, returns failed or closed result.
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -186,7 +186,6 @@ public interface SelectInstance<in R> {
* | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend]
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive]
* | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
* | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock]
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout]
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Expand Down
9 changes: 4 additions & 5 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Expand Up @@ -52,8 +52,7 @@ public interface Mutex {
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onLock] clause.
* Use [tryLock] to try acquire lock without waiting.
* Use [tryLock] to try acquiring a lock without waiting.
*
* This function is fair; suspended callers are resumed in first-in-first-out order.
*
Expand All @@ -63,10 +62,10 @@ public interface Mutex {
public suspend fun lock(owner: Any? = null)

/**
* Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
* Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
* the reference to this mutex is passed into the corresponding block.
* Deprecated for removal without built-in replacement.
*/
@Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
"For additional details please refer to #2794")
public val onLock: SelectClause2<Any?, Mutex>

/**
Expand Down
Expand Up @@ -7,7 +7,7 @@ package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import org.junit.*

class RandevouzChannelStressTest : TestBase() {
class RendezvousChannelStressTest : TestBase() {

@Test
fun testStress() = runTest {
Expand Down
14 changes: 13 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Publish.kt
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
Expand Down Expand Up @@ -104,10 +105,21 @@ public class PublisherCoroutine<in T>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

/*
Expand Down
38 changes: 37 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
Expand Up @@ -5,9 +5,13 @@
package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
import kotlin.test.*

class PublishTest : TestBase() {
Expand Down Expand Up @@ -284,4 +288,36 @@ class PublishTest : TestBase() {
}
assertEquals("OK", publisher.awaitFirstOrNull())
}
}

@Test
fun testOnSendCancelled() = runTest {
val latch = CountDownLatch(1)
val published = publish(Dispatchers.Default) {
expect(2)
// Collector is ready
send(1)
try {
send(2)
expectUnreached()
} catch (e: CancellationException) {
// publisher cancellation is async
latch.countDown()
throw e
}
}

expect(1)
val collectorLatch = Mutex(true)
val job = launch {
published.asFlow().buffer(0).collect {
collectorLatch.unlock()
hang { expect(4) }
}
}
collectorLatch.lock()
expect(3)
job.cancelAndJoin()
latch.await()
finish(5)
}
}
25 changes: 24 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.test.*

Expand All @@ -16,7 +17,7 @@ class PublisherMultiTest : TestBase() {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
launch {
launch(Dispatchers.Default) {
send(it)
}
}
Expand All @@ -28,4 +29,26 @@ class PublisherMultiTest : TestBase() {
}
assertEquals(n, resultSet.size)
}

@Test
fun testConcurrentStressOnSend() = runBlocking {
val n = 10_000 * stressTestMultiplier
val observable = publish<Int> {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
launch(Dispatchers.Default) {
select<Unit> {
onSend(it) {}
}
}
}
jobs.forEach { it.join() }
}
val resultSet = mutableSetOf<Int>()
observable.collect {
assertTrue(resultSet.add(it))
}
assertEquals(n, resultSet.size)
}
}
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Expand Up @@ -10,6 +10,7 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
element: T,
block: suspend (SendChannel<T>) -> R
) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

// This is the default replacement proposed in onLock replacement
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

// assert: mutex.isLocked()
Expand Down
Expand Up @@ -12,7 +12,7 @@ import kotlin.coroutines.*
class ObservableCompletionStressTest : TestBase() {
private val N_REPEATS = 10_000 * stressTestMultiplier

private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
private fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
for (x in start until start + count) send(x)
}

Expand All @@ -33,4 +33,4 @@ class ObservableCompletionStressTest : TestBase() {
}
}
}
}
}
26 changes: 25 additions & 1 deletion reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2

import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import java.io.*
import kotlin.test.*
Expand Down Expand Up @@ -47,6 +48,29 @@ class ObservableMultiTest : TestBase() {
}
}

@Test
fun testConcurrentStressOnSend() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable<Int> {
newCoroutineContext(coroutineContext)
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
launch(Dispatchers.Default) {
val i = it
select<Unit> {
onSend(i) {}
}
}
}
jobs.forEach { it.join() }
}
checkSingleValue(observable.toList()) { list ->
assertEquals(n, list.size)
assertEquals((0 until n).toList(), list.sorted())
}
}

@Test
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
Expand Down Expand Up @@ -88,4 +112,4 @@ class ObservableMultiTest : TestBase() {
assertEquals("OK", it)
}
}
}
}
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
Expand Up @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*

/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
Expand Down Expand Up @@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
element: T,
block: suspend (SendChannel<T>) -> R
) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

// This is the default replacement proposed in onLock replacement
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

// assert: mutex.isLocked()
Expand Down

0 comments on commit 7755edb

Please sign in to comment.