From d2a47740cc53be8bd351f1e6be64e0a07b733d52 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Fri, 30 Dec 2022 22:03:50 +1000 Subject: [PATCH 01/15] [Experiment] Loom support. --- .../commonMain/kotlin/okio/-CommonPlatform.kt | 6 +- okio/src/commonMain/kotlin/okio/FileHandle.kt | 26 ++++--- okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt | 11 ++- okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt | 22 +++--- okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt | 6 ++ okio/src/jvmMain/kotlin/okio/Pipe.kt | 47 +++++++----- okio/src/jvmMain/kotlin/okio/Throttler.kt | 17 +++-- okio/src/jvmMain/kotlin/okio/Timeout.kt | 75 +++++++++++++++++++ .../nonJvmMain/kotlin/okio/-NonJvmPlatform.kt | 10 ++- 9 files changed, 166 insertions(+), 54 deletions(-) diff --git a/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt b/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt index 72ccec4161..8a9f1176fc 100644 --- a/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt +++ b/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt @@ -23,7 +23,11 @@ internal expect fun String.asUtf8ToByteArray(): ByteArray // TODO make internal https://youtrack.jetbrains.com/issue/KT-37316 expect class ArrayIndexOutOfBoundsException(message: String?) : IndexOutOfBoundsException -internal expect inline fun synchronized(lock: Any, block: () -> R): R +expect class ALock + +internal expect fun newLock(): ALock + +internal expect inline fun synchronized(lock: ALock, block: () -> R): R expect open class IOException(message: String?, cause: Throwable?) : Exception { constructor(message: String? = null) diff --git a/okio/src/commonMain/kotlin/okio/FileHandle.kt b/okio/src/commonMain/kotlin/okio/FileHandle.kt index 606b74cb23..f35212116f 100644 --- a/okio/src/commonMain/kotlin/okio/FileHandle.kt +++ b/okio/src/commonMain/kotlin/okio/FileHandle.kt @@ -52,6 +52,8 @@ abstract class FileHandle( */ private var openStreamCount = 0 + val aLock: ALock = newLock() + /** * Reads at least 1, and up to [byteCount] bytes from this starting at [fileOffset] and copies * them to [array] at [arrayOffset]. Returns the number of bytes read, or -1 if [fileOffset] @@ -64,7 +66,7 @@ abstract class FileHandle( arrayOffset: Int, byteCount: Int ): Int { - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } } return protectedRead(fileOffset, array, arrayOffset, byteCount) @@ -76,7 +78,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun read(fileOffset: Long, sink: Buffer, byteCount: Long): Long { - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } } return readNoCloseCheck(fileOffset, sink, byteCount) @@ -87,7 +89,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun size(): Long { - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } } return protectedSize() @@ -100,7 +102,7 @@ abstract class FileHandle( @Throws(IOException::class) fun resize(size: Long) { check(readWrite) { "file handle is read-only" } - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } } return protectedResize(size) @@ -114,7 +116,7 @@ abstract class FileHandle( byteCount: Int ) { check(readWrite) { "file handle is read-only" } - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } } return protectedWrite(fileOffset, array, arrayOffset, byteCount) @@ -124,7 +126,7 @@ abstract class FileHandle( @Throws(IOException::class) fun write(fileOffset: Long, source: Buffer, byteCount: Long) { check(readWrite) { "file handle is read-only" } - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } } writeNoCloseCheck(fileOffset, source, byteCount) @@ -134,7 +136,7 @@ abstract class FileHandle( @Throws(IOException::class) fun flush() { check(readWrite) { "file handle is read-only" } - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } } return protectedFlush() @@ -146,7 +148,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun source(fileOffset: Long = 0L): Source { - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } openStreamCount++ } @@ -216,7 +218,7 @@ abstract class FileHandle( @Throws(IOException::class) fun sink(fileOffset: Long = 0L): Sink { check(readWrite) { "file handle is read-only" } - synchronized(this) { + synchronized(aLock) { check(!closed) { "closed" } openStreamCount++ } @@ -282,7 +284,7 @@ abstract class FileHandle( @Throws(IOException::class) final override fun close() { - synchronized(this) { + synchronized(aLock) { if (closed) return@close closed = true if (openStreamCount != 0) return@close @@ -405,7 +407,7 @@ abstract class FileHandle( override fun close() { if (closed) return closed = true - synchronized(fileHandle) { + synchronized(fileHandle.aLock) { fileHandle.openStreamCount-- if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close } @@ -431,7 +433,7 @@ abstract class FileHandle( override fun close() { if (closed) return closed = true - synchronized(fileHandle) { + synchronized(fileHandle.aLock) { fileHandle.openStreamCount-- if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close } diff --git a/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt b/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt index b0a2b6498c..33b84bdbaa 100644 --- a/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt +++ b/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt @@ -16,6 +16,9 @@ package okio +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + internal actual fun ByteArray.toUtf8String(): String = String(this, Charsets.UTF_8) internal actual fun String.asUtf8ToByteArray(): ByteArray = toByteArray(Charsets.UTF_8) @@ -23,9 +26,11 @@ internal actual fun String.asUtf8ToByteArray(): ByteArray = toByteArray(Charsets // TODO remove if https://youtrack.jetbrains.com/issue/KT-20641 provides a better solution actual typealias ArrayIndexOutOfBoundsException = java.lang.ArrayIndexOutOfBoundsException -internal actual inline fun synchronized(lock: Any, block: () -> R): R { - return kotlin.synchronized(lock, block) -} +actual typealias ALock = ReentrantLock + +internal actual fun newLock(): ALock = ReentrantLock() + +internal actual inline fun synchronized(lock: ALock, block: () -> R): R = lock.withLock(block) actual typealias IOException = java.io.IOException diff --git a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt index 2077832713..87d87d774e 100644 --- a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt +++ b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt @@ -18,6 +18,9 @@ package okio import java.io.IOException import java.io.InterruptedIOException import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to @@ -179,7 +182,7 @@ open class AsyncTimeout : Timeout() { while (true) { try { var timedOut: AsyncTimeout? = null - synchronized(AsyncTimeout::class.java) { + AsyncTimeout.lock.withLock { timedOut = awaitTimeout() // The queue is completely empty. Let this thread exit and let another watchdog thread @@ -199,6 +202,9 @@ open class AsyncTimeout : Timeout() { } companion object { + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + /** * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow * connections may suffer timeouts even when they're making (slow) progress. Without this, @@ -221,7 +227,7 @@ open class AsyncTimeout : Timeout() { private var head: AsyncTimeout? = null private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { - synchronized(AsyncTimeout::class.java) { + AsyncTimeout.lock.withLock { check(!node.inQueue) { "Unbalanced enter/exit" } node.inQueue = true @@ -253,7 +259,7 @@ open class AsyncTimeout : Timeout() { prev.next = node if (prev === head) { // Wake up the watchdog when inserting at the front. - (AsyncTimeout::class.java as Object).notify() + condition.signal() } break } @@ -264,7 +270,7 @@ open class AsyncTimeout : Timeout() { /** Returns true if the timeout occurred. */ private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean { - synchronized(AsyncTimeout::class.java) { + AsyncTimeout.lock.withLock { if (!node.inQueue) return false node.inQueue = false @@ -299,7 +305,7 @@ open class AsyncTimeout : Timeout() { // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. if (node == null) { val startNanos = System.nanoTime() - (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS) + condition.await(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) { head // The idle timeout elapsed. } else { @@ -311,11 +317,7 @@ open class AsyncTimeout : Timeout() { // The head of the queue hasn't timed out yet. Await that. if (waitNanos > 0) { - // Waiting is made complicated by the fact that we work in nanoseconds, - // but the API wants (millis, nanos) in two arguments. - val waitMillis = waitNanos / 1000000L - waitNanos -= waitMillis * 1000000L - (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt()) + condition.await(waitNanos, TimeUnit.NANOSECONDS) return null } diff --git a/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt b/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt index 3bbb04107a..b8df2f8bbe 100644 --- a/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt +++ b/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt @@ -16,11 +16,17 @@ package okio import java.io.RandomAccessFile +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock internal class JvmFileHandle( readWrite: Boolean, private val randomAccessFile: RandomAccessFile ) : FileHandle(readWrite) { + + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + @Synchronized override fun protectedResize(size: Long) { val currentSize = size() diff --git a/okio/src/jvmMain/kotlin/okio/Pipe.kt b/okio/src/jvmMain/kotlin/okio/Pipe.kt index 43c23bfd3a..bde7c9ce0b 100644 --- a/okio/src/jvmMain/kotlin/okio/Pipe.kt +++ b/okio/src/jvmMain/kotlin/okio/Pipe.kt @@ -15,6 +15,10 @@ */ package okio +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + /** * A source and a sink that are attached. The sink's output is the source's input. Typically each * is accessed by its own thread: a producer thread writes data to the sink and a consumer thread @@ -40,6 +44,9 @@ class Pipe(internal val maxBufferSize: Long) { internal var sourceClosed = false internal var foldedSink: Sink? = null + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + init { require(maxBufferSize >= 1L) { "maxBufferSize < 1: $maxBufferSize" } } @@ -51,21 +58,21 @@ class Pipe(internal val maxBufferSize: Long) { override fun write(source: Buffer, byteCount: Long) { var byteCount = byteCount var delegate: Sink? = null - synchronized(buffer) { + lock.withLock { check(!sinkClosed) { "closed" } if (canceled) throw IOException("canceled") while (byteCount > 0) { foldedSink?.let { delegate = it - return@synchronized + return@withLock } if (sourceClosed) throw IOException("source is closed") val bufferSpaceAvailable = maxBufferSize - buffer.size if (bufferSpaceAvailable == 0L) { - timeout.waitUntilNotified(buffer) // Wait until the source drains the buffer. + timeout.awaitUntilNotified(condition) // Wait until the source drains the buffer. if (canceled) throw IOException("canceled") continue } @@ -73,7 +80,7 @@ class Pipe(internal val maxBufferSize: Long) { val bytesToWrite = minOf(bufferSpaceAvailable, byteCount) buffer.write(source, bytesToWrite) byteCount -= bytesToWrite - (buffer as Object).notifyAll() // Notify the source that it can resume reading. + condition.signalAll() // Notify the source that it can resume reading. } } @@ -82,13 +89,13 @@ class Pipe(internal val maxBufferSize: Long) { override fun flush() { var delegate: Sink? = null - synchronized(buffer) { + lock.withLock { check(!sinkClosed) { "closed" } if (canceled) throw IOException("canceled") foldedSink?.let { delegate = it - return@synchronized + return@withLock } if (sourceClosed && buffer.size > 0L) { @@ -101,17 +108,17 @@ class Pipe(internal val maxBufferSize: Long) { override fun close() { var delegate: Sink? = null - synchronized(buffer) { + lock.withLock { if (sinkClosed) return foldedSink?.let { delegate = it - return@synchronized + return@withLock } if (sourceClosed && buffer.size > 0L) throw IOException("source is closed") sinkClosed = true - (buffer as Object).notifyAll() // Notify the source that no more bytes are coming. + condition.signalAll() // Notify the source that no more bytes are coming. } delegate?.forward { close() } @@ -125,26 +132,26 @@ class Pipe(internal val maxBufferSize: Long) { private val timeout = Timeout() override fun read(sink: Buffer, byteCount: Long): Long { - synchronized(buffer) { + lock.withLock { check(!sourceClosed) { "closed" } if (canceled) throw IOException("canceled") while (buffer.size == 0L) { if (sinkClosed) return -1L - timeout.waitUntilNotified(buffer) // Wait until the sink fills the buffer. + timeout.awaitUntilNotified(condition) // Wait until the sink fills the buffer. if (canceled) throw IOException("canceled") } val result = buffer.read(sink, byteCount) - (buffer as Object).notifyAll() // Notify the sink that it can resume writing. + condition.signalAll() // Notify the sink that it can resume writing. return result } } override fun close() { - synchronized(buffer) { + lock.withLock { sourceClosed = true - (buffer as Object).notifyAll() // Notify the sink that no more bytes are desired. + condition.signalAll() // Notify the sink that no more bytes are desired. } } @@ -166,7 +173,7 @@ class Pipe(internal val maxBufferSize: Long) { // must copy it to sink without holding any locks, then try it all again. var closed = false lateinit var sinkBuffer: Buffer - synchronized(buffer) { + lock.withLock { check(foldedSink == null) { "sink already folded" } if (canceled) { @@ -183,7 +190,7 @@ class Pipe(internal val maxBufferSize: Long) { closed = sinkClosed sinkBuffer = Buffer() sinkBuffer.write(buffer, buffer.size) - (buffer as Object).notifyAll() // Notify the sink that it can resume writing. + condition.signalAll() // Notify the sink that it can resume writing. } var success = false @@ -197,9 +204,9 @@ class Pipe(internal val maxBufferSize: Long) { success = true } finally { if (!success) { - synchronized(buffer) { + lock.withLock { sourceClosed = true - (buffer as Object).notifyAll() // Notify the sink that it can resume writing. + condition.signalAll() // Notify the sink that it can resume writing. } } } @@ -240,10 +247,10 @@ class Pipe(internal val maxBufferSize: Long) { * operating on the source or the sink. */ fun cancel() { - synchronized(buffer) { + lock.withLock { canceled = true buffer.clear() - (buffer as Object).notifyAll() // Notify the source and sink that they're canceled. + condition.signalAll() // Notify the source and sink that they're canceled. } } } diff --git a/okio/src/jvmMain/kotlin/okio/Throttler.kt b/okio/src/jvmMain/kotlin/okio/Throttler.kt index dbb83fe3ba..2b40cc8b03 100644 --- a/okio/src/jvmMain/kotlin/okio/Throttler.kt +++ b/okio/src/jvmMain/kotlin/okio/Throttler.kt @@ -17,6 +17,9 @@ package okio import java.io.IOException import java.io.InterruptedIOException +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * Enables limiting of Source and Sink throughput. Attach to this throttler via [source] and [sink] @@ -46,6 +49,9 @@ class Throttler internal constructor( private var waitByteCount: Long = 8 * 1024 // 8 KiB. private var maxByteCount: Long = 256 * 1024 // 256 KiB. + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + constructor() : this(allocatedUntil = System.nanoTime()) /** Sets the rate at which bytes will be allocated. Use 0 for no limit. */ @@ -55,7 +61,7 @@ class Throttler internal constructor( waitByteCount: Long = this.waitByteCount, maxByteCount: Long = this.maxByteCount ) { - synchronized(this) { + lock.withLock { require(bytesPerSecond >= 0) require(waitByteCount > 0) require(maxByteCount >= waitByteCount) @@ -63,7 +69,7 @@ class Throttler internal constructor( this.bytesPerSecond = bytesPerSecond this.waitByteCount = waitByteCount this.maxByteCount = maxByteCount - (this as Object).notifyAll() + condition.signalAll() } } @@ -74,7 +80,7 @@ class Throttler internal constructor( internal fun take(byteCount: Long): Long { require(byteCount > 0) - synchronized(this) { + lock.withLock { while (true) { val now = System.nanoTime() val byteCountOrWaitNanos = byteCountOrWaitNanos(now, byteCount) @@ -82,7 +88,6 @@ class Throttler internal constructor( waitNanos(-byteCountOrWaitNanos) } } - throw AssertionError() // Unreachable, but synchronized() doesn't know that. } /** @@ -126,9 +131,7 @@ class Throttler internal constructor( private fun Long.bytesToNanos() = this * 1_000_000_000L / bytesPerSecond private fun waitNanos(nanosToWait: Long) { - val millisToWait = nanosToWait / 1_000_000L - val remainderNanos = nanosToWait - (millisToWait * 1_000_000L) - (this as Object).wait(millisToWait, remainderNanos.toInt()) + condition.awaitNanos(nanosToWait) } /** Create a Source which honors this Throttler. */ diff --git a/okio/src/jvmMain/kotlin/okio/Timeout.kt b/okio/src/jvmMain/kotlin/okio/Timeout.kt index b45d2ab6b9..b2fe90a682 100644 --- a/okio/src/jvmMain/kotlin/okio/Timeout.kt +++ b/okio/src/jvmMain/kotlin/okio/Timeout.kt @@ -18,6 +18,7 @@ package okio import java.io.IOException import java.io.InterruptedIOException import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition actual open class Timeout { /** @@ -103,6 +104,80 @@ actual open class Timeout { } } + /** + * Waits on `monitor` until it is notified. Throws [InterruptedIOException] if either the thread + * is interrupted or if this timeout elapses before `monitor` is notified. The caller must be + * synchronized on `monitor`. + * + * Here's a sample class that uses `waitUntilNotified()` to await a specific state. Note that the + * call is made within a loop to avoid unnecessary waiting and to mitigate spurious notifications. + * ``` + * class Dice { + * Random random = new Random(); + * int latestTotal; + * + * public synchronized void roll() { + * latestTotal = 2 + random.nextInt(6) + random.nextInt(6); + * System.out.println("Rolled " + latestTotal); + * notifyAll(); + * } + * + * public void rollAtFixedRate(int period, TimeUnit timeUnit) { + * Executors.newScheduledThreadPool(0).scheduleAtFixedRate(new Runnable() { + * public void run() { + * roll(); + * } + * }, 0, period, timeUnit); + * } + * + * public synchronized void awaitTotal(Timeout timeout, int total) + * throws InterruptedIOException { + * while (latestTotal != total) { + * timeout.waitUntilNotified(this); + * } + * } + * } + * ``` + */ + @Throws(InterruptedIOException::class) + fun awaitUntilNotified(monitor: Condition) { + try { + val hasDeadline = hasDeadline() + val timeoutNanos = timeoutNanos() + + if (!hasDeadline && timeoutNanos == 0L) { + monitor.await() // There is no timeout: wait forever. + return + } + + // Compute how long we'll wait. + val start = System.nanoTime() + val waitNanos = if (hasDeadline && timeoutNanos != 0L) { + val deadlineNanos = deadlineNanoTime() - start + minOf(timeoutNanos, deadlineNanos) + } else if (hasDeadline) { + deadlineNanoTime() - start + } else { + timeoutNanos + } + + // Attempt to wait that long. This will break out early if the monitor is notified. + var elapsedNanos = 0L + if (waitNanos > 0L) { + monitor.await(waitNanos, TimeUnit.NANOSECONDS) + elapsedNanos = System.nanoTime() - start + } + + // Throw if the timeout elapsed before the monitor was notified. + if (elapsedNanos >= waitNanos) { + throw InterruptedIOException("timeout") + } + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() // Retain interrupted status. + throw InterruptedIOException("interrupted") + } + } + /** * Waits on `monitor` until it is notified. Throws [InterruptedIOException] if either the thread * is interrupted or if this timeout elapses before `monitor` is notified. The caller must be diff --git a/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt b/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt index 4f555010f2..17bec906ad 100644 --- a/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt +++ b/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt @@ -29,7 +29,15 @@ actual open class ArrayIndexOutOfBoundsException actual constructor( message: String? ) : IndexOutOfBoundsException(message) -internal actual inline fun synchronized(lock: Any, block: () -> R): R = block() +actual class ALock { + companion object { + val instance = ALock() + } +} + +internal actual fun newLock(): ALock = ALock.instance + +internal actual inline fun synchronized(lock: ALock, block: () -> R): R = block() actual open class IOException actual constructor( message: String?, From 16f8d45863755c9705b3320dd0264632946d2060 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Sat, 31 Dec 2022 12:09:18 +1000 Subject: [PATCH 02/15] Simplify --- okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt | 5 ----- 1 file changed, 5 deletions(-) diff --git a/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt b/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt index b8df2f8bbe..55851538c4 100644 --- a/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt +++ b/okio/src/jvmMain/kotlin/okio/JvmFileHandle.kt @@ -16,17 +16,12 @@ package okio import java.io.RandomAccessFile -import java.util.concurrent.locks.Condition -import java.util.concurrent.locks.ReentrantLock internal class JvmFileHandle( readWrite: Boolean, private val randomAccessFile: RandomAccessFile ) : FileHandle(readWrite) { - val lock: ReentrantLock = ReentrantLock() - val condition: Condition = lock.newCondition() - @Synchronized override fun protectedResize(size: Long) { val currentSize = size() From a7d8ffd69739f34c73c276d97f4934f430ae81e9 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 2 Jan 2023 16:38:38 +1000 Subject: [PATCH 03/15] Review comments --- .../commonMain/kotlin/okio/-CommonPlatform.kt | 6 ++-- okio/src/commonMain/kotlin/okio/FileHandle.kt | 30 +++++++++---------- okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt | 8 ++--- .../nonJvmMain/kotlin/okio/-NonJvmPlatform.kt | 8 ++--- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt b/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt index 8a9f1176fc..d9af5989a5 100644 --- a/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt +++ b/okio/src/commonMain/kotlin/okio/-CommonPlatform.kt @@ -23,11 +23,11 @@ internal expect fun String.asUtf8ToByteArray(): ByteArray // TODO make internal https://youtrack.jetbrains.com/issue/KT-37316 expect class ArrayIndexOutOfBoundsException(message: String?) : IndexOutOfBoundsException -expect class ALock +expect class Lock -internal expect fun newLock(): ALock +expect inline fun Lock.withLock(action: () -> T): T -internal expect inline fun synchronized(lock: ALock, block: () -> R): R +internal expect fun newLock(): Lock expect open class IOException(message: String?, cause: Throwable?) : Exception { constructor(message: String? = null) diff --git a/okio/src/commonMain/kotlin/okio/FileHandle.kt b/okio/src/commonMain/kotlin/okio/FileHandle.kt index f35212116f..472e0e47a2 100644 --- a/okio/src/commonMain/kotlin/okio/FileHandle.kt +++ b/okio/src/commonMain/kotlin/okio/FileHandle.kt @@ -52,7 +52,7 @@ abstract class FileHandle( */ private var openStreamCount = 0 - val aLock: ALock = newLock() + val lock: Lock = newLock() /** * Reads at least 1, and up to [byteCount] bytes from this starting at [fileOffset] and copies @@ -66,7 +66,7 @@ abstract class FileHandle( arrayOffset: Int, byteCount: Int ): Int { - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } } return protectedRead(fileOffset, array, arrayOffset, byteCount) @@ -78,7 +78,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun read(fileOffset: Long, sink: Buffer, byteCount: Long): Long { - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } } return readNoCloseCheck(fileOffset, sink, byteCount) @@ -89,7 +89,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun size(): Long { - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } } return protectedSize() @@ -102,7 +102,7 @@ abstract class FileHandle( @Throws(IOException::class) fun resize(size: Long) { check(readWrite) { "file handle is read-only" } - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } } return protectedResize(size) @@ -116,7 +116,7 @@ abstract class FileHandle( byteCount: Int ) { check(readWrite) { "file handle is read-only" } - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } } return protectedWrite(fileOffset, array, arrayOffset, byteCount) @@ -126,7 +126,7 @@ abstract class FileHandle( @Throws(IOException::class) fun write(fileOffset: Long, source: Buffer, byteCount: Long) { check(readWrite) { "file handle is read-only" } - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } } writeNoCloseCheck(fileOffset, source, byteCount) @@ -136,7 +136,7 @@ abstract class FileHandle( @Throws(IOException::class) fun flush() { check(readWrite) { "file handle is read-only" } - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } } return protectedFlush() @@ -148,7 +148,7 @@ abstract class FileHandle( */ @Throws(IOException::class) fun source(fileOffset: Long = 0L): Source { - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } openStreamCount++ } @@ -218,7 +218,7 @@ abstract class FileHandle( @Throws(IOException::class) fun sink(fileOffset: Long = 0L): Sink { check(readWrite) { "file handle is read-only" } - synchronized(aLock) { + lock.withLock { check(!closed) { "closed" } openStreamCount++ } @@ -284,10 +284,10 @@ abstract class FileHandle( @Throws(IOException::class) final override fun close() { - synchronized(aLock) { - if (closed) return@close + lock.withLock { + if (closed) return closed = true - if (openStreamCount != 0) return@close + if (openStreamCount != 0) return } protectedClose() } @@ -407,7 +407,7 @@ abstract class FileHandle( override fun close() { if (closed) return closed = true - synchronized(fileHandle.aLock) { + fileHandle.lock.withLock { fileHandle.openStreamCount-- if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close } @@ -433,7 +433,7 @@ abstract class FileHandle( override fun close() { if (closed) return closed = true - synchronized(fileHandle.aLock) { + fileHandle.lock.withLock { fileHandle.openStreamCount-- if (fileHandle.openStreamCount != 0 || !fileHandle.closed) return@close } diff --git a/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt b/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt index 33b84bdbaa..f40032db5b 100644 --- a/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt +++ b/okio/src/jvmMain/kotlin/okio/-JvmPlatform.kt @@ -17,7 +17,7 @@ package okio import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock +import kotlin.concurrent.withLock as jvmWithLock internal actual fun ByteArray.toUtf8String(): String = String(this, Charsets.UTF_8) @@ -26,11 +26,11 @@ internal actual fun String.asUtf8ToByteArray(): ByteArray = toByteArray(Charsets // TODO remove if https://youtrack.jetbrains.com/issue/KT-20641 provides a better solution actual typealias ArrayIndexOutOfBoundsException = java.lang.ArrayIndexOutOfBoundsException -actual typealias ALock = ReentrantLock +actual typealias Lock = ReentrantLock -internal actual fun newLock(): ALock = ReentrantLock() +internal actual fun newLock(): Lock = ReentrantLock() -internal actual inline fun synchronized(lock: ALock, block: () -> R): R = lock.withLock(block) +actual inline fun Lock.withLock(action: () -> T): T = jvmWithLock(action) actual typealias IOException = java.io.IOException diff --git a/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt b/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt index 17bec906ad..4077149c74 100644 --- a/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt +++ b/okio/src/nonJvmMain/kotlin/okio/-NonJvmPlatform.kt @@ -29,15 +29,15 @@ actual open class ArrayIndexOutOfBoundsException actual constructor( message: String? ) : IndexOutOfBoundsException(message) -actual class ALock { +actual class Lock { companion object { - val instance = ALock() + val instance = Lock() } } -internal actual fun newLock(): ALock = ALock.instance +internal actual fun newLock(): Lock = Lock.instance -internal actual inline fun synchronized(lock: ALock, block: () -> R): R = block() +actual inline fun Lock.withLock(action: () -> T): T = action() actual open class IOException actual constructor( message: String?, From 72899bf813861b9ac5ee986b68e93fb53cb472cf Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 2 Jan 2023 16:41:56 +1000 Subject: [PATCH 04/15] More fixes. --- okio/src/jvmMain/kotlin/okio/Throttler.kt | 6 +----- okio/src/jvmMain/kotlin/okio/Timeout.kt | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/okio/src/jvmMain/kotlin/okio/Throttler.kt b/okio/src/jvmMain/kotlin/okio/Throttler.kt index 2b40cc8b03..675322b77c 100644 --- a/okio/src/jvmMain/kotlin/okio/Throttler.kt +++ b/okio/src/jvmMain/kotlin/okio/Throttler.kt @@ -85,7 +85,7 @@ class Throttler internal constructor( val now = System.nanoTime() val byteCountOrWaitNanos = byteCountOrWaitNanos(now, byteCount) if (byteCountOrWaitNanos >= 0) return byteCountOrWaitNanos - waitNanos(-byteCountOrWaitNanos) + condition.awaitNanos(-byteCountOrWaitNanos) } } } @@ -130,10 +130,6 @@ class Throttler internal constructor( private fun Long.bytesToNanos() = this * 1_000_000_000L / bytesPerSecond - private fun waitNanos(nanosToWait: Long) { - condition.awaitNanos(nanosToWait) - } - /** Create a Source which honors this Throttler. */ fun source(source: Source): Source { return object : ForwardingSource(source) { diff --git a/okio/src/jvmMain/kotlin/okio/Timeout.kt b/okio/src/jvmMain/kotlin/okio/Timeout.kt index b2fe90a682..fe0cac87a7 100644 --- a/okio/src/jvmMain/kotlin/okio/Timeout.kt +++ b/okio/src/jvmMain/kotlin/okio/Timeout.kt @@ -106,8 +106,8 @@ actual open class Timeout { /** * Waits on `monitor` until it is notified. Throws [InterruptedIOException] if either the thread - * is interrupted or if this timeout elapses before `monitor` is notified. The caller must be - * synchronized on `monitor`. + * is interrupted or if this timeout elapses before `monitor` is notified. + * The caller must hold the lock that monitor is bound to. * * Here's a sample class that uses `waitUntilNotified()` to await a specific state. Note that the * call is made within a loop to avoid unnecessary waiting and to mitigate spurious notifications. From dade57f29c3e2bc4dc60b88ebceb21e0c0fbdd43 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 2 Jan 2023 17:11:16 +1000 Subject: [PATCH 05/15] Add a test. --- okio/src/jvmMain/kotlin/okio/Pipe.kt | 4 +- okio/src/jvmMain/kotlin/okio/Timeout.kt | 27 ++- okio/src/jvmTest/java/okio/AwaitSignalTest.kt | 172 ++++++++++++++++++ 3 files changed, 191 insertions(+), 12 deletions(-) create mode 100644 okio/src/jvmTest/java/okio/AwaitSignalTest.kt diff --git a/okio/src/jvmMain/kotlin/okio/Pipe.kt b/okio/src/jvmMain/kotlin/okio/Pipe.kt index bde7c9ce0b..dfcbbc54a3 100644 --- a/okio/src/jvmMain/kotlin/okio/Pipe.kt +++ b/okio/src/jvmMain/kotlin/okio/Pipe.kt @@ -72,7 +72,7 @@ class Pipe(internal val maxBufferSize: Long) { val bufferSpaceAvailable = maxBufferSize - buffer.size if (bufferSpaceAvailable == 0L) { - timeout.awaitUntilNotified(condition) // Wait until the source drains the buffer. + timeout.awaitSignal(condition) // Wait until the source drains the buffer. if (canceled) throw IOException("canceled") continue } @@ -138,7 +138,7 @@ class Pipe(internal val maxBufferSize: Long) { while (buffer.size == 0L) { if (sinkClosed) return -1L - timeout.awaitUntilNotified(condition) // Wait until the sink fills the buffer. + timeout.awaitSignal(condition) // Wait until the sink fills the buffer. if (canceled) throw IOException("canceled") } diff --git a/okio/src/jvmMain/kotlin/okio/Timeout.kt b/okio/src/jvmMain/kotlin/okio/Timeout.kt index fe0cac87a7..d7beab8bdb 100644 --- a/okio/src/jvmMain/kotlin/okio/Timeout.kt +++ b/okio/src/jvmMain/kotlin/okio/Timeout.kt @@ -116,10 +116,15 @@ actual open class Timeout { * Random random = new Random(); * int latestTotal; * - * public synchronized void roll() { - * latestTotal = 2 + random.nextInt(6) + random.nextInt(6); - * System.out.println("Rolled " + latestTotal); - * notifyAll(); + * ReentrantLock lock = new ReentrantLock(); + * Condition condition = lock.newCondition(); + * + * public void roll() { + * lock.withLock { + * latestTotal = 2 + random.nextInt(6) + random.nextInt(6); + * System.out.println("Rolled " + latestTotal); + * condition.signalAll(); + * } * } * * public void rollAtFixedRate(int period, TimeUnit timeUnit) { @@ -130,23 +135,25 @@ actual open class Timeout { * }, 0, period, timeUnit); * } * - * public synchronized void awaitTotal(Timeout timeout, int total) + * public void awaitTotal(Timeout timeout, int total) * throws InterruptedIOException { - * while (latestTotal != total) { - * timeout.waitUntilNotified(this); + * lock.withLock { + * while (latestTotal != total) { + * timeout.awaitSignal(this); + * } * } * } * } * ``` */ @Throws(InterruptedIOException::class) - fun awaitUntilNotified(monitor: Condition) { + fun awaitSignal(condition: Condition) { try { val hasDeadline = hasDeadline() val timeoutNanos = timeoutNanos() if (!hasDeadline && timeoutNanos == 0L) { - monitor.await() // There is no timeout: wait forever. + condition.await() // There is no timeout: wait forever. return } @@ -164,7 +171,7 @@ actual open class Timeout { // Attempt to wait that long. This will break out early if the monitor is notified. var elapsedNanos = 0L if (waitNanos > 0L) { - monitor.await(waitNanos, TimeUnit.NANOSECONDS) + condition.await(waitNanos, TimeUnit.NANOSECONDS) elapsedNanos = System.nanoTime() - start } diff --git a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt new file mode 100644 index 0000000000..e6a0ce10e6 --- /dev/null +++ b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt @@ -0,0 +1,172 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import okio.TestUtil.assumeNotWindows +import org.junit.After +import org.junit.Assert +import org.junit.Test +import java.io.InterruptedIOException +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock + +class AwaitSignalTest { + val executorService = Executors.newScheduledThreadPool(0) + + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + + @After + fun tearDown() { + executorService.shutdown() + } + + @Test + fun notified() = lock.withLock { + val timeout = Timeout() + timeout.timeout(5000, TimeUnit.MILLISECONDS) + val start = now() + executorService.schedule( + { lock.withLock { condition.signal() } }, + 1000, + TimeUnit.MILLISECONDS + ) + timeout.awaitSignal(condition) + assertElapsed(1000.0, start) + } + + @Test + fun timeout() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun deadline() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.deadline(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun deadlineBeforeTimeout() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(5000, TimeUnit.MILLISECONDS) + timeout.deadline(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun timeoutBeforeDeadline() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(1000, TimeUnit.MILLISECONDS) + timeout.deadline(5000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(1000.0, start) + } + + @Test + fun deadlineAlreadyReached() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.deadlineNanoTime(System.nanoTime()) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) + } + assertElapsed(0.0, start) + } + + @Test + fun threadInterrupted() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + val start = now() + Thread.currentThread().interrupt() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("interrupted", expected.message) + Assert.assertTrue(Thread.interrupted()) + } + assertElapsed(0.0, start) + } + + @Test + fun threadInterruptedOnThrowIfReached() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + Thread.currentThread().interrupt() + try { + timeout.throwIfReached() + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("interrupted", expected.message) + Assert.assertTrue(Thread.interrupted()) + } + } + + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private fun now(): Double { + return System.nanoTime() / 1000000.0 + } + + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+450 milliseconds. + */ + private fun assertElapsed(duration: Double, start: Double) { + Assert.assertEquals(duration, now() - start - 200.0, 250.0) + } +} From 22516e5324e1c0bdecb67cc648506c60b17c7d92 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 2 Jan 2023 17:26:35 +1000 Subject: [PATCH 06/15] Fix formatting. --- okio/src/jvmTest/java/okio/AwaitSignalTest.kt | 248 +++++++++--------- 1 file changed, 124 insertions(+), 124 deletions(-) diff --git a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt index e6a0ce10e6..d1ab2d4c27 100644 --- a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt +++ b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt @@ -26,147 +26,147 @@ import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock class AwaitSignalTest { - val executorService = Executors.newScheduledThreadPool(0) + val executorService = Executors.newScheduledThreadPool(0) - val lock: ReentrantLock = ReentrantLock() - val condition: Condition = lock.newCondition() + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() - @After - fun tearDown() { - executorService.shutdown() - } + @After + fun tearDown() { + executorService.shutdown() + } - @Test - fun notified() = lock.withLock { - val timeout = Timeout() - timeout.timeout(5000, TimeUnit.MILLISECONDS) - val start = now() - executorService.schedule( - { lock.withLock { condition.signal() } }, - 1000, - TimeUnit.MILLISECONDS - ) - timeout.awaitSignal(condition) - assertElapsed(1000.0, start) - } + @Test + fun notified() = lock.withLock { + val timeout = Timeout() + timeout.timeout(5000, TimeUnit.MILLISECONDS) + val start = now() + executorService.schedule( + { lock.withLock { condition.signal() } }, + 1000, + TimeUnit.MILLISECONDS + ) + timeout.awaitSignal(condition) + assertElapsed(1000.0, start) + } - @Test - fun timeout() = lock.withLock { - assumeNotWindows() - val timeout = Timeout() - timeout.timeout(1000, TimeUnit.MILLISECONDS) - val start = now() - try { - timeout.awaitSignal(condition) - Assert.fail() - } catch (expected: InterruptedIOException) { - Assert.assertEquals("timeout", expected.message) - } - assertElapsed(1000.0, start) + @Test + fun timeout() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) } + assertElapsed(1000.0, start) + } - @Test - fun deadline() = lock.withLock { - assumeNotWindows() - val timeout = Timeout() - timeout.deadline(1000, TimeUnit.MILLISECONDS) - val start = now() - try { - timeout.awaitSignal(condition) - Assert.fail() - } catch (expected: InterruptedIOException) { - Assert.assertEquals("timeout", expected.message) - } - assertElapsed(1000.0, start) + @Test + fun deadline() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.deadline(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) } + assertElapsed(1000.0, start) + } - @Test - fun deadlineBeforeTimeout() = lock.withLock { - assumeNotWindows() - val timeout = Timeout() - timeout.timeout(5000, TimeUnit.MILLISECONDS) - timeout.deadline(1000, TimeUnit.MILLISECONDS) - val start = now() - try { - timeout.awaitSignal(condition) - Assert.fail() - } catch (expected: InterruptedIOException) { - Assert.assertEquals("timeout", expected.message) - } - assertElapsed(1000.0, start) + @Test + fun deadlineBeforeTimeout() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(5000, TimeUnit.MILLISECONDS) + timeout.deadline(1000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) } + assertElapsed(1000.0, start) + } - @Test - fun timeoutBeforeDeadline() = lock.withLock { - assumeNotWindows() - val timeout = Timeout() - timeout.timeout(1000, TimeUnit.MILLISECONDS) - timeout.deadline(5000, TimeUnit.MILLISECONDS) - val start = now() - try { - timeout.awaitSignal(condition) - Assert.fail() - } catch (expected: InterruptedIOException) { - Assert.assertEquals("timeout", expected.message) - } - assertElapsed(1000.0, start) + @Test + fun timeoutBeforeDeadline() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.timeout(1000, TimeUnit.MILLISECONDS) + timeout.deadline(5000, TimeUnit.MILLISECONDS) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) } + assertElapsed(1000.0, start) + } - @Test - fun deadlineAlreadyReached() = lock.withLock { - assumeNotWindows() - val timeout = Timeout() - timeout.deadlineNanoTime(System.nanoTime()) - val start = now() - try { - timeout.awaitSignal(condition) - Assert.fail() - } catch (expected: InterruptedIOException) { - Assert.assertEquals("timeout", expected.message) - } - assertElapsed(0.0, start) + @Test + fun deadlineAlreadyReached() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + timeout.deadlineNanoTime(System.nanoTime()) + val start = now() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("timeout", expected.message) } + assertElapsed(0.0, start) + } - @Test - fun threadInterrupted() = lock.withLock { - assumeNotWindows() - val timeout = Timeout() - val start = now() - Thread.currentThread().interrupt() - try { - timeout.awaitSignal(condition) - Assert.fail() - } catch (expected: InterruptedIOException) { - Assert.assertEquals("interrupted", expected.message) - Assert.assertTrue(Thread.interrupted()) - } - assertElapsed(0.0, start) + @Test + fun threadInterrupted() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + val start = now() + Thread.currentThread().interrupt() + try { + timeout.awaitSignal(condition) + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("interrupted", expected.message) + Assert.assertTrue(Thread.interrupted()) } + assertElapsed(0.0, start) + } - @Test - fun threadInterruptedOnThrowIfReached() = lock.withLock { - assumeNotWindows() - val timeout = Timeout() - Thread.currentThread().interrupt() - try { - timeout.throwIfReached() - Assert.fail() - } catch (expected: InterruptedIOException) { - Assert.assertEquals("interrupted", expected.message) - Assert.assertTrue(Thread.interrupted()) - } + @Test + fun threadInterruptedOnThrowIfReached() = lock.withLock { + assumeNotWindows() + val timeout = Timeout() + Thread.currentThread().interrupt() + try { + timeout.throwIfReached() + Assert.fail() + } catch (expected: InterruptedIOException) { + Assert.assertEquals("interrupted", expected.message) + Assert.assertTrue(Thread.interrupted()) } + } - /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ - private fun now(): Double { - return System.nanoTime() / 1000000.0 - } + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private fun now(): Double { + return System.nanoTime() / 1000000.0 + } - /** - * Fails the test unless the time from start until now is duration, accepting differences in - * -50..+450 milliseconds. - */ - private fun assertElapsed(duration: Double, start: Double) { - Assert.assertEquals(duration, now() - start - 200.0, 250.0) - } + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+450 milliseconds. + */ + private fun assertElapsed(duration: Double, start: Double) { + Assert.assertEquals(duration, now() - start - 200.0, 250.0) + } } From 1dbddda54b8eaa5d2bccdbb4f08defb8cd2a37cf Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 2 Jan 2023 19:24:14 +1000 Subject: [PATCH 07/15] Run loom tests --- .github/workflows/build.yml | 25 +++++++++- build-support/src/main/kotlin/jvm.kt | 2 + build.gradle.kts | 8 ++++ .../jvmMain/kotlin/okio/TestingExecutors.kt | 46 +++++++++++++++++++ okio/src/jvmTest/java/okio/AwaitSignalTest.kt | 11 ++--- .../jvmTest/java/okio/LargeStreamsTest.java | 2 +- okio/src/jvmTest/java/okio/PipeTest.java | 9 ++-- .../java/okio/WaitUntilNotifiedTest.java | 2 +- .../src/jvmTest/kotlin/okio/PipeKotlinTest.kt | 11 ++--- okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt | 5 +- okio/src/jvmTest/kotlin/okio/TimeoutTest.kt | 5 +- 11 files changed, 99 insertions(+), 27 deletions(-) create mode 100644 build-support/src/main/kotlin/jvm.kt create mode 100644 okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8c5570ab90..f2504b73f1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -48,6 +48,29 @@ jobs: name: japicmp-report path: okio/jvm/japicmp/build/reports/japi.txt + loom: + runs-on: ubuntu-latest + + strategy: + fail-fast: false + + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Validate Gradle Wrapper + uses: gradle/wrapper-validation-action@v1 + + - name: Configure JDK + uses: actions/setup-java@v2 + with: + distribution: 'zulu' + java-version: 19 + + - name: Test + run: | + ./gradlew -DloomEnabled=true build + all-platforms: runs-on: ${{ matrix.os }} @@ -71,7 +94,7 @@ jobs: - name: Test run: | - ./gradlew build + ./gradlew -Dloom=true build publish: runs-on: macos-11 diff --git a/build-support/src/main/kotlin/jvm.kt b/build-support/src/main/kotlin/jvm.kt new file mode 100644 index 0000000000..fe13ad5295 --- /dev/null +++ b/build-support/src/main/kotlin/jvm.kt @@ -0,0 +1,2 @@ +// If true - tests should run for a loom environment. +val loomEnabled = System.getProperty("loomEnabled", "false").toBoolean() diff --git a/build.gradle.kts b/build.gradle.kts index 08b301cccb..6509938e2a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -176,6 +176,14 @@ subprojects { exceptionFormat = TestExceptionFormat.FULL showStandardStreams = false } + + if (loomEnabled) { + jvmArgs = jvmArgs!! + listOf( + "-Djdk.tracePinnedThread=full", + "--enable-preview", + "-DloomEnabled=true" + ) + } } tasks.withType().configureEach { diff --git a/okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt b/okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt new file mode 100644 index 0000000000..4561240f15 --- /dev/null +++ b/okio-testing-support/src/jvmMain/kotlin/okio/TestingExecutors.kt @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2023 Block, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ThreadFactory + +object TestingExecutors { + val isLoom = System.getProperty("loomEnabled").toBoolean() + + fun newScheduledExecutorService(corePoolSize: Int = 0): ScheduledExecutorService = if (isLoom) { + Executors.newScheduledThreadPool(corePoolSize, newVirtualThreadFactory()) + } else { + Executors.newScheduledThreadPool(corePoolSize) + } + + fun newExecutorService(corePoolSize: Int = 0): ExecutorService = if (isLoom) { + Executors.newScheduledThreadPool(corePoolSize, newVirtualThreadFactory()) + } else { + Executors.newScheduledThreadPool(corePoolSize) + } + + fun newVirtualThreadFactory(): ThreadFactory { + val threadBuilder = Thread::class.java.getMethod("ofVirtual").invoke(null) + return Class.forName("java.lang.Thread\$Builder").getMethod("factory").invoke(threadBuilder) as ThreadFactory + } + + fun newVirtualThreadPerTaskExecutor(): ExecutorService { + return Executors::class.java.getMethod("newVirtualThreadPerTaskExecutor").invoke(null) as ExecutorService + } +} diff --git a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt index d1ab2d4c27..779ee9f1b2 100644 --- a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt +++ b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt @@ -15,18 +15,17 @@ */ package okio -import okio.TestUtil.assumeNotWindows -import org.junit.After -import org.junit.Assert -import org.junit.Test import java.io.InterruptedIOException -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock +import okio.TestUtil.assumeNotWindows +import org.junit.After +import org.junit.Assert +import org.junit.Test class AwaitSignalTest { - val executorService = Executors.newScheduledThreadPool(0) + val executorService = TestingExecutors.newScheduledExecutorService(0) val lock: ReentrantLock = ReentrantLock() val condition: Condition = lock.newCondition() diff --git a/okio/src/jvmTest/java/okio/LargeStreamsTest.java b/okio/src/jvmTest/java/okio/LargeStreamsTest.java index b9be1a2e42..6752017d35 100644 --- a/okio/src/jvmTest/java/okio/LargeStreamsTest.java +++ b/okio/src/jvmTest/java/okio/LargeStreamsTest.java @@ -104,7 +104,7 @@ private Long readAllAndClose(Source source, Sink sink) throws IOException { /** Calls {@link #readAllAndClose} on a background thread. */ private Future readAllAndCloseAsync(final Source source, final Sink sink) { - ExecutorService executor = Executors.newSingleThreadExecutor(); + ExecutorService executor = TestingExecutors.INSTANCE.newExecutorService(0); try { return executor.submit(new Callable() { @Override public Long call() throws Exception { diff --git a/okio/src/jvmTest/java/okio/PipeTest.java b/okio/src/jvmTest/java/okio/PipeTest.java index 030e6ba0b7..a56daadc85 100644 --- a/okio/src/jvmTest/java/okio/PipeTest.java +++ b/okio/src/jvmTest/java/okio/PipeTest.java @@ -18,11 +18,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; + import org.junit.After; import org.junit.Test; @@ -31,7 +28,7 @@ import static org.junit.Assert.fail; public final class PipeTest { - final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); + final ScheduledExecutorService executorService = TestingExecutors.INSTANCE.newScheduledExecutorService(2); @After public void tearDown() throws Exception { executorService.shutdown(); diff --git a/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java b/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java index e440528960..a3e11f222f 100644 --- a/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java +++ b/okio/src/jvmTest/java/okio/WaitUntilNotifiedTest.java @@ -27,7 +27,7 @@ import static org.junit.Assert.fail; public final class WaitUntilNotifiedTest { - final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0); + final ScheduledExecutorService executorService = TestingExecutors.INSTANCE.newScheduledExecutorService(0); @After public void tearDown() { executorService.shutdown(); diff --git a/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt b/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt index 3a41e7422b..76eb3d4021 100644 --- a/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt +++ b/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt @@ -15,6 +15,10 @@ */ package okio +import java.io.IOException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.assertFailsWith import org.junit.After import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse @@ -22,17 +26,12 @@ import org.junit.Assert.assertTrue import org.junit.Assert.fail import org.junit.Rule import org.junit.Test -import java.io.IOException -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import kotlin.test.assertFailsWith import org.junit.rules.Timeout as JUnitTimeout class PipeKotlinTest { @JvmField @Rule val timeout = JUnitTimeout(5, TimeUnit.SECONDS) - private val executorService = Executors.newScheduledThreadPool(1) + private val executorService = TestingExecutors.newScheduledExecutorService(1) @After @Throws(Exception::class) fun tearDown() { diff --git a/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt b/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt index 0b151794d8..baa487c16c 100644 --- a/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt +++ b/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt @@ -15,12 +15,11 @@ */ package okio +import kotlin.test.Ignore import okio.TestUtil.randomSource import org.junit.After import org.junit.Before import org.junit.Test -import java.util.concurrent.Executors -import kotlin.test.Ignore @Ignore("These tests are flaky and fail on slower hardware, need to be improved") class ThrottlerTest { @@ -31,7 +30,7 @@ class ThrottlerTest { private val throttlerSlow = Throttler() private val threads = 4 - private val executorService = Executors.newFixedThreadPool(threads) + private val executorService = TestingExecutors.newExecutorService(threads) private var stopwatch = Stopwatch() @Before fun setup() { diff --git a/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt b/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt index 49fb81e2e1..ac091142c1 100644 --- a/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt +++ b/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt @@ -15,19 +15,18 @@ */ package okio +import java.util.concurrent.TimeUnit import org.junit.After import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse import org.junit.Rule import org.junit.Test -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit import org.junit.rules.Timeout as JUnitTimeout class TimeoutTest { @JvmField @Rule val timeout = JUnitTimeout(5, TimeUnit.SECONDS) - private val executorService = Executors.newScheduledThreadPool(1) + private val executorService = TestingExecutors.newExecutorService(1) @After @Throws(Exception::class) fun tearDown() { From ff74f809a63e44e053d9b962523eb82cecc36afb Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 2 Jan 2023 19:25:33 +1000 Subject: [PATCH 08/15] Run loom tests --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f2504b73f1..026a645e8f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -94,7 +94,7 @@ jobs: - name: Test run: | - ./gradlew -Dloom=true build + ./gradlew build publish: runs-on: macos-11 From 59230719b3c65a9d60e7b42920af4ed7be90ad60 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 2 Jan 2023 19:31:08 +1000 Subject: [PATCH 09/15] Cleanup --- okio/src/jvmTest/java/okio/AwaitSignalTest.kt | 8 ++++---- okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt | 8 ++++---- okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt | 2 +- okio/src/jvmTest/kotlin/okio/TimeoutTest.kt | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt index 779ee9f1b2..05bf9db90f 100644 --- a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt +++ b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt @@ -15,14 +15,14 @@ */ package okio -import java.io.InterruptedIOException -import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.Condition -import java.util.concurrent.locks.ReentrantLock import okio.TestUtil.assumeNotWindows import org.junit.After import org.junit.Assert import org.junit.Test +import java.io.InterruptedIOException +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock class AwaitSignalTest { val executorService = TestingExecutors.newScheduledExecutorService(0) diff --git a/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt b/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt index 76eb3d4021..166c98e67f 100644 --- a/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt +++ b/okio/src/jvmTest/kotlin/okio/PipeKotlinTest.kt @@ -15,10 +15,6 @@ */ package okio -import java.io.IOException -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import kotlin.test.assertFailsWith import org.junit.After import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse @@ -26,6 +22,10 @@ import org.junit.Assert.assertTrue import org.junit.Assert.fail import org.junit.Rule import org.junit.Test +import java.io.IOException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.assertFailsWith import org.junit.rules.Timeout as JUnitTimeout class PipeKotlinTest { diff --git a/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt b/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt index baa487c16c..cfda2141d8 100644 --- a/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt +++ b/okio/src/jvmTest/kotlin/okio/ThrottlerTest.kt @@ -15,11 +15,11 @@ */ package okio -import kotlin.test.Ignore import okio.TestUtil.randomSource import org.junit.After import org.junit.Before import org.junit.Test +import kotlin.test.Ignore @Ignore("These tests are flaky and fail on slower hardware, need to be improved") class ThrottlerTest { diff --git a/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt b/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt index ac091142c1..9bf96f33fa 100644 --- a/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt +++ b/okio/src/jvmTest/kotlin/okio/TimeoutTest.kt @@ -15,12 +15,12 @@ */ package okio -import java.util.concurrent.TimeUnit import org.junit.After import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse import org.junit.Rule import org.junit.Test +import java.util.concurrent.TimeUnit import org.junit.rules.Timeout as JUnitTimeout class TimeoutTest { From a67f4fc69115ae7c2d50357c156264031ff44cbc Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Thu, 5 Jan 2023 09:10:01 +1000 Subject: [PATCH 10/15] Update okio/src/jvmTest/java/okio/AwaitSignalTest.kt Co-authored-by: Jesse Wilson --- okio/src/jvmTest/java/okio/AwaitSignalTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt index 05bf9db90f..b61ee4b0f2 100644 --- a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt +++ b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 Square, Inc. + * Copyright (C) 2023 Block Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From bfd07955212c0486cfac9250cb61e5c58f065c17 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Thu, 5 Jan 2023 09:10:20 +1000 Subject: [PATCH 11/15] Update okio/src/jvmMain/kotlin/okio/Timeout.kt Co-authored-by: Jesse Wilson --- okio/src/jvmMain/kotlin/okio/Timeout.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okio/src/jvmMain/kotlin/okio/Timeout.kt b/okio/src/jvmMain/kotlin/okio/Timeout.kt index d7beab8bdb..9f09181653 100644 --- a/okio/src/jvmMain/kotlin/okio/Timeout.kt +++ b/okio/src/jvmMain/kotlin/okio/Timeout.kt @@ -105,7 +105,7 @@ actual open class Timeout { } /** - * Waits on `monitor` until it is notified. Throws [InterruptedIOException] if either the thread + * Waits on `monitor` until it is signaled. Throws [InterruptedIOException] if either the thread * is interrupted or if this timeout elapses before `monitor` is notified. * The caller must hold the lock that monitor is bound to. * From 760daa83acebe3e702e8fd416cc1ab1f83627a2e Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Thu, 5 Jan 2023 09:10:34 +1000 Subject: [PATCH 12/15] Update okio/src/jvmTest/java/okio/AwaitSignalTest.kt Co-authored-by: Jesse Wilson --- okio/src/jvmTest/java/okio/AwaitSignalTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt index b61ee4b0f2..78e5d782a2 100644 --- a/okio/src/jvmTest/java/okio/AwaitSignalTest.kt +++ b/okio/src/jvmTest/java/okio/AwaitSignalTest.kt @@ -36,7 +36,7 @@ class AwaitSignalTest { } @Test - fun notified() = lock.withLock { + fun signaled() = lock.withLock { val timeout = Timeout() timeout.timeout(5000, TimeUnit.MILLISECONDS) val start = now() From 5335124d60db4195a4f90cb57516ebc732f44080 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Thu, 5 Jan 2023 09:10:46 +1000 Subject: [PATCH 13/15] Update okio/src/jvmMain/kotlin/okio/Timeout.kt Co-authored-by: Jesse Wilson --- okio/src/jvmMain/kotlin/okio/Timeout.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okio/src/jvmMain/kotlin/okio/Timeout.kt b/okio/src/jvmMain/kotlin/okio/Timeout.kt index 9f09181653..0773f98791 100644 --- a/okio/src/jvmMain/kotlin/okio/Timeout.kt +++ b/okio/src/jvmMain/kotlin/okio/Timeout.kt @@ -109,7 +109,7 @@ actual open class Timeout { * is interrupted or if this timeout elapses before `monitor` is notified. * The caller must hold the lock that monitor is bound to. * - * Here's a sample class that uses `waitUntilNotified()` to await a specific state. Note that the + * Here's a sample class that uses `awaitSignal()` to await a specific state. Note that the * call is made within a loop to avoid unnecessary waiting and to mitigate spurious notifications. * ``` * class Dice { From 29667df8496b3ea185ec4fc589d1c3dffc86b43a Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Thu, 5 Jan 2023 09:10:55 +1000 Subject: [PATCH 14/15] Update okio/src/jvmMain/kotlin/okio/Timeout.kt Co-authored-by: Jesse Wilson --- okio/src/jvmMain/kotlin/okio/Timeout.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okio/src/jvmMain/kotlin/okio/Timeout.kt b/okio/src/jvmMain/kotlin/okio/Timeout.kt index 0773f98791..e9a9302697 100644 --- a/okio/src/jvmMain/kotlin/okio/Timeout.kt +++ b/okio/src/jvmMain/kotlin/okio/Timeout.kt @@ -106,7 +106,7 @@ actual open class Timeout { /** * Waits on `monitor` until it is signaled. Throws [InterruptedIOException] if either the thread - * is interrupted or if this timeout elapses before `monitor` is notified. + * is interrupted or if this timeout elapses before `monitor` is signaled. * The caller must hold the lock that monitor is bound to. * * Here's a sample class that uses `awaitSignal()` to await a specific state. Note that the From 2d530d5e021f6bf04a062e3048269104e64fb413 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Thu, 5 Jan 2023 09:14:41 +1000 Subject: [PATCH 15/15] Review comments --- okio/src/jvmTest/java/okio/PipeTest.java | 100 ++++++++++++++--------- 1 file changed, 63 insertions(+), 37 deletions(-) diff --git a/okio/src/jvmTest/java/okio/PipeTest.java b/okio/src/jvmTest/java/okio/PipeTest.java index a56daadc85..7c1e1df644 100644 --- a/okio/src/jvmTest/java/okio/PipeTest.java +++ b/okio/src/jvmTest/java/okio/PipeTest.java @@ -15,26 +15,29 @@ */ package okio; +import org.junit.After; +import org.junit.Test; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.Random; -import java.util.concurrent.*; - -import org.junit.After; -import org.junit.Test; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public final class PipeTest { final ScheduledExecutorService executorService = TestingExecutors.INSTANCE.newScheduledExecutorService(2); - @After public void tearDown() throws Exception { + @After + public void tearDown() throws Exception { executorService.shutdown(); } - @Test public void test() throws Exception { + @Test + public void test() throws Exception { Pipe pipe = new Pipe(6); pipe.sink().write(new Buffer().writeUtf8("abc"), 3L); @@ -53,14 +56,16 @@ public final class PipeTest { * A producer writes the first 16 MiB of bytes generated by {@code new Random(0)} to a sink, and a * consumer consumes them. Both compute hashes of their data to confirm that they're as expected. */ - @Test public void largeDataset() throws Exception { + @Test + public void largeDataset() throws Exception { final Pipe pipe = new Pipe(1000L); // An awkward size to force producer/consumer exchange. final long totalBytes = 16L * 1024L * 1024L; ByteString expectedHash = ByteString.decodeHex("7c3b224bea749086babe079360cf29f98d88262d"); // Write data to the sink. Future sinkHash = executorService.submit(new Callable() { - @Override public ByteString call() throws Exception { + @Override + public ByteString call() throws Exception { HashingSink hashingSink = HashingSink.sha1(pipe.sink()); Random random = new Random(0); byte[] data = new byte[8192]; @@ -79,7 +84,8 @@ public final class PipeTest { // Read data from the source. Future sourceHash = executorService.submit(new Callable() { - @Override public ByteString call() throws Exception { + @Override + public ByteString call() throws Exception { Buffer blackhole = new Buffer(); HashingSink hashingSink = HashingSink.sha1(blackhole); @@ -98,7 +104,8 @@ public final class PipeTest { assertEquals(expectedHash, sourceHash.get()); } - @Test public void sinkTimeout() throws Exception { + @Test + public void sinkTimeout() throws Exception { TestUtil.INSTANCE.assumeNotWindows(); Pipe pipe = new Pipe(3); @@ -118,7 +125,8 @@ public final class PipeTest { assertEquals("abc", readBuffer.readUtf8()); } - @Test public void sourceTimeout() throws Exception { + @Test + public void sourceTimeout() throws Exception { TestUtil.INSTANCE.assumeNotWindows(); Pipe pipe = new Pipe(3L); @@ -139,8 +147,8 @@ public final class PipeTest { * The writer is writing 12 bytes as fast as it can to a 3 byte buffer. The reader alternates * sleeping 1000 ms, then reading 3 bytes. That should make for an approximate timeline like * this: - * - * 0: writer writes 'abc', blocks 0: reader sleeps until 1000 + *

+ * 0: writer writes 'abc', blocks 0: reader sleeps until 1000 * 1000: reader reads 'abc', sleeps until 2000 * 1000: writer writes 'def', blocks * 2000: reader reads 'def', sleeps until 3000 @@ -148,13 +156,15 @@ public final class PipeTest { * 3000: reader reads 'ghi', sleeps until 4000 * 3000: writer writes 'jkl', returns * 4000: reader reads 'jkl', returns - * + *

* Because the writer is writing to a buffer, it finishes before the reader does. */ - @Test public void sinkBlocksOnSlowReader() throws Exception { + @Test + public void sinkBlocksOnSlowReader() throws Exception { final Pipe pipe = new Pipe(3L); executorService.execute(new Runnable() { - @Override public void run() { + @Override + public void run() { try { Buffer buffer = new Buffer(); Thread.sleep(1000L); @@ -180,10 +190,12 @@ public final class PipeTest { assertElapsed(3000.0, start); } - @Test public void sinkWriteFailsByClosedReader() throws Exception { + @Test + public void sinkWriteFailsByClosedReader() throws Exception { final Pipe pipe = new Pipe(3L); executorService.schedule(new Runnable() { - @Override public void run() { + @Override + public void run() { try { pipe.source().close(); } catch (IOException e) { @@ -202,7 +214,8 @@ public final class PipeTest { } } - @Test public void sinkFlushDoesntWaitForReader() throws Exception { + @Test + public void sinkFlushDoesntWaitForReader() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.sink().flush(); @@ -211,7 +224,8 @@ public final class PipeTest { assertEquals("abc", bufferedSource.readUtf8(3)); } - @Test public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + @Test + public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.source().close(); @@ -223,7 +237,8 @@ public final class PipeTest { } } - @Test public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + @Test + public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.source().close(); @@ -235,7 +250,8 @@ public final class PipeTest { } } - @Test public void sinkClose() throws Exception { + @Test + public void sinkClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().close(); try { @@ -252,13 +268,15 @@ public final class PipeTest { } } - @Test public void sinkMultipleClose() throws Exception { + @Test + public void sinkMultipleClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().close(); pipe.sink().close(); } - @Test public void sinkCloseDoesntWaitForSourceRead() throws Exception { + @Test + public void sinkCloseDoesntWaitForSourceRead() throws Exception { Pipe pipe = new Pipe(100L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); pipe.sink().close(); @@ -268,7 +286,8 @@ public final class PipeTest { assertTrue(bufferedSource.exhausted()); } - @Test public void sourceClose() throws Exception { + @Test + public void sourceClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.source().close(); try { @@ -279,16 +298,19 @@ public final class PipeTest { } } - @Test public void sourceMultipleClose() throws Exception { + @Test + public void sourceMultipleClose() throws Exception { Pipe pipe = new Pipe(100L); pipe.source().close(); pipe.source().close(); } - @Test public void sourceReadUnblockedByClosedSink() throws Exception { + @Test + public void sourceReadUnblockedByClosedSink() throws Exception { final Pipe pipe = new Pipe(3L); executorService.schedule(new Runnable() { - @Override public void run() { + @Override + public void run() { try { pipe.sink().close(); } catch (IOException e) { @@ -307,9 +329,9 @@ public final class PipeTest { /** * The writer has 12 bytes to write. It alternates sleeping 1000 ms, then writing 3 bytes. The * reader is reading as fast as it can. That should make for an approximate timeline like this: - * - * 0: writer sleeps until 1000 - * 0: reader blocks + *

+ * 0: writer sleeps until 1000 + * 0: reader blocks * 1000: writer writes 'abc', sleeps until 2000 * 1000: reader reads 'abc' * 2000: writer writes 'def', sleeps until 3000 @@ -319,10 +341,12 @@ public final class PipeTest { * 4000: writer writes 'jkl', returns * 4000: reader reads 'jkl', returns */ - @Test public void sourceBlocksOnSlowWriter() throws Exception { + @Test + public void sourceBlocksOnSlowWriter() throws Exception { final Pipe pipe = new Pipe(100L); executorService.execute(new Runnable() { - @Override public void run() { + @Override + public void run() { try { Thread.sleep(1000L); pipe.sink().write(new Buffer().writeUtf8("abc"), 3); @@ -358,7 +382,9 @@ public final class PipeTest { assertElapsed(4000.0, start); } - /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + /** + * Returns the nanotime in milliseconds as a double for measuring timeouts. + */ private double now() { return System.nanoTime() / 1000000.0d; }