Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralise more locks #8389

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import java.util.logging.Level
import java.util.logging.LogManager
import java.util.logging.LogRecord
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.internal.buildConnectionPool
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http2.Http2
import okhttp3.internal.taskRunnerInternal
Expand Down Expand Up @@ -234,7 +234,7 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
// a test timeout failure.
val waitTime = (entryTime + 1_000_000_000L - System.nanoTime())
if (!queue.idleLatch().await(waitTime, TimeUnit.NANOSECONDS)) {
TaskRunner.INSTANCE.lock.withLock {
TaskRunner.INSTANCE.withLock {
TaskRunner.INSTANCE.cancelAll()
}
fail<Unit>("Queue still active after 1000 ms")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")

package okhttp3.internal.concurrent

import assertk.assertThat
Expand All @@ -23,9 +25,9 @@ import java.util.concurrent.BlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.OkHttpClient
import okhttp3.TestUtil.threadFactory
import okhttp3.internal.connection.Locks.withLock

/**
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
Expand Down Expand Up @@ -166,7 +168,7 @@ class TaskFaker : Closeable {
fun advanceUntil(newTime: Long) {
taskRunner.assertThreadDoesntHoldLock()

taskRunner.lock.withLock {
taskRunner.withLock {
check(currentTask == TestThreadSerialTask)
nanoTime = newTime
yieldUntil(ResumePriority.AfterOtherTasks)
Expand All @@ -177,7 +179,7 @@ class TaskFaker : Closeable {
fun assertNoMoreTasks() {
taskRunner.assertThreadDoesntHoldLock()

taskRunner.lock.withLock {
taskRunner.withLock {
assertThat(activeThreads).isEqualTo(0)
}
}
Expand Down Expand Up @@ -207,7 +209,7 @@ class TaskFaker : Closeable {
fun runNextTask() {
taskRunner.assertThreadDoesntHoldLock()

taskRunner.lock.withLock {
taskRunner.withLock {
val contextSwitchCountBefore = contextSwitchCount
yieldUntil(ResumePriority.BeforeOtherTasks) {
contextSwitchCount > contextSwitchCountBefore
Expand All @@ -217,7 +219,7 @@ class TaskFaker : Closeable {

/** Sleep until [durationNanos] elapses. For use by the task threads. */
fun sleep(durationNanos: Long) {
taskRunner.lock.withLock {
taskRunner.withLock {
val sleepUntil = nanoTime + durationNanos
yieldUntil { nanoTime >= sleepUntil }
}
Expand All @@ -229,7 +231,7 @@ class TaskFaker : Closeable {
*/
fun yield() {
taskRunner.assertThreadDoesntHoldLock()
taskRunner.lock.withLock {
taskRunner.withLock {
yieldUntil()
}
}
Expand Down Expand Up @@ -328,7 +330,7 @@ class TaskFaker : Closeable {
runnable.run()
require(currentTask == this) { "unexpected current task: $currentTask" }
} finally {
taskRunner.lock.withLock {
taskRunner.withLock {
activeThreads--
startNextTask()
}
Expand All @@ -354,7 +356,7 @@ class TaskFaker : Closeable {
timeout: Long,
unit: TimeUnit,
): T? {
taskRunner.lock.withLock {
taskRunner.withLock {
val waitUntil = nanoTime + unit.toNanos(timeout)
while (true) {
val result = poll()
Expand All @@ -367,7 +369,7 @@ class TaskFaker : Closeable {
}

override fun put(element: T) {
taskRunner.lock.withLock {
taskRunner.withLock {
delegate.put(element)
editCount++
}
Expand Down
14 changes: 7 additions & 7 deletions okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package okhttp3.internal.concurrent
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import okhttp3.internal.assertNotHeld
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.okHttpName

/**
Expand All @@ -32,7 +32,7 @@ class TaskQueue internal constructor(
internal val taskRunner: TaskRunner,
internal val name: String,
) {
val lock: ReentrantLock = ReentrantLock()
internal val lock: ReentrantLock = ReentrantLock()

internal var shutdown = false

Expand All @@ -50,7 +50,7 @@ class TaskQueue internal constructor(
* currently-executing task unless it is also scheduled for future execution.
*/
val scheduledTasks: List<Task>
get() = taskRunner.lock.withLock { futureTasks.toList() }
get() = taskRunner.withLock { futureTasks.toList() }

/**
* Schedules [task] for execution in [delayNanos]. A task may only have one future execution
Expand All @@ -66,7 +66,7 @@ class TaskQueue internal constructor(
task: Task,
delayNanos: Long = 0L,
) {
taskRunner.lock.withLock {
taskRunner.withLock {
if (shutdown) {
if (task.cancelable) {
taskRunner.logger.taskLog(task, this) { "schedule canceled (queue is shutdown)" }
Expand Down Expand Up @@ -126,7 +126,7 @@ class TaskQueue internal constructor(

/** Returns a latch that reaches 0 when the queue is next idle. */
fun idleLatch(): CountDownLatch {
taskRunner.lock.withLock {
taskRunner.withLock {
// If the queue is already idle, that's easy.
if (activeTask == null && futureTasks.isEmpty()) {
return CountDownLatch(0)
Expand Down Expand Up @@ -208,7 +208,7 @@ class TaskQueue internal constructor(
fun cancelAll() {
lock.assertNotHeld()

taskRunner.lock.withLock {
taskRunner.withLock {
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
}
Expand All @@ -218,7 +218,7 @@ class TaskQueue internal constructor(
fun shutdown() {
lock.assertNotHeld()

taskRunner.lock.withLock {
taskRunner.withLock {
shutdown = true
if (cancelAllAndDecide()) {
taskRunner.kickCoordinator(this)
Expand Down
17 changes: 9 additions & 8 deletions okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.internal.addIfAbsent
import okhttp3.internal.assertHeld
import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE
import okhttp3.internal.connection.Locks.newLockCondition
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.okHttpName
import okhttp3.internal.threadFactory

Expand All @@ -45,8 +46,8 @@ class TaskRunner(
val backend: Backend,
internal val logger: Logger = TaskRunner.logger,
) {
val lock: ReentrantLock = ReentrantLock()
val condition: Condition = lock.newCondition()
internal val lock: ReentrantLock = ReentrantLock()
val condition: Condition = lock.newLockCondition()

private var nextQueueName = 10000
private var coordinatorWaiting = false
Expand All @@ -63,7 +64,7 @@ class TaskRunner(
override fun run() {
while (true) {
val task =
this@TaskRunner.lock.withLock {
this@TaskRunner.withLock {
awaitTaskToRun()
} ?: return

Expand All @@ -75,7 +76,7 @@ class TaskRunner(
} finally {
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
lock.withLock {
this@TaskRunner.withLock {
backend.execute(this@TaskRunner, this)
}
}
Expand Down Expand Up @@ -123,7 +124,7 @@ class TaskRunner(
try {
delayNanos = task.runOnce()
} finally {
lock.withLock {
this.withLock {
afterRun(task, delayNanos)
}
currentThread.name = oldName
Expand Down Expand Up @@ -239,7 +240,7 @@ class TaskRunner(
}

fun newQueue(): TaskQueue {
val name = lock.withLock { nextQueueName++ }
val name = this.withLock { nextQueueName++ }
return TaskQueue(this, "Q$name")
}

Expand All @@ -248,7 +249,7 @@ class TaskRunner(
* necessarily track queues that have no tasks scheduled.
*/
fun activeQueues(): List<TaskQueue> {
lock.withLock {
this.withLock {
return busyQueues + readyQueues
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSocket
import kotlin.concurrent.withLock
import okhttp3.CertificatePinner
import okhttp3.ConnectionSpec
import okhttp3.Handshake
Expand Down
46 changes: 39 additions & 7 deletions okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package okhttp3.internal.connection

import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import okhttp3.Dispatcher
import okhttp3.internal.concurrent.TaskQueue
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.http2.Http2Connection
import okhttp3.internal.http2.Http2Stream
import okhttp3.internal.http2.Http2Writer
Expand All @@ -32,34 +36,62 @@ import okhttp3.internal.http2.Http2Writer
internal object Locks {
inline fun <T> Dispatcher.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> RealConnection.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> RealCall.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> Http2Connection.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> Http2Stream.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.withLock(action)
return lock.runWithLock(action)
}

inline fun <T> Http2Writer.withLock(action: () -> T): T {
inline fun <T> TaskRunner.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.runWithLock(action)
}

inline fun <T> TaskQueue.withLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.runWithLock(action)
}

inline fun <T> Http2Writer.withLock(action: () -> T): T {
// TODO can we assert we don't have the connection lock?

return lock.withLock(action)
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return lock.runWithLock(action)
}

/**
* A no cost (inlined) alias to [ReentrantLock#newCondition] for an OkHttp Lock.
* No function on its own but places a central place that all conditions go through to allow
* temporary debugging.
*/
internal fun ReentrantLock.newLockCondition(): Condition {
return this.newCondition()
}

/**
* A no cost (inlined) alias to [ReentrantLock#withLock] for an OkHttp Lock.
* No function on its own but places a central place that all locks go through to allow
* temporary debugging.
*/
inline fun <T> ReentrantLock.runWithLock(action: () -> T): T {
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
return withLock(action)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSocket
import kotlin.concurrent.withLock
import okhttp3.Address
import okhttp3.Connection
import okhttp3.ConnectionListener
Expand Down Expand Up @@ -335,7 +334,7 @@ class RealConnection(
return http2Connection.isHealthy(nowNs)
}

val idleDurationNs = lock.withLock { nowNs - idleAtNs }
val idleDurationNs = this.withLock { nowNs - idleAtNs }
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
return socket.isHealthy(source)
}
Expand All @@ -354,7 +353,7 @@ class RealConnection(
connection: Http2Connection,
settings: Settings,
) {
lock.withLock {
this.withLock {
val oldLimit = allocationLimit
allocationLimit = settings.getMaxConcurrentStreams()

Expand Down Expand Up @@ -398,7 +397,7 @@ class RealConnection(
e: IOException?,
) {
var noNewExchangesEvent = false
lock.withLock {
this.withLock {
if (e is StreamResetException) {
when {
e.errorCode == ErrorCode.REFUSED_STREAM -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import okhttp3.internal.EMPTY_HEADERS
import okhttp3.internal.assertThreadDoesntHoldLock
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.Locks.newLockCondition
import okhttp3.internal.connection.Locks.withLock
import okhttp3.internal.http2.ErrorCode.REFUSED_STREAM
import okhttp3.internal.http2.Settings.Companion.DEFAULT_INITIAL_WINDOW_SIZE
Expand Down Expand Up @@ -56,7 +57,7 @@ import okio.source
@Suppress("NAME_SHADOWING")
class Http2Connection internal constructor(builder: Builder) : Closeable {
internal val lock: ReentrantLock = ReentrantLock()
internal val condition: Condition = lock.newCondition()
internal val condition: Condition = lock.newLockCondition()

// Internal state of this connection is guarded by 'lock'. No blocking operations may be
// performed while holding this lock!
Expand Down