Skip to content

Commit

Permalink
KTOR-5342 Fix freezing of readUTF8Line
Browse files Browse the repository at this point in the history
  • Loading branch information
rsinukov committed Dec 19, 2022
1 parent 355e990 commit 205c9cf
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 9 deletions.
45 changes: 36 additions & 9 deletions ktor-io/jvm/src/io/ktor/utils/io/ByteBufferChannel.kt
Expand Up @@ -15,6 +15,7 @@ import java.lang.Float.*
import java.nio.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.math.*

internal const val DEFAULT_CLOSE_MESSAGE: String = "Byte channel was closed"
private const val BYTE_BUFFER_CAPACITY: Int = 4088
Expand Down Expand Up @@ -1948,36 +1949,62 @@ internal open class ByteBufferChannel(
var newLine = false

val output = CharArray(8 * 1024)
var transferBuffer: ByteBuffer? = null
var transferredRemaining = 0
while (!isClosedForRead && !newLine && !caret && (limit == Int.MAX_VALUE || consumed <= limit)) {
try {
read(required) {
read(required) { buffer ->
val oldPosition = buffer.position()
val bufferToDecode = transferBuffer?.also {
val oldLimit = buffer.limit()
buffer.limit(min(buffer.limit(), buffer.position() + it.remaining()))
it.put(buffer)
it.flip()
buffer.limit(oldLimit)
} ?: buffer

val readLimit = if (limit == Int.MAX_VALUE) output.size else minOf(output.size, limit - consumed)
val decodeResult = it.decodeUTF8Line(output, 0, readLimit)
val decodeResult = bufferToDecode.decodeUTF8Line(output, 0, readLimit)

transferBuffer?.let {
buffer.position(oldPosition + it.position() - transferredRemaining)
BufferPool.recycle(it)
transferBuffer = null
transferredRemaining = 0
}

val decoded = (decodeResult shr 32).toInt()
val requiredBytes = (decodeResult and 0xffffffffL).toInt()

required = kotlin.math.max(1, requiredBytes)
required = max(1, requiredBytes)

if (decoded == 0 && buffer.remaining() < required) {
transferBuffer = BufferPool.borrow().also {
transferredRemaining = buffer.remaining()
it.put(buffer)
}
return@read
}

if (requiredBytes == -1) {
newLine = true
}

if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\r'.code.toByte()) {
it.position(it.position() + 1)
if (requiredBytes != -1 && buffer.hasRemaining() && buffer[buffer.position()] == '\r'.code.toByte()) {
buffer.position(buffer.position() + 1)
caret = true
}

if (requiredBytes != -1 && it.hasRemaining() && it[it.position()] == '\n'.code.toByte()) {
it.position(it.position() + 1)
if (requiredBytes != -1 && buffer.hasRemaining() && buffer[buffer.position()] == '\n'.code.toByte()) {
buffer.position(buffer.position() + 1)
newLine = true
}

if (out is StringBuilder) {
out.append(output, 0, decoded)
} else {
val buffer = CharBuffer.wrap(output, 0, decoded)
out.append(buffer, 0, decoded)
val charBuffer = CharBuffer.wrap(output, 0, decoded)
out.append(charBuffer, 0, decoded)
}

consumed += decoded
Expand Down
25 changes: 25 additions & 0 deletions ktor-io/jvm/test/io/ktor/utils/io/ByteBufferChannelTest.kt
Expand Up @@ -228,4 +228,29 @@ class ByteBufferChannelTest {
yield()
assertFalse(awaitingContent)
}

@Test
fun testReadLine(): Unit = runBlocking {
val channel = ByteChannel(autoFlush = true)

val writer = launch(Dispatchers.IO) {
repeat(4087) {
channel.writeByte('a'.code.toByte())
}

// U+2588
channel.writeByte(0xE2.toByte())
channel.writeByte(0x96.toByte())
channel.writeByte(0x88.toByte())

channel.writeByte('\n'.code.toByte())
}

val reader = async(Dispatchers.IO) {
channel.readUTF8Line(100_000)
}

reader.await()
writer.join()
}
}

0 comments on commit 205c9cf

Please sign in to comment.