From 97ecf5931ecec42a55eb0fc5ec64e47ea28cb099 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 30 Jul 2021 17:51:54 +0300 Subject: [PATCH] Non-linearizable implementation of PublisherCoroutine.onSend that isn't using Mutex.onLock --- .../kotlinx-coroutines-reactive/src/Publish.kt | 15 ++++++++++++++- .../test/PublishTest.kt | 5 ++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 4928a7439e..9785627493 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import org.reactivestreams.* @@ -104,10 +105,22 @@ public class PublisherCoroutine( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") override fun registerSelectClause2(select: SelectInstance, element: T, block: suspend (SendChannel) -> R) { - mutex.onLock.registerSelectClause2(select, null) { + val clause = suspend { doLockedNext(element)?.let { throw it } block(this) } + + // TODO discuss it + launch(start = CoroutineStart.UNDISPATCHED) { + mutex.lock() + // Already selected -- bail out + if (!select.trySelect()) { + mutex.unlock() + return@launch + } + + clause.startCoroutineCancellable(select.completion) + } } /* diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index 8e05906850..84b406d2ac 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.* import org.junit.Test import org.reactivestreams.* import java.util.concurrent.* @@ -308,13 +309,15 @@ class PublishTest : TestBase() { } expect(1) + val collectorLatch = Mutex(true) val job = launch { published.asFlow().buffer(0).collect { expect(4) + collectorLatch.unlock() hang { expect(6) } } } - yield() + collectorLatch.lock() expect(5) job.cancelAndJoin() latch.await()