Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Timeout.waitUntil(monitor, condition: () -> Boolean) #811

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 13 additions & 13 deletions okio/src/jvmMain/kotlin/okio/Pipe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,20 @@ class Pipe(internal val maxBufferSize: Long) {
if (canceled) throw IOException("canceled")

while (byteCount > 0) {
timeout.waitUntil(buffer) {
maxBufferSize - buffer.size > 0L || canceled || sourceClosed || foldedSink != null
}

foldedSink?.let {
delegate = it
return@synchronized
}

if (sourceClosed) throw IOException("source is closed")
if (canceled) throw IOException("canceled")

val bufferSpaceAvailable = maxBufferSize - buffer.size
if (bufferSpaceAvailable == 0L) {
timeout.waitUntilNotified(buffer) // Wait until the source drains the buffer.
if (canceled) throw IOException("canceled")
continue
}
if (sourceClosed) throw IOException("source is closed")

val bytesToWrite = minOf(bufferSpaceAvailable, byteCount)
val bytesToWrite = minOf(maxBufferSize - buffer.size, byteCount)
buffer.write(source, bytesToWrite)
byteCount -= bytesToWrite
(buffer as Object).notifyAll() // Notify the source that it can resume reading.
Expand Down Expand Up @@ -127,14 +126,15 @@ class Pipe(internal val maxBufferSize: Long) {
override fun read(sink: Buffer, byteCount: Long): Long {
synchronized(buffer) {
check(!sourceClosed) { "closed" }
if (canceled) throw IOException("canceled")

while (buffer.size == 0L) {
if (sinkClosed) return -1L
timeout.waitUntilNotified(buffer) // Wait until the sink fills the buffer.
if (canceled) throw IOException("canceled")
// Wait until the sink fills the buffer.
timeout.waitUntil(buffer) {
buffer.size != 0L || canceled || sinkClosed
}

if (buffer.size == 0L && sinkClosed) return -1L
if (canceled) throw IOException("canceled")

val result = buffer.read(sink, byteCount)
(buffer as Object).notifyAll() // Notify the sink that it can resume writing.
return result
Expand Down
49 changes: 49 additions & 0 deletions okio/src/jvmMain/kotlin/okio/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,55 @@ actual open class Timeout {
}
}

/**
* Same as [waitUntilNotified] but with builtin support for spurious notifications and/or bad timer resolution.
*
* waitUntil will wait until either the timeout is reached or [condition] returns true
*
* This is especially useful for Pipes with no-deadline Timeouts that could wait forever on windows hosts where
* the wait returns a few nanoseconds early very often
*
* @param monitor: the object to wait on
* @param condition: a function that returns true if [waitUntil] should stop waiting
* ```
*/
@Throws(InterruptedIOException::class)
fun waitUntil(monitor: Any, condition: () -> Boolean) {
val hasDeadline = hasDeadline()
val timeoutNanos = timeoutNanos()

// Compute how long we'll wait.
val start = System.nanoTime()
val waitUntilNanos = if (hasDeadline) {
if (timeoutNanos != 0L) {
// take the deadline or the timeout, whichever comes first
minOf(deadlineNanoTime(), start + timeoutNanos)
} else {
deadlineNanoTime()
}
} else {
start + timeoutNanos
}

while (!condition()) {
try {
if (!hasDeadline && timeoutNanos == 0L) {
// There is no timeout: wait until notified
(monitor as Object).wait()
} else {
val waitNanos = waitUntilNanos - System.nanoTime()
if (waitNanos < 0) {
throw InterruptedIOException("timeout")
}
val waitMillis = waitNanos / 1000000L
(monitor as Object).wait(waitMillis, (waitNanos - waitMillis * 1000000L).toInt())
}
} catch (e: InterruptedException) {
// spurious wake up
}
}
}

/**
* Applies the minimum intersection between this timeout and `other`, run `block`, then finally
* rollback this timeout's values.
Expand Down
4 changes: 0 additions & 4 deletions okio/src/jvmTest/java/okio/PipeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ public final class PipeTest {
}

@Test public void sinkTimeout() throws Exception {
TestUtil.INSTANCE.assumeNotWindows();

Pipe pipe = new Pipe(3);
pipe.sink().timeout().timeout(1000, TimeUnit.MILLISECONDS);
pipe.sink().write(new Buffer().writeUtf8("abc"), 3L);
Expand All @@ -122,8 +120,6 @@ public final class PipeTest {
}

@Test public void sourceTimeout() throws Exception {
TestUtil.INSTANCE.assumeNotWindows();

Pipe pipe = new Pipe(3L);
pipe.source().timeout().timeout(1000, TimeUnit.MILLISECONDS);
double start = now();
Expand Down