Skip to content

Commit

Permalink
Acquire fewer locks in TaskRunner
Browse files Browse the repository at this point in the history
Previously each run did this:

 - acquire a lock to take a task
 - acquire a lock to finish a task
 - if crashed, acquire a lock to start a new thread

So to run 10 tasks without any crashes, we'd acquire
the lock 20 times.

With this update, we do this:

 - acquire a lock to take the first task
 - acquire a lock to release task N and take task N + 1

So to run 10 tasks without any crashes, we now acquire
the lock 11 times.
  • Loading branch information
swankjesse committed Apr 29, 2024
1 parent d0b6a46 commit 6a757ac
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 43 deletions.
70 changes: 32 additions & 38 deletions okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,35 @@ class TaskRunner(
private val runnable: Runnable =
object : Runnable {
override fun run() {
var incrementedRunCallCount = false
while (true) {
val task =
this@TaskRunner.lock.withLock {
if (!incrementedRunCallCount) {
incrementedRunCallCount = true
runCallCount++
}
var task: Task =
lock.withLock {
runCallCount++
awaitTaskToRun()
} ?: return

val currentThread = Thread.currentThread()
val oldName = currentThread.name
try {
while (true) {
currentThread.name = task.name
val delayNanos = logger.logElapsed(task, task.queue!!) {
task.runOnce()
}

// A task ran successfully. Update the execution state and take the next task.
task = lock.withLock {
afterRun(task, delayNanos, true)
awaitTaskToRun()
} ?: return

logger.logElapsed(task, task.queue!!) {
var completedNormally = false
try {
runTask(task)
completedNormally = true
} finally {
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
lock.withLock {
startAnotherThread()
}
}
}
}
} catch (thrown: Throwable) {
// A task failed. Update execution state and re-throw the exception.
lock.withLock {
afterRun(task, -1L, false)
}
throw thrown
} finally {
currentThread.name = oldName
}
}
}
Expand Down Expand Up @@ -130,25 +134,10 @@ class TaskRunner(
busyQueues.add(queue)
}

private fun runTask(task: Task) {
val currentThread = Thread.currentThread()
val oldName = currentThread.name
currentThread.name = task.name

var delayNanos = -1L
try {
delayNanos = task.runOnce()
} finally {
lock.withLock {
afterRun(task, delayNanos)
}
currentThread.name = oldName
}
}

private fun afterRun(
task: Task,
delayNanos: Long,
completedNormally: Boolean,
) {
lock.assertHeld()

Expand All @@ -166,6 +155,11 @@ class TaskRunner(

if (queue.futureTasks.isNotEmpty()) {
readyQueues.add(queue)

// If the task crashed, start another thread to run the next task.
if (!completedNormally) {
startAnotherThread()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ class TaskRunnerTest {
assertThat(testLogHandler.takeAll()).containsExactly(
"FINE: Q10000 scheduled after 100 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 run again after 50 µs: task",
"FINE: Q10000 finished run in 0 µs: task",
"FINE: Q10000 run again after 50 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 run again after 150 µs: task",
"FINE: Q10000 finished run in 0 µs: task",
"FINE: Q10000 run again after 150 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 finished run in 0 µs: task",
)
Expand Down Expand Up @@ -137,8 +137,8 @@ class TaskRunnerTest {
"FINE: Q10000 scheduled after 100 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 scheduled after 50 µs: task",
"FINE: Q10000 already scheduled : task",
"FINE: Q10000 finished run in 0 µs: task",
"FINE: Q10000 already scheduled : task",
"FINE: Q10000 starting : task",
"FINE: Q10000 finished run in 0 µs: task",
)
Expand Down Expand Up @@ -176,8 +176,8 @@ class TaskRunnerTest {
"FINE: Q10000 scheduled after 100 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 scheduled after 200 µs: task",
"FINE: Q10000 run again after 50 µs: task",
"FINE: Q10000 finished run in 0 µs: task",
"FINE: Q10000 run again after 50 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 finished run in 0 µs: task",
)
Expand Down Expand Up @@ -306,8 +306,8 @@ class TaskRunnerTest {
assertThat(testLogHandler.takeAll()).containsExactly(
"FINE: Q10000 scheduled after 100 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 run again after 50 µs: task",
"FINE: Q10000 finished run in 0 µs: task",
"FINE: Q10000 run again after 50 µs: task",
"FINE: Q10000 starting : task",
"FINE: Q10000 finished run in 0 µs: task",
)
Expand Down

0 comments on commit 6a757ac

Please sign in to comment.