Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed decoding of huge JSON data for okio streams #2007

Merged
merged 7 commits into from Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -50,7 +50,7 @@ internal class OkioSerialReader(private val source: BufferedSource): SerialReade
override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
var i = 0
while (i < count && !source.exhausted()) {
buffer[i] = source.readUtf8CodePoint().toChar()
buffer[bufferOffset + i] = source.readUtf8CodePoint().toChar()
i++
}
return if (i > 0) i else -1
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.serialization.json

import kotlinx.serialization.Serializable
import kotlin.test.Test

/**
 * Encodes a very large tree of nodes to JSON and restores it again through
 * [assertJsonFormAndRestored].
 */
class JsonHugeDataSerializationTest : JsonTestBase() {

    /** Tree node whose only content is the list of its child nodes. */
    @Serializable
    private data class Node(
        val children: List<Node>
    )

    /**
     * Builds [count] sibling nodes. Each sibling receives the result of a recursive call
     * with a count of one and [depth] reduced by one, so it heads a single-child chain.
     * A [depth] of zero produces an empty list.
     */
    private fun createNodes(count: Int, depth: Int): List<Node> {
        if (depth == 0) return mutableListOf()
        return (0 until count).mapTo(mutableListOf()) { Node(createNodes(1, depth - 1)) }
    }

    @Test
    fun test() {
        // A single root with 1000 children, each of them the head of a chain built with depth 10.
        val hugeTree = Node(createNodes(1000, 10))
        val nodeSerializer = Node.serializer()

        val expectedJson = Json.encodeToString(nodeSerializer, hugeTree)

        /*
        When it checks the encoding, assertJsonFormAndRestored calls Json.encodeToString(...)
        for `JsonTestingMode.STREAMING`. Because `expectedJson` was produced by that same function,
        the encoding check for the `STREAMING` mode always passes here, even if the encoder
        actually contains an error.
        */
        assertJsonFormAndRestored(nodeSerializer, hugeTree, expectedJson)
    }
}
Expand Up @@ -14,32 +14,41 @@ private const val DEFAULT_THRESHOLD = 128
* For some reason this hand-rolled implementation is faster than
* fun ArrayAsSequence(s: CharArray): CharSequence = java.nio.CharBuffer.wrap(s, 0, length)
*/
private class ArrayAsSequence(private val source: CharArray) : CharSequence {
override val length: Int = source.size
private class ArrayAsSequence(val buffer: CharArray) : CharSequence {
override var length: Int = buffer.size

override fun get(index: Int): Char = source[index]
override fun get(index: Int): Char = buffer[index]

override fun subSequence(startIndex: Int, endIndex: Int): CharSequence {
return source.concatToString(startIndex, endIndex)
return buffer.concatToString(startIndex, minOf(endIndex, length))
}

fun substring(startIndex: Int, endIndex: Int): String {
return buffer.concatToString(startIndex, minOf(endIndex, length))
}

fun trim(newSize: Int) {
length = minOf(buffer.size, newSize)
}
}

internal class ReaderJsonLexer(
private val reader: SerialReader,
private var _source: CharArray = CharArray(BATCH_SIZE)
charsBuffer: CharArray = CharArray(BATCH_SIZE)
) : AbstractJsonLexer() {
private var threshold: Int = DEFAULT_THRESHOLD // chars
private val sourceIntern: ArrayAsSequence = ArrayAsSequence(charsBuffer)

override var source: CharSequence = ArrayAsSequence(_source)
override val source: CharSequence = sourceIntern
shanshin marked this conversation as resolved.
Show resolved Hide resolved

init {
preload(0)
}

override fun tryConsumeComma(): Boolean {
val current = skipWhitespaces()
if (current >= source.length || current == -1) return false
if (source[current] == ',') {
if (current >= sourceIntern.length || current == -1) return false
if (sourceIntern[current] == ',') {
++currentPosition
return true
}
Expand All @@ -52,7 +61,7 @@ internal class ReaderJsonLexer(
while (true) {
current = prefetchOrEof(current)
if (current == -1) break // could be inline function but KT-1436
val c = source[current]
val c = sourceIntern[current]
// Inlined skipWhitespaces without field spill and nested loop. Also faster than char2TokenClass
if (c == ' ' || c == '\n' || c == '\r' || c == '\t') {
++current
Expand All @@ -65,37 +74,37 @@ internal class ReaderJsonLexer(
return false
}

private fun preload(spaceLeft: Int) {
val buffer = _source
buffer.copyInto(buffer, 0, currentPosition, currentPosition + spaceLeft)
var read = spaceLeft
val sizeTotal = _source.size
while (read != sizeTotal) {
val actual = reader.read(buffer, read, sizeTotal - read)
private fun preload(unprocessedCount: Int) {
val buffer = sourceIntern.buffer
if (unprocessedCount != 0) {
buffer.copyInto(buffer, 0, currentPosition, currentPosition + unprocessedCount)
}
var filledCount = unprocessedCount
val sizeTotal = sourceIntern.length
while (filledCount != sizeTotal) {
val actual = reader.read(buffer, filledCount, sizeTotal - filledCount)
if (actual == -1) {
// EOF, resizing the array so it matches input size
// Can also be done by extracting source.length to a separate var
_source = _source.copyOf(read)
source = ArrayAsSequence(_source)
sourceIntern.trim(filledCount)
threshold = -1
break
}
read += actual
filledCount += actual
}
currentPosition = 0
}

override fun prefetchOrEof(position: Int): Int {
if (position < source.length) return position
if (position < sourceIntern.length) return position
currentPosition = position
ensureHaveChars()
if (currentPosition != 0 || source.isEmpty()) return -1 // if something was loaded, then it would be zero.
if (currentPosition != 0 || sourceIntern.isEmpty()) return -1 // if something was loaded, then it would be zero.
return 0
}

override fun consumeNextToken(): Byte {
ensureHaveChars()
val source = source
val source = sourceIntern
var cpos = currentPosition
while (true) {
cpos = prefetchOrEof(cpos)
Expand All @@ -115,7 +124,7 @@ internal class ReaderJsonLexer(

override fun ensureHaveChars() {
val cur = currentPosition
val oldSize = _source.size
val oldSize = sourceIntern.length
val spaceLeft = oldSize - cur
if (spaceLeft > threshold) return
// warning: current position is not updated during string consumption
Expand All @@ -138,33 +147,33 @@ internal class ReaderJsonLexer(
// it's also possible just to resize buffer,
// instead of falling back to slow path,
// not sure what is better
else return consumeString(source, currentPosition, current)
else return consumeString(sourceIntern, currentPosition, current)
}
// Now we _optimistically_ know where the string ends (it might have been an escaped quote)
for (i in current until closingQuote) {
// Encountered escape sequence, should fall back to "slow" path and symbolic scanning
if (source[i] == STRING_ESC) {
return consumeString(source, currentPosition, i)
if (sourceIntern[i] == STRING_ESC) {
return consumeString(sourceIntern, currentPosition, i)
}
}
this.currentPosition = closingQuote + 1
return substring(current, closingQuote)
}

override fun indexOf(char: Char, startPos: Int): Int {
val src = _source
for (i in startPos until src.size) {
val src = sourceIntern
for (i in startPos until src.length) {
if (src[i] == char) return i
}
return -1
}

override fun substring(startPos: Int, endPos: Int): String {
return _source.concatToString(startPos, endPos)
return sourceIntern.substring(startPos, endPos)
}

override fun appendRange(fromIndex: Int, toIndex: Int) {
escapedString.appendRange(_source, fromIndex, toIndex)
escapedString.appendRange(sourceIntern.buffer, fromIndex, toIndex)
}

// Can be carefully implemented but postponed for now
Expand Down