From 205c9cf1d693cef819959139d635edd7f3ddda3e Mon Sep 17 00:00:00 2001 From: rsinukov Date: Mon, 19 Dec 2022 14:35:06 +0100 Subject: [PATCH] KTOR-5342 Fix freezing of readUTF8Line --- .../src/io/ktor/utils/io/ByteBufferChannel.kt | 45 +++++++++++++++---- .../io/ktor/utils/io/ByteBufferChannelTest.kt | 25 +++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt index f2a9e2752af..89219150aaf 100644 --- a/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt +++ b/ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt @@ -15,6 +15,7 @@ import java.lang.Float.* import java.nio.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* +import kotlin.math.* internal const val DEFAULT_CLOSE_MESSAGE: String = "Byte channel was closed" private const val BYTE_BUFFER_CAPACITY: Int = 4088 @@ -1948,36 +1949,62 @@ internal open class ByteBufferChannel( var newLine = false val output = CharArray(8 * 1024) + var transferBuffer: ByteBuffer? = null + var transferredRemaining = 0 while (!isClosedForRead && !newLine && !caret && (limit == Int.MAX_VALUE || consumed <= limit)) { try { - read(required) { + read(required) { buffer -> + val oldPosition = buffer.position() + val bufferToDecode = transferBuffer?.also { + val oldLimit = buffer.limit() + buffer.limit(min(buffer.limit(), buffer.position() + it.remaining())) + it.put(buffer) + it.flip() + buffer.limit(oldLimit) + } ?: buffer + val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed) - val decodeResult = it.decodeUTF8Line(output, 0, readLimit) + val decodeResult = bufferToDecode.decodeUTF8Line(output, 0, readLimit) + + transferBuffer?.let { + buffer.position(oldPosition + it.position() - transferredRemaining) + BufferPool.recycle(it) + transferBuffer = null + transferredRemaining = 0 + } val decoded = (decodeResult shr 32).toInt() val requiredBytes = (decodeResult and 0xffffffffL).toInt() - required = kotlin.math.max(1, requiredBytes) + required = max(1, requiredBytes) + + if (decoded == 0 && buffer.remaining() < required) { + transferBuffer = BufferPool.borrow().also { + transferredRemaining = buffer.remaining() + it.put(buffer) + } + return@read + } if (requiredBytes == -1) { newLine = true } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) { - it.position(it.position() + 1) + if (requiredBytes != -1 && buffer.hasRemaining() && buffer[buffer.position()] == '\r'.code.toByte()) { + buffer.position(buffer.position() + 1) caret = true } - if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) { - it.position(it.position() + 1) + if (requiredBytes != -1 && buffer.hasRemaining() && buffer[buffer.position()] == '\n'.code.toByte()) { + buffer.position(buffer.position() + 1) newLine = true } if (out is StringBuilder) { out.append(output, 0, decoded) } else { - val buffer = CharBuffer.wrap(output, 0, decoded) - out.append(buffer, 0, decoded) + val charBuffer = CharBuffer.wrap(output, 0, decoded) + out.append(charBuffer, 0, decoded) } consumed += decoded diff --git a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt index 54884433ccc..8cb15312fb3 100644 --- a/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt +++ b/ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt @@ -228,4 +228,29 @@ class ByteBufferChannelTest { yield() assertFalse(awaitingContent) } + + @Test + fun testReadLine(): Unit = runBlocking { + val channel = ByteChannel(autoFlush = true) + + val writer = launch(Dispatchers.IO) { + repeat(4087) { + channel.writeByte('a'.code.toByte()) + } + + // U+2588 + channel.writeByte(0xE2.toByte()) + channel.writeByte(0x96.toByte()) + channel.writeByte(0x88.toByte()) + + channel.writeByte('\n'.code.toByte()) + } + + val reader = async(Dispatchers.IO) { + channel.readUTF8Line(100_000) + } + + reader.await() + writer.join() + } }