Skip to content

Commit

Permalink
Fix DefaultExecutor not being able to exit (Kotlin#1876)
Browse files Browse the repository at this point in the history
* Fix DefaultExecutor not being able to exit.
* Also adds the performance optimization. See the discussion on the PR.
* Add a stress test for the DefaultExecutor worker shutting down.
* Make `testDelayChannelBackpressure2` not fail:

This test could in theory already fail on the second
`checkNotEmpty`: after the first `checkNotEmpty` has passed,
first, the ticker channel awakens to produce a new element, and
then the main thread awakens to check if it's there. However, if
the ticker channel is sufficiently slowed down, it may not produce
the element in time for the main thread to find it.

After introducing the change that allows the worker thread in
`DefaultExecutor` to shut down, the initial delay of 2500 ms is
sufficient for the worker to shut down (which, by default, happens
after 1000 ms of inactivity), and then the aforementioned race
condition worsens: additional time is required to launch a new
worker thread and it's much easier to miss the deadline.

Now, the delays are much shorter, meaning that the worker thread is
never inactive long enough to shut down.

Fixes Kotlin#856
  • Loading branch information
dkhalanskyjb authored and recheej committed Dec 28, 2020
1 parent ad1c22f commit 9be76d2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
10 changes: 7 additions & 3 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -52,7 +52,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
*/
public open fun processNextEvent(): Long {
if (!processUnconfinedEvent()) return Long.MAX_VALUE
return nextTime
return 0
}

protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
Expand Down Expand Up @@ -251,7 +251,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {

override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return nextTime
if (processUnconfinedEvent()) return 0
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
Expand All @@ -269,7 +269,11 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
}
}
// then process one event from queue
dequeue()?.run()
val task = dequeue()
if (task != null) {
task.run()
return 0
}
return nextTime
}

Expand Down
19 changes: 10 additions & 9 deletions kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
Expand Up @@ -68,15 +68,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
var parkNanos = processNextEvent()
if (parkNanos == Long.MAX_VALUE) {
// nothing to do, initialize shutdown timeout
if (shutdownNanos == Long.MAX_VALUE) {
val now = nanoTime()
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // shut thread down
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
}
val now = nanoTime()
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // shut thread down
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
shutdownNanos = Long.MAX_VALUE
if (parkNanos > 0) {
// check if shutdown was requested and bail out in this case
if (isShutdownRequested) return
Expand Down Expand Up @@ -142,4 +140,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
resetAll() // clear queues
(this as Object).notifyAll()
}

internal val isThreadPresent
get() = _thread != null
}
29 changes: 28 additions & 1 deletion kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt
Expand Up @@ -4,7 +4,8 @@

package kotlinx.coroutines

import org.junit.*
import org.junit.Test
import kotlin.test.*

class DefaultExecutorStressTest : TestBase() {
@Test
Expand Down Expand Up @@ -35,4 +36,30 @@ class DefaultExecutorStressTest : TestBase() {
}
finish(2 + iterations * 4)
}

@Test
fun testWorkerShutdown() = withVirtualTimeSource {
val iterations = 1_000 * stressTestMultiplier
// wait for the worker to shut down
suspend fun awaitWorkerShutdown() {
val executorTimeoutMs = 1000L
delay(executorTimeoutMs)
while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop
assertFalse(DefaultExecutor.isThreadPresent) // just to make sure
}
runTest {
awaitWorkerShutdown() // so that the worker shuts down after the initial launch
repeat (iterations) {
val job = launch(Dispatchers.Unconfined) {
// this line runs in the main thread
delay(1)
// this line runs in the DefaultExecutor worker
}
delay(100) // yield the execution, allow the worker to spawn
assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned
job.join()
awaitWorkerShutdown()
}
}
}
}
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt
Expand Up @@ -47,17 +47,17 @@ class TickerChannelTest : TestBase() {
@Test
fun testDelayChannelBackpressure2() = withVirtualTimeSource {
runTest {
val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
val delayChannel = ticker(delayMillis = 200, initialDelayMillis = 0)
delayChannel.checkNotEmpty()
delayChannel.checkEmpty()

delay(2500)
delay(500)
delayChannel.checkNotEmpty()
delay(510)
delay(110)
delayChannel.checkNotEmpty()
delay(510)
delay(110)
delayChannel.checkEmpty()
delay(510)
delay(110)
delayChannel.checkNotEmpty()
delayChannel.cancel()
}
Expand Down

0 comments on commit 9be76d2

Please sign in to comment.