Json.decodeBufferedSourceToSequence doesn't work with infinite streams #2540

Open
maniac103 opened this issue Jan 8, 2024 · 1 comment
Comments

@maniac103

Describe the bug
I'm trying to consume events from a streaming CometD server. These events are essentially JSON messages transmitted over a chunked HTTP connection: whenever the server has a new event available, it sends the JSON-encoded event as a new chunk on the connection, which is then kept open for the next event. To receive the events, I'm using OkHttp and am trying to decode the response body's source with Json.decodeBufferedSourceToSequence.
However, this doesn't work, because OkioSerialReader tries to consume the full source before JSON decoding even starts. It was my impression from the use cases given in #1662 (e.g. this one or that one) that decoding such 'infinite' streams should be supported. Am I missing something there?
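To illustrate the kind of behavior I mean, here is a hypothetical sketch using plain java.io.Reader. It is not the library's actual code, and the names readGreedy and readAvailable are made up for this example. A read that keeps going until the requested count is reached or the stream ends will block on a connection that stays open, while a read that waits only for the first character and then takes whatever is already buffered can return as soon as one message has arrived.

```kotlin
import java.io.Reader

// Hypothetical "greedy" read: loops until `count` chars were read or the stream ended.
// On an open stream with fewer than `count` chars pending, reader.read() blocks.
fun readGreedy(reader: Reader, buffer: CharArray, count: Int): Int {
    var read = 0
    while (read < count) {
        val c = reader.read() // blocks until a char is available or EOF
        if (c == -1) break
        buffer[read++] = c.toChar()
    }
    return if (read > 0) read else -1
}

// Hypothetical "available-only" read: blocks for the first char only,
// then takes what the reader reports as ready without waiting for more.
fun readAvailable(reader: Reader, buffer: CharArray, count: Int): Int {
    if (count <= 0) return 0
    val first = reader.read()
    if (first == -1) return -1
    buffer[0] = first.toChar()
    var read = 1
    while (read < count && reader.ready()) {
        val c = reader.read()
        if (c == -1) break
        buffer[read++] = c.toChar()
    }
    return read
}
```

Note that Reader.ready() is only a hint for some reader implementations, so this shows the difference in blocking behavior rather than a proposed fix.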

To Reproduce
The issue can be reproduced without an HTTP connection by using an Okio Pipe:

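import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock // assuming Clock comes from kotlinx-datetime
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.okio.decodeBufferedSourceToSequence
import okio.Pipe
import okio.buffer

@Serializable
data class TestMessage(val id: Int)

GlobalScope.launch {
    val pipe = Pipe(4096)

    // Writer: sends one message every 10 seconds, then closes the pipe.
    launch {
        val msg = "{\"id\":1}"
        val sink = pipe.sink.buffer()
        val start = Clock.System.now()
        repeat(3) {
            sink.writeUtf8(msg)
            // Push the bytes into the pipe now; a buffered sink would otherwise
            // hold such a small write until close().
            sink.flush()
            println("Wrote message after ${Clock.System.now() - start}")
            delay(10000)
        }
        sink.close()
        println("Closed after ${Clock.System.now() - start}")
    }

    // Reader: should print each message as soon as it has been written.
    launch {
        val source = pipe.source.buffer()
        val start = Clock.System.now()
        Json.decodeBufferedSourceToSequence<TestMessage>(source).forEach { m ->
            println("got $m after ${Clock.System.now() - start}")
        }
    }
}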

Seen output:

Wrote message after 44us
Wrote message after 10.009552s
Wrote message after 20.012236s
Closed after 30.013578s
got TestMessage(id=1) after 30.011291s
got TestMessage(id=1) after 30.011593s
got TestMessage(id=1) after 30.012009s

Expected output:

Wrote message after 44us
got TestMessage(id=1) after 0.011291s
Wrote message after 10.009552s
got TestMessage(id=1) after 10.011593s
Wrote message after 20.012236s
got TestMessage(id=1) after 20.022009s
Closed after 30.013578s

Expected behavior
Each JSON message should be decoded and emitted as soon as it has arrived, so that messages can be consumed from a permanently open HTTP connection.
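As a possible stopgap, the messages could be framed by hand and decoded one at a time. The following is only a minimal sketch under some assumptions: the stream consists of concatenated top-level JSON objects (no top-level arrays or primitives), and frameJsonObjects is a hypothetical helper name, not part of any library. It reads one character at a time, tracks brace depth and string state, and yields the text of each object as soon as its closing brace arrives.

```kotlin
import java.io.Reader

// Lazily splits a stream of concatenated top-level JSON objects into one string per object.
// Assumption: only objects ({...}) appear at the top level; anything between them is skipped.
fun frameJsonObjects(reader: Reader): Sequence<String> = sequence {
    val current = StringBuilder()
    var depth = 0        // current nesting level of { }
    var inString = false // true while inside a JSON string literal
    var escaped = false  // true right after a backslash inside a string
    while (true) {
        val code = reader.read() // blocks until one char is available or the stream ends
        if (code == -1) break
        val c = code.toChar()
        if (depth == 0 && c != '{') continue // skip separators between objects
        current.append(c)
        when {
            escaped -> escaped = false
            inString -> when (c) {
                '\\' -> escaped = true
                '"' -> inString = false
            }
            c == '"' -> inString = true
            c == '{' -> depth++
            c == '}' -> {
                depth--
                if (depth == 0) {
                    // A complete top-level object has arrived; hand it out immediately.
                    yield(current.toString())
                    current.setLength(0)
                }
            }
        }
    }
}
```

Each yielded string could then be passed to Json.decodeFromString<TestMessage>(text). I would expect source.inputStream().reader() to work as the Reader for an Okio source, but I have not tested that combination.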

Environment

  • Kotlin version: 1.9.22
  • Library version: 1.6.2
  • Kotlin platforms: JVM (Android)
  • Gradle version: 8.2
@sandwwraith
Member

Yes, you're right, it should be supported. I think it is a bug.
