Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IO Streams #1901

Merged
merged 27 commits into from Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
21 changes: 14 additions & 7 deletions benchmark/build.gradle
Expand Up @@ -13,19 +13,26 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8
jmh.jmhVersion = "1.22"

// Configure the JMH resource-processing task: right before it runs, switch its
// duplicates strategy to EXCLUDE so that an entry arriving at an already-used
// path is ignored.
processJmhResources {
doFirst {
duplicatesStrategy(DuplicatesStrategy.EXCLUDE)
}
}

jmhJar {
baseName 'benchmarks'
classifier = null
version = null
destinationDir = file("$rootDir")
archiveBaseName.set('benchmarks')
archiveVersion.set('')
destinationDirectory = file("$rootDir")
}

dependencies {
implementation 'org.openjdk.jmh:jmh-core:1.22'
implementation 'com.google.guava:guava:24.1.1-jre'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.1'
implementation 'com.fasterxml.jackson.module:jackson-module-kotlin:2.12.1'
implementation 'com.google.guava:guava:31.1-jre'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.3'
implementation 'com.fasterxml.jackson.module:jackson-module-kotlin:2.13.3'
implementation "com.squareup.okio:okio:$okio_version"
implementation project(':kotlinx-serialization-core')
implementation project(':kotlinx-serialization-json')
implementation project(':kotlinx-serialization-json-okio')
implementation project(':kotlinx-serialization-protobuf')
}
Expand Up @@ -4,7 +4,11 @@ import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.module.kotlin.*
import kotlinx.serialization.*
import kotlinx.serialization.json.*
import kotlinx.serialization.json.okio.encodeToSink
import okio.blackholeSink
import okio.buffer
import org.openjdk.jmh.annotations.*
import java.io.OutputStream
import java.util.concurrent.*

@Warmup(iterations = 7, time = 1)
Expand Down Expand Up @@ -63,6 +67,13 @@ open class JacksonComparisonBenchmark {
cookies = "_ga=GA1.2.971852807.1546968515"
)

// Buffered Okio sink on top of blackholeSink(); used as the target of the *ToOkio benchmarks.
private val devNullSink = blackholeSink().buffer()
// OutputStream whose write overloads are all no-ops: every byte handed to it is discarded.
private val devNullStream = object : OutputStream() {
    override fun write(b: ByteArray, off: Int, len: Int) = Unit
    override fun write(b: ByteArray) = Unit
    override fun write(b: Int) = Unit
}

private val stringData = Json.encodeToString(DefaultPixelEvent.serializer(), data)

@Serializable
Expand All @@ -82,12 +93,24 @@ open class JacksonComparisonBenchmark {
// Encodes `data` to a JSON string with kotlinx.serialization.
@Benchmark
fun kotlinToString(): String {
    return Json.encodeToString(DefaultPixelEvent.serializer(), data)
}

// Encodes `data` as JSON into the discarding java.io output stream.
@Benchmark
fun kotlinToStream() {
    Json.encodeToStream(DefaultPixelEvent.serializer(), data, devNullStream)
}

// Encodes `data` as JSON into the buffered Okio blackhole sink.
@Benchmark
fun kotlinToOkio() {
    Json.encodeToSink(DefaultPixelEvent.serializer(), data, devNullSink)
}

// Encodes `dataWithEscapes` to a JSON string with kotlinx.serialization.
@Benchmark
fun kotlinToStringWithEscapes(): String {
    return Json.encodeToString(DefaultPixelEvent.serializer(), dataWithEscapes)
}

// Encodes `smallData` to a JSON string with kotlinx.serialization.
@Benchmark
fun kotlinSmallToString(): String {
    return Json.encodeToString(SmallDataClass.serializer(), smallData)
}

// Encodes `smallData` as JSON into the discarding java.io output stream.
@Benchmark
fun kotlinSmallToStream() {
    Json.encodeToStream(SmallDataClass.serializer(), smallData, devNullStream)
}

// Encodes `smallData` as JSON into the buffered Okio blackhole sink.
@Benchmark
fun kotlinSmallToOkio() {
    Json.encodeToSink(SmallDataClass.serializer(), smallData, devNullSink)
}

// Decodes `stringData` (the JSON produced from `data` by Json.encodeToString) into a
// DefaultPixelEvent via `objectMapper`, which is declared outside this hunk.
@Benchmark
fun jacksonFromString(): DefaultPixelEvent = objectMapper.readValue(stringData, DefaultPixelEvent::class.java)

Expand Down
Expand Up @@ -3,6 +3,7 @@ package kotlinx.benchmarks.json
import kotlinx.benchmarks.model.*
import kotlinx.serialization.json.*
import org.openjdk.jmh.annotations.*
import java.io.OutputStream
import java.util.concurrent.*

@Warmup(iterations = 7, time = 1)
Expand All @@ -24,6 +25,12 @@ open class TwitterBenchmark {

private val jsonImplicitNulls = Json { explicitNulls = false }

// OutputStream whose write overloads are all no-ops: every byte handed to it is discarded.
private val devNullStream = object : OutputStream() {
    override fun write(b: ByteArray, off: Int, len: Int) = Unit
    override fun write(b: ByteArray) = Unit
    override fun write(b: Int) = Unit
}

@Setup
fun init() {
require(twitter == Json.decodeFromString(Twitter.serializer(), Json.encodeToString(Twitter.serializer(), twitter)))
Expand All @@ -38,4 +45,7 @@ open class TwitterBenchmark {

// Encodes the `twitter` model to a JSON string.
@Benchmark
fun encodeTwitter(): String {
    return Json.encodeToString(Twitter.serializer(), twitter)
}

// Encodes the `twitter` model as JSON into the discarding output stream.
@Benchmark
fun encodeTwitterStream() {
    Json.encodeToStream(Twitter.serializer(), twitter, devNullStream)
}
}
4 changes: 3 additions & 1 deletion build.gradle
Expand Up @@ -144,6 +144,8 @@ allprojects {
}
}

def unpublishedProjects = ["benchmark", "guide", "kotlinx-serialization-json-tests"] as Set

subprojects {
tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all { task ->
if (task.name.contains("Test") || task.name.contains("Jmh")) {
Expand All @@ -155,7 +157,7 @@ subprojects {

apply from: rootProject.file('gradle/teamcity.gradle')
// Configure publishing for some artifacts
if (project.name != "benchmark" && project.name != "guide") {
if (!unpublishedProjects.contains(project.name)) {
apply from: rootProject.file('gradle/publishing.gradle')
}

Expand Down
7 changes: 7 additions & 0 deletions formats/json-okio/api/kotlinx-serialization-json-okio.api
@@ -0,0 +1,7 @@
public final class kotlinx/serialization/json/okio/OkioStreamsKt {
public static final fun decodeFromSource (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Lokio/Source;)Ljava/lang/Object;
public static final fun decodeSourceToSequence (Lkotlinx/serialization/json/Json;Lokio/Source;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;)Lkotlin/sequences/Sequence;
public static synthetic fun decodeSourceToSequence$default (Lkotlinx/serialization/json/Json;Lokio/Source;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
public static final fun encodeToSink (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Lokio/Sink;)V
}

31 changes: 31 additions & 0 deletions formats/json-okio/build.gradle.kts
@@ -0,0 +1,31 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
import Java9Modularity.configureJava9ModuleInfo

plugins {
kotlin("multiplatform")
kotlin("plugin.serialization")
}

apply(from = rootProject.file("gradle/native-targets.gradle"))
apply(from = rootProject.file("gradle/configure-source-sets.gradle"))

// Declares the per-source-set dependencies of the json-okio module.
kotlin {
sourceSets {
val commonMain by getting {
dependencies {
// core and json are declared with `api`, i.e. exported to consumers of this module.
api(project(":kotlinx-serialization-core"))
api(project(":kotlinx-serialization-json"))
// NOTE(review): okio types (Sink/Source) appear in this module's public signatures, yet okio is
// only an `implementation` dependency; presumably consumers must declare okio themselves — confirm
// this is intended.
implementation("com.squareup.okio:okio:${property("okio_version")}")
}
}
val commonTest by getting {
dependencies {
// Tests use okio directly; the version comes from the `okio_version` project property.
implementation("com.squareup.okio:okio:${property("okio_version")}")
}
}
}
}

project.configureJava9ModuleInfo()
@@ -0,0 +1,139 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.serialization.json.okio

import kotlinx.serialization.*
import kotlinx.serialization.json.DecodeSequenceMode
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.internal.*
import kotlinx.serialization.json.okio.internal.JsonToOkioStreamWriter
import kotlinx.serialization.json.internal.decodeToSequenceByReader
import kotlinx.serialization.json.okio.internal.OkioSerialReader
import okio.*

/**
* Serializes the [value] with [serializer] into a [target] using JSON format and UTF-8 encoding.
*
* If [target] is not a [BufferedSink], [Sink.buffer] is called on it to create a buffered wrapper.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm late to the party! Why not making the "buffer" requirement in the API itself?

public fun <T> Json.encodeToSink(
    serializer: SerializationStrategy<T>,
    value: T,
    target: BufferedSink
)

The current API fails if someone expects to be able to resume reading the stream after decoding an element.

See #1789 (comment) for a related discussion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For encoding this isn't a problem. You are always free to call buffer() and then emit() as implementation details.

The problem you are referring to, however, is for decoding. Decoding absolutely should take a BufferedSource and not a Source. This is what Moshi and Wire do as well.

https://github.com/square/moshi/blob/moshi-parent-1.13.0/moshi/src/main/java/com/squareup/moshi/JsonAdapter.java#L57

https://github.com/square/wire/blob/4.4.0/wire-library/wire-runtime/src/commonMain/kotlin/com/squareup/wire/ProtoAdapter.kt#L145

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! Sorry for the confusion, I never realised how asymmetric this was 👍 . Maybe it's worth removing the "BufferSink" mention from the kdoc then? Since it's an implementation detail, the caller doesn't really need to know about it?

And in all cases, make the BufferedSource mandatory for decodeFromSource:

public fun <T> Json.decodeFromSource(
    deserializer: DeserializationStrategy<T>,
    source: BufferedSource
): T

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree on both points!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestions! Looks like we probably need to change existing functions working with Java streams applying the same logic.

However, this may lead to some asymmetry in the API. What do you think would be better — change both encodeToSink/decodeFromSource so they accept buffered versions or leave encodeToSink intact?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change both encodeToSink/decodeFromSource so they accept buffered versions or leave encodeToSink intact?

No strong opinion from me. Both Moshi and Wire use BufferedSink so I'd lean towards BufferedSink here as well for consistency/aesthetics reasons.

Since the sink is ending up being buffered in all cases, it's pretty easy to let the caller call .buffer() if required. Plus it might even save an emit() when writing multiple Json to the same BufferedSink (So tiny performance gain maybe...). But if there's a use case for passing a raw Sink, that works with me too.

*
* @throws [SerializationException] if the given value cannot be serialized to JSON.
* @throws [okio.IOException] If an I/O error occurs and sink can't be written to.
*/
@ExperimentalSerializationApi
public fun <T> Json.encodeToSink(
    serializer: SerializationStrategy<T>,
    value: T,
    target: Sink
) {
    // Reuse the caller's sink when it is already buffered; otherwise wrap it in a buffer.
    val output = JsonToOkioStreamWriter(target as? BufferedSink ?: target.buffer())
    try {
        encodeByWriter(output, serializer, value)
    } finally {
        // Release the writer whether or not encoding succeeded.
        output.release()
    }
}

/**
* Serializes given [value] to a [target] using UTF-8 encoding and serializer retrieved from the reified type parameter.
*
* If [target] is not a [BufferedSink], [Sink.buffer] is called on it to create a buffered wrapper.
*
* @throws [SerializationException] if the given value cannot be serialized to JSON.
shanshin marked this conversation as resolved.
Show resolved Hide resolved
* @throws [okio.IOException] If an I/O error occurs and sink can't be written to.
*/
@ExperimentalSerializationApi
public inline fun <reified T> Json.encodeToSink(
    value: T,
    target: Sink
) {
    // Look up the serializer for T in this Json's module, then delegate to the explicit-serializer overload.
    val strategy = serializersModule.serializer<T>()
    encodeToSink(strategy, value, target)
}


/**
 * Reads UTF-8 encoded JSON from [source] and decodes it into a value of type [T] with the given [deserializer].
 *
 * A [source] that is not already a [BufferedSource] is wrapped with [Source.buffer] before reading.
 *
 * Note that this function expects exactly one object to be present in the source
 * and throws an exception if any dangling bytes follow that object.
 *
 * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
 * @throws [okio.IOException] If an I/O error occurs and source can't be read from.
 */
@ExperimentalSerializationApi
public fun <T> Json.decodeFromSource(
    deserializer: DeserializationStrategy<T>,
    source: Source
): T {
    val reader = OkioSerialReader(source as? BufferedSource ?: source.buffer())
    return decodeByReader(deserializer, reader)
}

/**
 * Reads UTF-8 encoded JSON from [source] and decodes it into a value of type [T],
 * using the deserializer retrieved from the reified type parameter.
 *
 * A [source] that is not already a [BufferedSource] is wrapped with [Source.buffer] before reading.
 *
 * Note that this function expects exactly one object to be present in the source
 * and throws an exception if any dangling bytes follow that object.
 *
 * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
 * @throws [okio.IOException] If an I/O error occurs and source can't be read from.
 */
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeFromSource(source: Source): T {
    val deserializer = serializersModule.serializer<T>()
    return decodeFromSource(deserializer, source)
}


/**
 * Turns the given [source] into a lazily deserialized sequence of elements of type [T],
 * reading UTF-8 encoded JSON and decoding each element with [deserializer].
 * Unlike [decodeFromSource], [source] may contain more than one element, separated as [format] declares.
 *
 * A [source] that is not already a [BufferedSource] is wrapped with [Source.buffer] before reading.
 *
 * All elements must be of type [T].
 * Elements are parsed lazily, as the resulting [Sequence] is evaluated.
 * The resulting sequence is tied to the source and can be evaluated only once.
 *
 * **Resource caution:** this method neither closes the [source] when parsing is finished nor provides a way to close it manually.
 * It is the caller's responsibility to hold a reference to the source and close it. Moreover, because the source is parsed lazily,
 * closing it before the returned sequence is evaluated completely will result in an [Exception] from the decoder.
 *
 * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
 * @throws [okio.IOException] If an I/O error occurs and source can't be read from.
 */
@ExperimentalSerializationApi
public fun <T> Json.decodeSourceToSequence(
    source: Source,
    deserializer: DeserializationStrategy<T>,
    format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
): Sequence<T> {
    val reader = OkioSerialReader(source as? BufferedSource ?: source.buffer())
    return decodeToSequenceByReader(reader, deserializer, format)
}

/**
 * Turns the given [source] into a lazily deserialized sequence of elements of type [T],
 * reading UTF-8 encoded JSON and using the deserializer retrieved from the reified type parameter.
 * Unlike [decodeFromSource], [source] may contain more than one element, separated as [format] declares.
 *
 * A [source] that is not already a [BufferedSource] is wrapped with [Source.buffer] before reading.
 *
 * All elements must be of type [T].
 * Elements are parsed lazily, as the resulting [Sequence] is evaluated.
 * The resulting sequence is tied to the source and can be evaluated only once.
 *
 * **Resource caution:** this method neither closes the [source] when parsing is finished nor provides a way to close it manually.
 * It is the caller's responsibility to hold a reference to the source and close it. Moreover, because the source is parsed lazily,
 * closing it before the returned sequence is evaluated fully will result in an [Exception] from the decoder.
 *
 * @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
 * @throws [okio.IOException] If an I/O error occurs and source can't be read from.
 */
@ExperimentalSerializationApi
public inline fun <reified T> Json.decodeSourceToSequence(
    source: Source,
    format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
): Sequence<T> {
    val deserializer = serializersModule.serializer<T>()
    return decodeSourceToSequence(source, deserializer, format)
}
@@ -0,0 +1,59 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")

package kotlinx.serialization.json.okio.internal

import kotlinx.serialization.json.internal.ESCAPE_STRINGS
import kotlinx.serialization.json.internal.JsonWriter
import kotlinx.serialization.json.internal.SerialReader
import okio.*

internal class JsonToOkioStreamWriter(private val target: BufferedSink) : JsonWriter {
// Writes the decimal string form of [value] through [write].
override fun writeLong(value: Long): Unit = write(value.toString())

// Writes a single char to the sink as a UTF-8 encoded code point.
override fun writeChar(char: Char) {
    val codePoint = char.code
    target.writeUtf8CodePoint(codePoint)
}

// Writes [text] to the sink as UTF-8, without quoting or escaping.
override fun write(text: String) {
    target.run { writeUtf8(text) }
}

// Writes [text] surrounded by double quotes, replacing every char that has an entry
// in ESCAPE_STRINGS with that entry.
override fun writeQuoted(text: String) {
    val quote = '"'.code
    target.writeUtf8CodePoint(quote)
    // Start index of the pending run of chars that need no escaping.
    var runStart = 0
    text.forEachIndexed { index, ch ->
        val escaped = ESCAPE_STRINGS.getOrNull(ch.code)
        if (escaped != null) {
            // Emit the unescaped run preceding this char, then its escape sequence.
            target.writeUtf8(text, runStart, index)
            target.writeUtf8(escaped)
            runStart = index + 1
        }
    }

    // Write whatever follows the last escape; the whole string when no run was ever cut.
    if (runStart == 0) target.writeUtf8(text)
    else target.writeUtf8(text, runStart, text.length)
    target.writeUtf8CodePoint(quote)
}

override fun release() {
target.flush()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tempted to say that this should be a call to emit() which only pushes buffered data into the wrapped Sink allowing it to choose whether to write further down the chain. flush(), however, forces all of the bytes through the whole stack.

More here: https://jakewharton.com/forcing-bytes-downward-in-okio/

}
}

/**
 * [SerialReader] that decodes UTF-8 from an Okio [BufferedSource] into UTF-16 chars.
 */
internal class OkioSerialReader(private val source: BufferedSource): SerialReader {
    // Low surrogate left over from a supplementary code point whose high surrogate took the
    // last free slot of the previous read() call; -1 when nothing is carried over.
    private var pendingLowSurrogate: Int = -1

    /**
     * Reads up to [count] chars from the source into [buffer], starting at index [bufferOffset].
     *
     * Code points above U+FFFF are written as a surrogate pair. If only the high surrogate fits,
     * the low surrogate is kept and delivered first by the next call.
     *
     * @return the number of chars written, or -1 if no char was written.
     */
    override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
        var written = 0

        // Deliver the carried-over low surrogate before touching the source again.
        if (count > 0 && pendingLowSurrogate != -1) {
            buffer[bufferOffset] = pendingLowSurrogate.toChar()
            pendingLowSurrogate = -1
            written = 1
        }

        while (written < count && !source.exhausted()) {
            val codePoint = source.readUtf8CodePoint()
            if (codePoint < 0x10000) {
                buffer[bufferOffset + written] = codePoint.toChar()
                written++
            } else {
                // Supplementary plane: split into a UTF-16 surrogate pair instead of truncating.
                val shifted = codePoint - 0x10000
                val low = 0xDC00 + (shifted and 0x3FF)
                buffer[bufferOffset + written] = (0xD800 + (shifted ushr 10)).toChar()
                written++
                if (written < count) {
                    buffer[bufferOffset + written] = low.toChar()
                    written++
                } else {
                    pendingLowSurrogate = low
                }
            }
        }
        return if (written > 0) written else -1
    }
}

1 change: 1 addition & 0 deletions formats/json-okio/gradle.properties
@@ -0,0 +1 @@
kotlin.mpp.enableCompatibilityMetadataVariant=false