Skip to content

Commit

Permalink
Coroutines in Okio.
Browse files Browse the repository at this point in the history
This is a work in progress.
  • Loading branch information
swankjesse committed Nov 3, 2018
1 parent d8acaa0 commit afae35b
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 6 deletions.
10 changes: 7 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
buildscript {
ext.versions = [
'kotlin': '1.2.60',
'kotlinNative': '0.8.2',
'kotlin': '1.3.0',
'kotlinNative': '1.3.0',
'kotlinCoroutinesCore': '1.0.0',
'jmhPlugin': '0.4.5',
'animalSnifferPlugin': '1.4.3',
'dokka': '0.9.16',
Expand Down Expand Up @@ -32,9 +33,12 @@ buildscript {
'js': "org.jetbrains.kotlin:kotlin-test-js",
],
'native': [
'gradlePlugin': "org.jetbrains.kotlin:kotlin-native-gradle-plugin:${versions.kotlinNative}",
'gradlePlugin': "org.jetbrains.kotlin:kotlin-native-gradle-plugin:${versions.kotlin}",
]
],
'kotlinx': [
'coroutinesCore': "org.jetbrains.kotlinx:kotlinx-coroutines-core:${versions.kotlinCoroutinesCore}",
],
'jmh': [
'gradlePlugin': "me.champeau.gradle:jmh-gradle-plugin:${versions.jmhPlugin}",
'core': "org.openjdk.jmh:jmh-core:${versions.jmh}",
Expand Down
18 changes: 16 additions & 2 deletions okio/jvm/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,21 @@ apply plugin: 'me.champeau.gradle.jmh'
apply plugin: 'com.vanniktech.maven.publish'
apply plugin: 'org.jetbrains.dokka'

sourceCompatibility = JavaVersion.VERSION_1_7
targetCompatibility = JavaVersion.VERSION_1_7
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

compileKotlin {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs = ['-Xjvm-default=enable']
}
}
compileTestKotlin {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs = ['-Xjvm-default=enable']
}
}

jar {
manifest {
Expand Down Expand Up @@ -51,6 +64,7 @@ dependencies {
expectedBy project(':okio')

implementation deps.kotlin.stdLib.jdk6
implementation deps.kotlinx.coroutinesCore
compileOnly deps.animalSniffer.annotations
testImplementation deps.test.junit
testImplementation deps.test.assertj
Expand Down
24 changes: 24 additions & 0 deletions okio/jvm/src/main/java/okio/Buffer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,25 @@ class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {

override fun emitCompleteSegments() = this // Nowhere to emit to!

override suspend fun emitCompleteSegmentsAsync() = emitCompleteSegments()

override fun emit() = this // Nowhere to emit to!

override fun exhausted() = size == 0L

override suspend fun exhaustedAsync() = exhausted()

@Throws(EOFException::class)
override fun require(byteCount: Long) {
if (size < byteCount) throw EOFException()
}

override suspend fun requireAsync(byteCount: Long) = require(byteCount)

override fun request(byteCount: Long) = size >= byteCount

override suspend fun requestAsync(byteCount: Long) = request(byteCount)

override fun peek(): BufferedSource {
return PeekSource(this).buffer()
}
Expand Down Expand Up @@ -650,6 +658,14 @@ class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {
return byteCount
}

override suspend fun readAllAsync(sink: Sink): Long {
val byteCount = size
if (byteCount > 0L) {
sink.writeAsync(this, byteCount)
}
return byteCount
}

override fun readUtf8() = readString(size, Charsets.UTF_8)

@Throws(EOFException::class)
Expand Down Expand Up @@ -1376,6 +1392,8 @@ class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {
}
}

override suspend fun writeAsync(source: Buffer, byteCount: Long) = write(source, byteCount)

override fun read(sink: Buffer, byteCount: Long): Long {
var byteCount = byteCount
require(byteCount >= 0) { "byteCount < 0: $byteCount" }
Expand All @@ -1385,6 +1403,8 @@ class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {
return byteCount
}

override suspend fun readAsync(sink: Buffer, byteCount: Long) = read(sink, byteCount)

override fun indexOf(b: Byte) = indexOf(b, 0, Long.MAX_VALUE)

/**
Expand Down Expand Up @@ -1616,10 +1636,14 @@ class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {

override fun flush() {}

override suspend fun flushAsync() {}

override fun isOpen() = true

override fun close() {}

override suspend fun closeAsync() {}

override fun timeout() = Timeout.NONE

/** Returns the 128-bit MD5 hash of this buffer. */
Expand Down
2 changes: 2 additions & 0 deletions okio/jvm/src/main/java/okio/BufferedSink.kt
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ interface BufferedSink : Sink, WritableByteChannel {
@Throws(IOException::class)
fun emitCompleteSegments(): BufferedSink

suspend fun emitCompleteSegmentsAsync(): BufferedSink

/** Returns an output stream that writes to this sink. */
fun outputStream(): OutputStream
}
11 changes: 10 additions & 1 deletion okio/jvm/src/main/java/okio/BufferedSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,26 @@ interface BufferedSource : Source, ReadableByteChannel {
@Throws(IOException::class)
fun exhausted(): Boolean

/**
suspend fun exhaustedAsync(): Boolean

/**
* Returns when the buffer contains at least `byteCount` bytes. Throws an
* [java.io.EOFException] if the source is exhausted before the required bytes can be read.
*/
@Throws(IOException::class)
fun require(byteCount: Long)

suspend fun requireAsync(byteCount: Long)

/**
* Returns true when the buffer contains at least `byteCount` bytes, expanding it as
* necessary. Returns false if the source is exhausted before the requested bytes can be read.
*/
@Throws(IOException::class)
fun request(byteCount: Long): Boolean

suspend fun requestAsync(byteCount: Long): Boolean

/** Removes a byte from this source and returns it. */
@Throws(IOException::class)
fun readByte(): Byte
Expand Down Expand Up @@ -339,6 +345,9 @@ interface BufferedSource : Source, ReadableByteChannel {
@Throws(IOException::class)
fun readAll(sink: Sink): Long

@Throws(IOException::class)
suspend fun readAllAsync(sink: Sink): Long

/**
* Removes all bytes from this, decodes them as UTF-8, and returns the string. Returns the empty
* string if this source is empty.
Expand Down
10 changes: 10 additions & 0 deletions okio/jvm/src/main/java/okio/ForwardingSink.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,24 @@ abstract class ForwardingSink(
@Throws(IOException::class)
override fun write(source: Buffer, byteCount: Long) = delegate.write(source, byteCount)

@Throws(IOException::class)
override suspend fun writeAsync(source: Buffer, byteCount: Long) =
delegate.writeAsync(source, byteCount)

@Throws(IOException::class)
override fun flush() = delegate.flush()

@Throws(IOException::class)
override suspend fun flushAsync() = delegate.flushAsync()

override fun timeout() = delegate.timeout()

@Throws(IOException::class)
override fun close() = delegate.close()

@Throws(IOException::class)
override suspend fun closeAsync() = delegate.closeAsync()

override fun toString() = "${javaClass.simpleName}($delegate)"

@JvmName("-deprecated_delegate")
Expand Down
5 changes: 5 additions & 0 deletions okio/jvm/src/main/java/okio/ForwardingSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ abstract class ForwardingSource(
@Throws(IOException::class)
override fun read(sink: Buffer, byteCount: Long): Long = delegate.read(sink, byteCount)

override suspend fun readAsync(sink: Buffer, byteCount: Long) =
delegate.readAsync(sink, byteCount)

override fun timeout() = delegate.timeout()

@Throws(IOException::class)
override fun close() = delegate.close()

override suspend fun closeAsync() = delegate.closeAsync()

override fun toString() = "${javaClass.simpleName}($delegate)"

@JvmName("-deprecated_delegate")
Expand Down
3 changes: 3 additions & 0 deletions okio/jvm/src/main/java/okio/Okio.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,12 @@ fun blackholeSink(): Sink = BlackholeSink()

private class BlackholeSink : Sink {
override fun write(source: Buffer, byteCount: Long) = source.skip(byteCount)
override suspend fun writeAsync(source: Buffer, byteCount: Long) = write(source, byteCount)
override fun flush() {}
override suspend fun flushAsync() {}
override fun timeout() = Timeout.NONE
override fun close() {}
override suspend fun closeAsync() {}
}

/**
Expand Down
48 changes: 48 additions & 0 deletions okio/jvm/src/main/java/okio/RealBufferedSink.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ internal class RealBufferedSink(
emitCompleteSegments()
}

override suspend fun writeAsync(source: Buffer, byteCount: Long) {
check(!closed) { "closed" }
buffer.write(source, byteCount)
emitCompleteSegmentsAsync()
}

override fun write(byteString: ByteString): BufferedSink {
check(!closed) { "closed" }
buffer.write(byteString)
Expand Down Expand Up @@ -176,12 +182,21 @@ internal class RealBufferedSink(
}

override fun emitCompleteSegments(): BufferedSink {
// TODO(jwilson): if async, avoid the blocking write unless we have many complete segments?

check(!closed) { "closed" }
val byteCount = buffer.completeSegmentByteCount()
if (byteCount > 0L) sink.write(buffer, byteCount)
return this
}

override suspend fun emitCompleteSegmentsAsync(): BufferedSink {
check(!closed) { "closed" }
val byteCount = buffer.completeSegmentByteCount()
if (byteCount > 0L) sink.writeAsync(buffer, byteCount)
return this
}

override fun emit(): BufferedSink {
check(!closed) { "closed" }
val byteCount = buffer.size
Expand Down Expand Up @@ -224,6 +239,14 @@ internal class RealBufferedSink(
sink.flush()
}

override suspend fun flushAsync() {
check(!closed) { "closed" }
if (buffer.size > 0L) {
sink.writeAsync(buffer, buffer.size)
}
sink.flushAsync()
}

override fun isOpen() = !closed

override fun close() {
Expand Down Expand Up @@ -251,6 +274,31 @@ internal class RealBufferedSink(
if (thrown != null) throw thrown
}

override suspend fun closeAsync() {
if (closed) return

// Emit buffered data to the underlying sink. If this fails, we still need
// to close the sink; otherwise we risk leaking resources.
var thrown: Throwable? = null
try {
if (buffer.size > 0) {
sink.writeAsync(buffer, buffer.size)
}
} catch (e: Throwable) {
thrown = e
}

try {
sink.closeAsync()
} catch (e: Throwable) {
if (thrown == null) thrown = e
}

closed = true

if (thrown != null) throw thrown
}

override fun timeout() = sink.timeout()

override fun toString() = "buffer($sink)"
Expand Down

0 comments on commit afae35b

Please sign in to comment.