Skip to content

Commit

Permalink
Deprecate Mutex.onLock
Browse files Browse the repository at this point in the history
Fixes #2794
  • Loading branch information
qwwdfsad committed Oct 6, 2021
1 parent 97ecf59 commit 0aed352
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 21 deletions.
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/README.md
Expand Up @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay][kotlinx.coroutines.delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

# Package kotlinx.coroutines
Expand Down Expand Up @@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout

[kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html

<!--- INDEX kotlinx.coroutines.channels -->

Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -186,7 +186,6 @@ public interface SelectInstance<in R> {
* | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend]
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive]
* | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
* | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock]
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout]
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Expand Down
9 changes: 4 additions & 5 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Expand Up @@ -52,8 +52,7 @@ public interface Mutex {
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onLock] clause.
* Use [tryLock] to try acquire lock without waiting.
* Use [tryLock] to try acquiring a lock without waiting.
*
* This function is fair; suspended callers are resumed in first-in-first-out order.
*
Expand All @@ -63,10 +62,10 @@ public interface Mutex {
public suspend fun lock(owner: Any? = null)

/**
* Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
* Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
* the reference to this mutex is passed into the corresponding block.
* Deprecated for removal without built-in replacement.
*/
@Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
"For additional details please refer to #2794")
public val onLock: SelectClause2<Any?, Mutex>

/**
Expand Down
1 change: 0 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Publish.kt
Expand Up @@ -110,7 +110,6 @@ public class PublisherCoroutine<in T>(
block(this)
}

// TODO discuss it
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
Expand Down
9 changes: 3 additions & 6 deletions reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
Expand Up @@ -296,12 +296,10 @@ class PublishTest : TestBase() {
expect(2)
// Collector is ready
send(1)
expect(3)
try {
send(2)
expectUnreached()
} catch (e: CancellationException) {
expect(7)
// publisher cancellation is async
latch.countDown()
throw e
Expand All @@ -312,15 +310,14 @@ class PublishTest : TestBase() {
val collectorLatch = Mutex(true)
val job = launch {
published.asFlow().buffer(0).collect {
expect(4)
collectorLatch.unlock()
hang { expect(6) }
hang { expect(4) }
}
}
collectorLatch.lock()
expect(5)
expect(3)
job.cancelAndJoin()
latch.await()
finish(8)
finish(5)
}
}
Expand Up @@ -33,7 +33,7 @@ class PublisherMultiTest : TestBase() {
@Test
fun testConcurrentStressOnSend() = runBlocking {
val n = 10_000 * stressTestMultiplier
val observable = publish {
val observable = publish<Int> {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
Expand Down
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Expand Up @@ -10,6 +10,7 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
element: T,
block: suspend (SendChannel<T>) -> R
) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

// This is the default replacement proposed in onLock replacement
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

// assert: mutex.isLocked()
Expand Down
Expand Up @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() {
@Test
fun testConcurrentStressOnSend() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable {
val observable = rxObservable<Int> {
newCoroutineContext(coroutineContext)
// concurrent emitters (many coroutines)
val jobs = List(n) {
Expand Down
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
Expand Up @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*

/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
Expand Down Expand Up @@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
element: T,
block: suspend (SendChannel<T>) -> R
) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

// This is the default replacement proposed in onLock replacement
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

// assert: mutex.isLocked()
Expand Down
Expand Up @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() {
@Test
fun testConcurrentStressOnSend() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable {
val observable = rxObservable<Int> {
newCoroutineContext(coroutineContext)
// concurrent emitters (many coroutines)
val jobs = List(n) {
Expand Down

0 comments on commit 0aed352

Please sign in to comment.