Skip to content

Commit

Permalink
Non-linearizable implementation of PublisherCoroutine.onSend that isn…
Browse files Browse the repository at this point in the history
…'t using Mutex.onLock
  • Loading branch information
qwwdfsad committed Sep 30, 2021
1 parent 3df1307 commit 97ecf59
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Publish.kt
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
Expand Down Expand Up @@ -104,10 +105,22 @@ public class PublisherCoroutine<in T>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

// TODO discuss it
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

/*
Expand Down
5 changes: 4 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
Expand Down Expand Up @@ -308,13 +309,15 @@ class PublishTest : TestBase() {
}

expect(1)
val collectorLatch = Mutex(true)
val job = launch {
published.asFlow().buffer(0).collect {
expect(4)
collectorLatch.unlock()
hang { expect(6) }
}
}
yield()
collectorLatch.lock()
expect(5)
job.cancelAndJoin()
latch.await()
Expand Down

0 comments on commit 97ecf59

Please sign in to comment.