From d0b6a464d8f660941a47b40fb8ed67508550921c Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Sun, 28 Apr 2024 20:33:04 -0400 Subject: [PATCH] Start fewer threads in TaskRunner (#8391) We've got a race where we'll start a thread when we need one, even if we've already started a thread. This changes TaskRunner's behavior to never add a thread if we're still waiting for a recently-added one to start running. This is intended to reduce the number of threads contenting for the TaskRunner lock as reported in this issue: https://github.com/square/okhttp/issues/8388 --- .../okhttp3/internal/concurrent/TaskFaker.kt | 16 ++-- .../okhttp3/internal/concurrent/TaskRunner.kt | 33 ++++++- .../internal/concurrent/TaskRunnerTest.kt | 90 +++++++++++++++++++ 3 files changed, 129 insertions(+), 10 deletions(-) diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt index 88dfd7936742..1874cd9ca8e0 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt @@ -63,29 +63,32 @@ class TaskFaker : Closeable { /** * True if this task faker has ever had multiple tasks scheduled to run concurrently. Guarded by - * [taskRunner]. + * [TaskRunner.lock]. */ var isParallel = false + /** Number of calls to [TaskRunner.Backend.execute]. Guarded by [TaskRunner.lock]. */ + var executeCallCount = 0 + /** Guarded by [taskRunner]. */ var nanoTime = 0L private set - /** Backlog of tasks to run. Only one task runs at a time. Guarded by [taskRunner]. */ + /** Backlog of tasks to run. Only one task runs at a time. Guarded by [TaskRunner.lock]. */ private val serialTaskQueue = ArrayDeque() - /** The task that's currently executing. Guarded by [taskRunner]. */ + /** The task that's currently executing. Guarded by [TaskRunner.lock]. */ private var currentTask: SerialTask = TestThreadSerialTask - /** The coordinator task if it's waiting, and how it will resume. Guarded by [taskRunner]. */ + /** The coordinator task if it's waiting, and how it will resume. Guarded by [TaskRunner.lock]. */ private var waitingCoordinatorTask: SerialTask? = null private var waitingCoordinatorInterrupted = false private var waitingCoordinatorNotified = false - /** How many times a new task has been started. Guarded by [taskRunner]. */ + /** How many times a new task has been started. Guarded by [TaskRunner.lock]. */ private var contextSwitchCount = 0 - /** Guarded by [taskRunner]. */ + /** Guarded by [TaskRunner.lock]. */ private var activeThreads = 0 /** A task runner that posts tasks to this fake. Tasks won't be executed until requested. */ @@ -100,6 +103,7 @@ class TaskFaker : Closeable { val queuedTask = RunnableSerialTask(runnable) serialTaskQueue += queuedTask + executeCallCount++ isParallel = serialTaskQueue.size > 1 } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt index 6acc7b24e774..60143b3c40f8 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt @@ -52,6 +52,17 @@ class TaskRunner( private var coordinatorWaiting = false private var coordinatorWakeUpAt = 0L + /** + * When we need a new thread to run tasks, we call [Backend.execute]. A few microseconds later we + * expect a newly-started thread to call [Runnable.run]. We shouldn't request new threads until + * the already-requested ones are in service, otherwise we might create more threads than we need. + * + * We use [executeCallCount] and [runCallCount] to defend against starting more threads than we + * need. Both fields are guarded by [lock]. + */ + private var executeCallCount = 0 + private var runCallCount = 0 + /** Queues with tasks that are currently executing their [TaskQueue.activeTask]. */ private val busyQueues = mutableListOf() @@ -61,9 +72,14 @@ class TaskRunner( private val runnable: Runnable = object : Runnable { override fun run() { + var incrementedRunCallCount = false while (true) { val task = this@TaskRunner.lock.withLock { + if (!incrementedRunCallCount) { + incrementedRunCallCount = true + runCallCount++ + } awaitTaskToRun() } ?: return @@ -76,7 +92,7 @@ class TaskRunner( // If the task is crashing start another thread to service the queues. if (!completedNormally) { lock.withLock { - backend.execute(this@TaskRunner, this) + startAnotherThread() } } } @@ -99,7 +115,7 @@ class TaskRunner( if (coordinatorWaiting) { backend.coordinatorNotify(this@TaskRunner) } else { - backend.execute(this@TaskRunner, runnable) + startAnotherThread() } } @@ -157,7 +173,7 @@ class TaskRunner( * Returns an immediately-executable task for the calling thread to execute, sleeping as necessary * until one is ready. If there are no ready queues, or if other threads have everything under * control this will return null. If there is more than a single task ready to execute immediately - * this will launch another thread to handle that work. + * this will start another thread to handle that work. */ fun awaitTaskToRun(): Task? { lock.assertHeld() @@ -207,7 +223,7 @@ class TaskRunner( // Also start another thread if there's more work or scheduling to do. if (multipleReadyTasks || !coordinatorWaiting && readyQueues.isNotEmpty()) { - backend.execute(this@TaskRunner, runnable) + startAnotherThread() } return readyTask @@ -238,6 +254,15 @@ class TaskRunner( } } + /** Start another thread, unless a new thread is already scheduled to start. */ + private fun startAnotherThread() { + lock.assertHeld() + if (executeCallCount > runCallCount) return // A thread is still starting. + + executeCallCount++ + backend.execute(this@TaskRunner, runnable) + } + fun newQueue(): TaskQueue { val name = lock.withLock { nextQueueName++ } return TaskQueue(this, "Q$name") diff --git a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt index 5f343b5632c7..36b0c4c72b00 100644 --- a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt @@ -680,6 +680,96 @@ class TaskRunnerTest { assertThat(idleLatch2).isSameAs(idleLatch1) } + @Test fun cancelAllWhenEmptyDoesNotStartWorkerThread() { + redQueue.execute("red task", 100.µs) { + error("expected to be canceled") + } + assertThat(taskFaker.executeCallCount).isEqualTo(1) + + blueQueue.execute("task", 100.µs) { + error("expected to be canceled") + } + assertThat(taskFaker.executeCallCount).isEqualTo(1) + + redQueue.cancelAll() + assertThat(taskFaker.executeCallCount).isEqualTo(1) + + blueQueue.cancelAll() + assertThat(taskFaker.executeCallCount).isEqualTo(1) + } + + @Test fun noMoreThanOneWorkerThreadWaitingToStartAtATime() { + // Enqueueing the red task starts a thread because the head of the queue changed. + redQueue.execute("red task") { + log += "red:starting@${taskFaker.nanoTime}" + taskFaker.sleep(100.µs) + log += "red:finishing@${taskFaker.nanoTime}" + } + assertThat(taskFaker.executeCallCount).isEqualTo(1) + + // Enqueueing the blue task doesn't start a thread because the red one is still starting. + blueQueue.execute("blue task") { + log += "blue:starting@${taskFaker.nanoTime}" + taskFaker.sleep(100.µs) + log += "blue:finishing@${taskFaker.nanoTime}" + } + assertThat(taskFaker.executeCallCount).isEqualTo(1) + + // Running the red task starts another thread, so the two can run in parallel. + taskFaker.runNextTask() + assertThat(log).containsExactly("red:starting@0") + assertThat(taskFaker.executeCallCount).isEqualTo(2) + + // Next the blue task starts. + taskFaker.runNextTask() + assertThat(log).containsExactly( + "red:starting@0", + "blue:starting@0", + ) + assertThat(taskFaker.executeCallCount).isEqualTo(2) + + // Advance time until the tasks complete. + taskFaker.advanceUntil(100.µs) + assertThat(log).containsExactly( + "red:starting@0", + "blue:starting@0", + "red:finishing@100000", + "blue:finishing@100000", + ) + taskFaker.assertNoMoreTasks() + assertThat(taskFaker.executeCallCount).isEqualTo(2) + } + + @Test fun onlyOneCoordinatorWaitingToStartFutureTasks() { + // Enqueueing the red task starts a coordinator thread. + redQueue.execute("red task", 100.µs) { + log += "red:run@${taskFaker.nanoTime}" + } + assertThat(taskFaker.executeCallCount).isEqualTo(1) + + // Enqueueing the blue task doesn't need a 2nd coordinator yet. + blueQueue.execute("blue task", 200.µs) { + log += "blue:run@${taskFaker.nanoTime}" + } + assertThat(taskFaker.executeCallCount).isEqualTo(1) + + // Nothing to do. + taskFaker.runTasks() + assertThat(log).isEmpty() + + // At 100.µs, the coordinator runs the red task and starts a thread for the new coordinator. + taskFaker.advanceUntil(100.µs) + assertThat(log).containsExactly("red:run@100000") + assertThat(taskFaker.executeCallCount).isEqualTo(2) + + // At 200.µs, the blue task runs. + taskFaker.advanceUntil(200.µs) + assertThat(log).containsExactly("red:run@100000", "blue:run@200000") + assertThat(taskFaker.executeCallCount).isEqualTo(2) + + taskFaker.assertNoMoreTasks() + } + private val Int.µs: Long get() = this * 1_000L }