Skip to content

Commit

Permalink
Start fewer threads in TaskRunner (#8391)
Browse files Browse the repository at this point in the history
We've got a race where we'll start a thread when we need
one, even if we've already started a thread. This changes
TaskRunner's behavior to never add a thread if we're
still waiting for a recently-added one to start running.

This is intended to reduce the number of threads contenting
for the TaskRunner lock as reported in this issue:

#8388
  • Loading branch information
swankjesse committed Apr 29, 2024
1 parent aede7c5 commit d0b6a46
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 10 deletions.
Expand Up @@ -63,29 +63,32 @@ class TaskFaker : Closeable {

/**
* True if this task faker has ever had multiple tasks scheduled to run concurrently. Guarded by
* [taskRunner].
* [TaskRunner.lock].
*/
var isParallel = false

/** Number of calls to [TaskRunner.Backend.execute]. Guarded by [TaskRunner.lock]. */
var executeCallCount = 0

/** Guarded by [taskRunner]. */
var nanoTime = 0L
private set

/** Backlog of tasks to run. Only one task runs at a time. Guarded by [taskRunner]. */
/** Backlog of tasks to run. Only one task runs at a time. Guarded by [TaskRunner.lock]. */
private val serialTaskQueue = ArrayDeque<SerialTask>()

/** The task that's currently executing. Guarded by [taskRunner]. */
/** The task that's currently executing. Guarded by [TaskRunner.lock]. */
private var currentTask: SerialTask = TestThreadSerialTask

/** The coordinator task if it's waiting, and how it will resume. Guarded by [taskRunner]. */
/** The coordinator task if it's waiting, and how it will resume. Guarded by [TaskRunner.lock]. */
private var waitingCoordinatorTask: SerialTask? = null
private var waitingCoordinatorInterrupted = false
private var waitingCoordinatorNotified = false

/** How many times a new task has been started. Guarded by [taskRunner]. */
/** How many times a new task has been started. Guarded by [TaskRunner.lock]. */
private var contextSwitchCount = 0

/** Guarded by [taskRunner]. */
/** Guarded by [TaskRunner.lock]. */
private var activeThreads = 0

/** A task runner that posts tasks to this fake. Tasks won't be executed until requested. */
Expand All @@ -100,6 +103,7 @@ class TaskFaker : Closeable {

val queuedTask = RunnableSerialTask(runnable)
serialTaskQueue += queuedTask
executeCallCount++
isParallel = serialTaskQueue.size > 1
}

Expand Down
33 changes: 29 additions & 4 deletions okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
Expand Up @@ -52,6 +52,17 @@ class TaskRunner(
private var coordinatorWaiting = false
private var coordinatorWakeUpAt = 0L

/**
* When we need a new thread to run tasks, we call [Backend.execute]. A few microseconds later we
* expect a newly-started thread to call [Runnable.run]. We shouldn't request new threads until
* the already-requested ones are in service, otherwise we might create more threads than we need.
*
* We use [executeCallCount] and [runCallCount] to defend against starting more threads than we
* need. Both fields are guarded by [lock].
*/
private var executeCallCount = 0
private var runCallCount = 0

/** Queues with tasks that are currently executing their [TaskQueue.activeTask]. */
private val busyQueues = mutableListOf<TaskQueue>()

Expand All @@ -61,9 +72,14 @@ class TaskRunner(
private val runnable: Runnable =
object : Runnable {
override fun run() {
var incrementedRunCallCount = false
while (true) {
val task =
this@TaskRunner.lock.withLock {
if (!incrementedRunCallCount) {
incrementedRunCallCount = true
runCallCount++
}
awaitTaskToRun()
} ?: return

Expand All @@ -76,7 +92,7 @@ class TaskRunner(
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
lock.withLock {
backend.execute(this@TaskRunner, this)
startAnotherThread()
}
}
}
Expand All @@ -99,7 +115,7 @@ class TaskRunner(
if (coordinatorWaiting) {
backend.coordinatorNotify(this@TaskRunner)
} else {
backend.execute(this@TaskRunner, runnable)
startAnotherThread()
}
}

Expand Down Expand Up @@ -157,7 +173,7 @@ class TaskRunner(
* Returns an immediately-executable task for the calling thread to execute, sleeping as necessary
* until one is ready. If there are no ready queues, or if other threads have everything under
* control this will return null. If there is more than a single task ready to execute immediately
* this will launch another thread to handle that work.
* this will start another thread to handle that work.
*/
fun awaitTaskToRun(): Task? {
lock.assertHeld()
Expand Down Expand Up @@ -207,7 +223,7 @@ class TaskRunner(

// Also start another thread if there's more work or scheduling to do.
if (multipleReadyTasks || !coordinatorWaiting && readyQueues.isNotEmpty()) {
backend.execute(this@TaskRunner, runnable)
startAnotherThread()
}

return readyTask
Expand Down Expand Up @@ -238,6 +254,15 @@ class TaskRunner(
}
}

/** Start another thread, unless a new thread is already scheduled to start. */
private fun startAnotherThread() {
lock.assertHeld()
if (executeCallCount > runCallCount) return // A thread is still starting.

executeCallCount++
backend.execute(this@TaskRunner, runnable)
}

fun newQueue(): TaskQueue {
val name = lock.withLock { nextQueueName++ }
return TaskQueue(this, "Q$name")
Expand Down
90 changes: 90 additions & 0 deletions okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt
Expand Up @@ -680,6 +680,96 @@ class TaskRunnerTest {
assertThat(idleLatch2).isSameAs(idleLatch1)
}

@Test fun cancelAllWhenEmptyDoesNotStartWorkerThread() {
redQueue.execute("red task", 100.µs) {
error("expected to be canceled")
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

blueQueue.execute("task", 100.µs) {
error("expected to be canceled")
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

redQueue.cancelAll()
assertThat(taskFaker.executeCallCount).isEqualTo(1)

blueQueue.cancelAll()
assertThat(taskFaker.executeCallCount).isEqualTo(1)
}

@Test fun noMoreThanOneWorkerThreadWaitingToStartAtATime() {
// Enqueueing the red task starts a thread because the head of the queue changed.
redQueue.execute("red task") {
log += "red:starting@${taskFaker.nanoTime}"
taskFaker.sleep(100.µs)
log += "red:finishing@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Enqueueing the blue task doesn't start a thread because the red one is still starting.
blueQueue.execute("blue task") {
log += "blue:starting@${taskFaker.nanoTime}"
taskFaker.sleep(100.µs)
log += "blue:finishing@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Running the red task starts another thread, so the two can run in parallel.
taskFaker.runNextTask()
assertThat(log).containsExactly("red:starting@0")
assertThat(taskFaker.executeCallCount).isEqualTo(2)

// Next the blue task starts.
taskFaker.runNextTask()
assertThat(log).containsExactly(
"red:starting@0",
"blue:starting@0",
)
assertThat(taskFaker.executeCallCount).isEqualTo(2)

// Advance time until the tasks complete.
taskFaker.advanceUntil(100.µs)
assertThat(log).containsExactly(
"red:starting@0",
"blue:starting@0",
"red:finishing@100000",
"blue:finishing@100000",
)
taskFaker.assertNoMoreTasks()
assertThat(taskFaker.executeCallCount).isEqualTo(2)
}

@Test fun onlyOneCoordinatorWaitingToStartFutureTasks() {
// Enqueueing the red task starts a coordinator thread.
redQueue.execute("red task", 100.µs) {
log += "red:run@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Enqueueing the blue task doesn't need a 2nd coordinator yet.
blueQueue.execute("blue task", 200.µs) {
log += "blue:run@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Nothing to do.
taskFaker.runTasks()
assertThat(log).isEmpty()

// At 100.µs, the coordinator runs the red task and starts a thread for the new coordinator.
taskFaker.advanceUntil(100.µs)
assertThat(log).containsExactly("red:run@100000")
assertThat(taskFaker.executeCallCount).isEqualTo(2)

// At 200.µs, the blue task runs.
taskFaker.advanceUntil(200.µs)
assertThat(log).containsExactly("red:run@100000", "blue:run@200000")
assertThat(taskFaker.executeCallCount).isEqualTo(2)

taskFaker.assertNoMoreTasks()
}

private val Int.µs: Long
get() = this * 1_000L
}

0 comments on commit d0b6a46

Please sign in to comment.