From 07030be7d5a54b52f0583d4c39950015f8779242 Mon Sep 17 00:00:00 2001 From: rsinukov Date: Mon, 19 Dec 2022 14:35:06 +0100 Subject: [PATCH] KTOR-5342 Fix freezing of readUTF8Line --- .../src/io/ktor/utils/io/ByteBufferChannel.kt | 54 +++++++++++++++---- .../io/ktor/utils/io/ByteBufferChannelTest.kt | 25 +++++++++ .../server/plugins/CachingHeadersTest.kt | 2 +- 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt index f2a9e2752a..f66dd70f23 100644 --- a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt +++ b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt @@ -15,6 +15,7 @@ import java.lang.Float.* import java.nio.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* +import kotlin.math.* internal const val DEFAULT_CLOSE_MESSAGE: String = "Byte channel was closed" private const val BYTE_BUFFER_CAPACITY: Int = 4088 @@ -1948,40 +1949,71 @@ internal open class ByteBufferChannel( var newLine = false val output = CharArray(8 * 1024) + var transferBuffer: ByteBuffer? = null + var transferredRemaining = 0 while (!isClosedForRead && !newLine && !caret && (limit == Int.MAX_VALUE || consumed <= limit)) { try { - read(required) { + read(required) { buffer -> + val oldPosition = buffer.position() + val bufferToDecode = transferBuffer?.also { + val oldLimit = buffer.limit() + buffer.limit(min(buffer.limit(), buffer.position() + it.remaining())) + it.put(buffer) + it.flip() + buffer.limit(oldLimit) + } ?: buffer + val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed) - val decodeResult = it.decodeUTF8Line(output, 0, readLimit) + val decodeResult = bufferToDecode.decodeUTF8Line(output, 0, readLimit) + + transferBuffer?.let { + buffer.position(oldPosition + it.position() - transferredRemaining) + BufferPool.recycle(it) + transferBuffer = null + transferredRemaining = 0 + } val decoded = (decodeResult shr 32).toInt() val requiredBytes = (decodeResult and 0xffffffffL).toInt() - required = kotlin.math.max(1, requiredBytes) + required = max(1, requiredBytes) if (requiredBytes == -1) { newLine = true } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) { - it.position(it.position() + 1) + if (requiredBytes != -1 && + buffer.hasRemaining() && + buffer[buffer.position()] == '\r'.code.toByte() + ) { + buffer.position(buffer.position() + 1) caret = true } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) { - it.position(it.position() + 1) + if (requiredBytes != -1 && + buffer.hasRemaining() && + buffer[buffer.position()] == '\n'.code.toByte() + ) { + buffer.position(buffer.position() + 1) newLine = true } if (out is StringBuilder) { out.append(output, 0, decoded) } else { - val buffer = CharBuffer.wrap(output, 0, decoded) - out.append(buffer, 0, decoded) + val charBuffer = CharBuffer.wrap(output, 0, decoded) + out.append(charBuffer, 0, decoded) } consumed += decoded + if (decoded == 0 && buffer.remaining() < requiredBytes) { + transferBuffer = BufferPool.borrow().also { + transferredRemaining = buffer.remaining() + it.put(buffer) + } + } + if (limit != Int.MAX_VALUE && consumed >= limit && !newLine) { throw TooLongLineException("Line is longer than limit") } @@ -1991,6 +2023,10 @@ internal open class ByteBufferChannel( } } + if (transferBuffer != null) { + BufferPool.recycle(transferBuffer!!) + } + if (!isClosedForRead && caret && !newLine) { try { read(1) { diff --git a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt index 54884433cc..8cb15312fb 100644 --- a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt +++ b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt @@ -228,4 +228,29 @@ class ByteBufferChannelTest { yield() assertFalse(awaitingContent) } + + @Test + fun testReadLine(): Unit = runBlocking { + val channel = ByteChannel(autoFlush = true) + + val writer = launch(Dispatchers.IO) { + repeat(4087) { + channel.writeByte('a'.code.toByte()) + } + + // U+2588 + channel.writeByte(0xE2.toByte()) + channel.writeByte(0x96.toByte()) + channel.writeByte(0x88.toByte()) + + channel.writeByte('\n'.code.toByte()) + } + + val reader = async(Dispatchers.IO) { + channel.readUTF8Line(100_000) + } + + reader.await() + writer.join() + } } diff --git a/ktor-server/ktor-server-tests/jvmAndNix/test/io/ktor/tests/server/plugins/CachingHeadersTest.kt b/ktor-server/ktor-server-tests/jvmAndNix/test/io/ktor/tests/server/plugins/CachingHeadersTest.kt index 617ce00d61..3eb99f058a 100644 --- a/ktor-server/ktor-server-tests/jvmAndNix/test/io/ktor/tests/server/plugins/CachingHeadersTest.kt +++ b/ktor-server/ktor-server-tests/jvmAndNix/test/io/ktor/tests/server/plugins/CachingHeadersTest.kt @@ -62,7 +62,7 @@ class CachingHeadersTest { } ) - object Immutable: CacheControl(null) { + object Immutable : CacheControl(null) { override fun toString(): String = "immutable" }