Skip to content

Commit

Permalink
Name tasks only in TaskRunner (#5526)
Browse files Browse the repository at this point in the history
* Name tasks only in TaskRunner

Naming queues seemed good initially, but the names are mostly
mutually-redundant with task names.

This PR reduces debug information when a queue is still busy
when the test completes. I have a fix for that in a follow-up
change.

* Update okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.kt

Co-Authored-By: Yuri Schimke <yuri@schimke.ee>
  • Loading branch information
swankjesse and yschimke committed Oct 3, 2019
1 parent 20fcc7d commit aad8a14
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ class MockWebServer : ExternalResource(), Closeable {

for (queue in taskRunner.activeQueues()) {
if (!queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500L))) {
throw IOException("Gave up waiting for ${queue.owner} to shut down")
throw IOException("Gave up waiting for queue to shut down")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class OkHttpClientTestRule : TestRule {
private fun ensureAllTaskQueuesIdle() {
for (queue in TaskRunner.INSTANCE.activeQueues()) {
assertThat(queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500L)))
.withFailMessage("Queue ${queue.owner} still active after 500ms")
.withFailMessage("Queue still active after 500ms")
.isTrue()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class DiskLruCache internal constructor(
*/
private var nextSequenceNumber: Long = 0

private val cleanupQueue = taskRunner.newQueue(this)
private val cleanupQueue = taskRunner.newQueue()
private val cleanupTask = object : Task("OkHttp DiskLruCache", cancelable = false) {
override fun runOnce(): Long {
synchronized(this@DiskLruCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ import java.util.concurrent.TimeUnit
* for its work; in practice a set of queues may share a set of threads to save resources.
*/
class TaskQueue internal constructor(
private val taskRunner: TaskRunner,

/**
* An application-level object like a connection pool or HTTP call that this queue works on behalf
* of. This is intended to be useful for testing and debugging only.
*/
val owner: Any
private val taskRunner: TaskRunner
) {
private var shutdown = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class TaskRunner(
private val activeQueues = mutableListOf<TaskQueue>()
private val coordinator = Runnable { coordinate() }

fun newQueue(owner: Any) = TaskQueue(this, owner)
fun newQueue() = TaskQueue(this)

/**
* Returns a snapshot of queues that currently have tasks scheduled. The task runner does not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RealConnectionPool(
) {
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)

private val cleanupQueue: TaskQueue = taskRunner.newQueue(this)
private val cleanupQueue: TaskQueue = taskRunner.newQueue()
private val cleanupTask = object : Task("OkHttp ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
private val taskRunner = builder.taskRunner

/** Asynchronously writes frames to the outgoing socket. */
private val writerQueue = taskRunner.newQueue("$connectionName Writer")
private val writerQueue = taskRunner.newQueue()

/** Ensures push promise callbacks events are sent in order per stream. */
private val pushQueue = taskRunner.newQueue("$connectionName Push")
private val pushQueue = taskRunner.newQueue()

/** Notifies the listener of settings changes. */
private val settingsListenerQueue = taskRunner.newQueue("$connectionName Listener")
private val settingsListenerQueue = taskRunner.newQueue()

/** User code to run in response to push promise events. */
private val pushObserver: PushObserver = builder.pushObserver
Expand Down Expand Up @@ -632,7 +632,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {

// Use a different task queue for each stream because they should be handled in parallel.
val taskName = "OkHttp $connectionName stream $streamId"
taskRunner.newQueue(taskName).schedule(object : Task(taskName, cancelable = false) {
taskRunner.newQueue().schedule(object : Task(taskName, cancelable = false) {
override fun runOnce(): Long {
try {
listener.onStream(newStream)
Expand Down
51 changes: 29 additions & 22 deletions okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class RealWebSocket(
private var call: Call? = null

/** This task processes the outgoing queues. Call [runWriter] to after enqueueing. */
private val writerTask: Task
private var writerTask: Task? = null

/** Null until this web socket is connected. Only accessed by the reader thread. */
private var reader: WebSocketReader? = null
Expand All @@ -74,7 +74,10 @@ class RealWebSocket(
private var writer: WebSocketWriter? = null

/** Used for writes, pings, and close timeouts. */
private var taskQueue = taskRunner.newQueue("OkHttp WebSocket ${originalRequest.url.redact()}")
private var taskQueue = taskRunner.newQueue()

/** Names this web socket for observability and debugging. */
private var name: String? = null

/**
* The streams held by this web socket. This is non-null until all incoming messages have been
Expand Down Expand Up @@ -122,16 +125,6 @@ class RealWebSocket(
}

this.key = ByteArray(16).apply { random.nextBytes(this) }.toByteString().base64()
this.writerTask = object : Task("${taskQueue.owner} Writer") {
override fun runOnce(): Long {
try {
if (writeOneFrame()) return 0L
} catch (e: IOException) {
failWebSocket(e, null)
}
return -1L
}
}
}

override fun request(): Request = originalRequest
Expand Down Expand Up @@ -219,8 +212,10 @@ class RealWebSocket(
@Throws(IOException::class)
fun initReaderAndWriter(name: String, streams: Streams) {
synchronized(this) {
this.name = name
this.streams = streams
this.writer = WebSocketWriter(streams.client, streams.sink, random)
this.writerTask = WriterTask()
if (pingIntervalMillis != 0L) {
val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
taskQueue.schedule(PingTask(pingIntervalNanos), pingIntervalNanos)
Expand Down Expand Up @@ -395,7 +390,7 @@ class RealWebSocket(

private fun runWriter() {
assert(Thread.holdsLock(this))
taskQueue.trySchedule(writerTask)
taskQueue.trySchedule(writerTask!!)
}

/**
Expand Down Expand Up @@ -440,7 +435,7 @@ class RealWebSocket(
// When we request a graceful close also schedule a cancel of the web socket.
val cancelAfterCloseNanos =
MILLISECONDS.toNanos((messageOrClose as Close).cancelAfterCloseMillis)
taskQueue.schedule(CancelRunnable(), cancelAfterCloseNanos)
taskQueue.schedule(CancelTask(), cancelAfterCloseNanos)
}
} else if (messageOrClose == null) {
return false // The queue is exhausted.
Expand Down Expand Up @@ -478,13 +473,6 @@ class RealWebSocket(
}
}

private inner class PingTask(val delayNanos: Long) : Task("${taskQueue.owner} Ping") {
override fun runOnce(): Long {
writePingFrame()
return delayNanos
}
}

internal fun writePingFrame() {
val writer: WebSocketWriter?
val failedPing: Int
Expand Down Expand Up @@ -543,13 +531,32 @@ class RealWebSocket(
val sink: BufferedSink
) : Closeable

internal inner class CancelRunnable : Task("${taskQueue.owner} Cancel") {

private inner class PingTask(val delayNanos: Long) : Task("$name Ping") {
override fun runOnce(): Long {
writePingFrame()
return delayNanos
}
}

private inner class CancelTask : Task("$name Cancel") {
override fun runOnce(): Long {
cancel()
return -1L
}
}

private inner class WriterTask : Task("$name Writer") {
override fun runOnce(): Long {
try {
if (writeOneFrame()) return 0L
} catch (e: IOException) {
failWebSocket(e, null)
}
return -1L
}
}

companion object {
private val ONLY_HTTP1 = listOf(Protocol.HTTP_1_1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit
class TaskRunnerRealBackendTest {
private val backend = TaskRunner.RealBackend()
private val taskRunner = TaskRunner(backend)
private val queue = taskRunner.newQueue("queue")
private val queue = taskRunner.newQueue()
private val log = LinkedBlockingDeque<String>()

@Test fun test() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class TaskRunnerTest {
private val taskFaker = TaskFaker()
private val taskRunner = taskFaker.taskRunner
private val log = mutableListOf<String>()
private val redQueue = taskRunner.newQueue("red")
private val blueQueue = taskRunner.newQueue("blue")
private val greenQueue = taskRunner.newQueue("green")
private val redQueue = taskRunner.newQueue()
private val blueQueue = taskRunner.newQueue()
private val greenQueue = taskRunner.newQueue()

@Test fun executeDelayed() {
redQueue.schedule(object : Task("task") {
Expand Down

0 comments on commit aad8a14

Please sign in to comment.