Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start fewer threads in TaskRunner #8391

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,32 @@ class TaskFaker : Closeable {

/**
* True if this task faker has ever had multiple tasks scheduled to run concurrently. Guarded by
* [taskRunner].
* [TaskRunner.lock].
*/
var isParallel = false

/** Number of calls to [TaskRunner.Backend.execute]. Guarded by [TaskRunner.lock]. */
var executeCallCount = 0

/** Guarded by [taskRunner]. */
var nanoTime = 0L
private set

/** Backlog of tasks to run. Only one task runs at a time. Guarded by [taskRunner]. */
/** Backlog of tasks to run. Only one task runs at a time. Guarded by [TaskRunner.lock]. */
private val serialTaskQueue = ArrayDeque<SerialTask>()

/** The task that's currently executing. Guarded by [taskRunner]. */
/** The task that's currently executing. Guarded by [TaskRunner.lock]. */
private var currentTask: SerialTask = TestThreadSerialTask

/** The coordinator task if it's waiting, and how it will resume. Guarded by [taskRunner]. */
/** The coordinator task if it's waiting, and how it will resume. Guarded by [TaskRunner.lock]. */
private var waitingCoordinatorTask: SerialTask? = null
private var waitingCoordinatorInterrupted = false
private var waitingCoordinatorNotified = false

/** How many times a new task has been started. Guarded by [taskRunner]. */
/** How many times a new task has been started. Guarded by [TaskRunner.lock]. */
private var contextSwitchCount = 0

/** Guarded by [taskRunner]. */
/** Guarded by [TaskRunner.lock]. */
private var activeThreads = 0

/** A task runner that posts tasks to this fake. Tasks won't be executed until requested. */
Expand All @@ -100,6 +103,7 @@ class TaskFaker : Closeable {

val queuedTask = RunnableSerialTask(runnable)
serialTaskQueue += queuedTask
executeCallCount++
isParallel = serialTaskQueue.size > 1
}

Expand Down
33 changes: 29 additions & 4 deletions okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ class TaskRunner(
private var coordinatorWaiting = false
private var coordinatorWakeUpAt = 0L

/**
* When we need a new thread to run tasks, we call [Backend.execute]. A few microseconds later we
* expect a newly-started thread to call [Runnable.run]. We shouldn't request new threads until
* the already-requested ones are in service, otherwise we might create more threads than we need.
*
* We use [executeCallCount] and [runCallCount] to defend against starting more threads than we
* need. Both fields are guarded by [lock].
*/
private var executeCallCount = 0
private var runCallCount = 0

/** Queues with tasks that are currently executing their [TaskQueue.activeTask]. */
private val busyQueues = mutableListOf<TaskQueue>()

Expand All @@ -61,9 +72,14 @@ class TaskRunner(
private val runnable: Runnable =
object : Runnable {
override fun run() {
var incrementedRunCallCount = false
while (true) {
val task =
this@TaskRunner.lock.withLock {
if (!incrementedRunCallCount) {
incrementedRunCallCount = true
runCallCount++
}
awaitTaskToRun()
} ?: return

Expand All @@ -76,7 +92,7 @@ class TaskRunner(
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
lock.withLock {
backend.execute(this@TaskRunner, this)
startAnotherThread()
}
}
}
Expand All @@ -99,7 +115,7 @@ class TaskRunner(
if (coordinatorWaiting) {
backend.coordinatorNotify(this@TaskRunner)
} else {
backend.execute(this@TaskRunner, runnable)
startAnotherThread()
}
}

Expand Down Expand Up @@ -157,7 +173,7 @@ class TaskRunner(
* Returns an immediately-executable task for the calling thread to execute, sleeping as necessary
* until one is ready. If there are no ready queues, or if other threads have everything under
* control this will return null. If there is more than a single task ready to execute immediately
* this will launch another thread to handle that work.
* this will start another thread to handle that work.
*/
fun awaitTaskToRun(): Task? {
lock.assertHeld()
Expand Down Expand Up @@ -207,7 +223,7 @@ class TaskRunner(

// Also start another thread if there's more work or scheduling to do.
if (multipleReadyTasks || !coordinatorWaiting && readyQueues.isNotEmpty()) {
backend.execute(this@TaskRunner, runnable)
startAnotherThread()
}

return readyTask
Expand Down Expand Up @@ -238,6 +254,15 @@ class TaskRunner(
}
}

/** Start another thread, unless a new thread is already scheduled to start. */
private fun startAnotherThread() {
lock.assertHeld()
if (executeCallCount > runCallCount) return // A thread is still starting.

executeCallCount++
backend.execute(this@TaskRunner, runnable)
}

fun newQueue(): TaskQueue {
val name = lock.withLock { nextQueueName++ }
return TaskQueue(this, "Q$name")
Expand Down
90 changes: 90 additions & 0 deletions okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,96 @@ class TaskRunnerTest {
assertThat(idleLatch2).isSameAs(idleLatch1)
}

@Test fun cancelAllWhenEmptyDoesNotStartWorkerThread() {
redQueue.execute("red task", 100.µs) {
error("expected to be canceled")
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

blueQueue.execute("task", 100.µs) {
error("expected to be canceled")
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

redQueue.cancelAll()
assertThat(taskFaker.executeCallCount).isEqualTo(1)

blueQueue.cancelAll()
assertThat(taskFaker.executeCallCount).isEqualTo(1)
}

@Test fun noMoreThanOneWorkerThreadWaitingToStartAtATime() {
// Enqueueing the red task starts a thread because the head of the queue changed.
redQueue.execute("red task") {
log += "red:starting@${taskFaker.nanoTime}"
taskFaker.sleep(100.µs)
log += "red:finishing@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Enqueueing the blue task doesn't start a thread because the red one is still starting.
blueQueue.execute("blue task") {
log += "blue:starting@${taskFaker.nanoTime}"
taskFaker.sleep(100.µs)
log += "blue:finishing@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Running the red task starts another thread, so the two can run in parallel.
taskFaker.runNextTask()
assertThat(log).containsExactly("red:starting@0")
assertThat(taskFaker.executeCallCount).isEqualTo(2)

// Next the blue task starts.
taskFaker.runNextTask()
assertThat(log).containsExactly(
"red:starting@0",
"blue:starting@0",
)
assertThat(taskFaker.executeCallCount).isEqualTo(2)

// Advance time until the tasks complete.
taskFaker.advanceUntil(100.µs)
assertThat(log).containsExactly(
"red:starting@0",
"blue:starting@0",
"red:finishing@100000",
"blue:finishing@100000",
)
taskFaker.assertNoMoreTasks()
assertThat(taskFaker.executeCallCount).isEqualTo(2)
}

@Test fun onlyOneCoordinatorWaitingToStartFutureTasks() {
// Enqueueing the red task starts a coordinator thread.
redQueue.execute("red task", 100.µs) {
log += "red:run@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Enqueueing the blue task doesn't need a 2nd coordinator yet.
blueQueue.execute("blue task", 200.µs) {
log += "blue:run@${taskFaker.nanoTime}"
}
assertThat(taskFaker.executeCallCount).isEqualTo(1)

// Nothing to do.
taskFaker.runTasks()
assertThat(log).isEmpty()

// At 100.µs, the coordinator runs the red task and starts a thread for the new coordinator.
taskFaker.advanceUntil(100.µs)
assertThat(log).containsExactly("red:run@100000")
assertThat(taskFaker.executeCallCount).isEqualTo(2)

// At 200.µs, the blue task runs.
taskFaker.advanceUntil(200.µs)
assertThat(log).containsExactly("red:run@100000", "blue:run@200000")
assertThat(taskFaker.executeCallCount).isEqualTo(2)

taskFaker.assertNoMoreTasks()
}

private val Int.µs: Long
get() = this * 1_000L
}