Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-4323: WebSockets use custom serializer #3000

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions ktor-client/ktor-client-core/api/ktor-client-core.api
Expand Up @@ -747,14 +747,16 @@ public final class io/ktor/client/plugins/websocket/BuildersKt {
}

public final class io/ktor/client/plugins/websocket/ClientSessionsKt {
public static final fun getConverter (Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession;)Lio/ktor/serialization/WebsocketContentConverter;
public static final synthetic fun getConverter (Lio/ktor/client/plugins/websocket/DefaultClientWebSocketSession;)Lio/ktor/serialization/WebsocketContentConverter;
}

public abstract interface class io/ktor/client/plugins/websocket/ClientWebSocketSession : io/ktor/websocket/WebSocketSession {
public abstract interface class io/ktor/client/plugins/websocket/ClientWebSocketSession : io/ktor/serialization/WebSocketSessionWithContentConverter {
public abstract fun getCall ()Lio/ktor/client/call/HttpClientCall;
public abstract fun getConverter ()Lio/ktor/serialization/WebsocketContentConverter;
}

public final class io/ktor/client/plugins/websocket/ClientWebSocketSession$DefaultImpls {
public static fun getConverter (Lio/ktor/client/plugins/websocket/ClientWebSocketSession;)Lio/ktor/serialization/WebsocketContentConverter;
public static fun send (Lio/ktor/client/plugins/websocket/ClientWebSocketSession;Lio/ktor/websocket/Frame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand All @@ -763,6 +765,7 @@ public final class io/ktor/client/plugins/websocket/DefaultClientWebSocketSessio
public fun flush (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getCall ()Lio/ktor/client/call/HttpClientCall;
public fun getCloseReason ()Lkotlinx/coroutines/Deferred;
public fun getConverter ()Lio/ktor/serialization/WebsocketContentConverter;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun getExtensions ()Ljava/util/List;
public fun getIncoming ()Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down
Expand Up @@ -15,11 +15,17 @@ import io.ktor.websocket.serialization.*
/**
* Client specific [WebSocketSession].
*/
public interface ClientWebSocketSession : WebSocketSession {
public interface ClientWebSocketSession : WebSocketSessionWithContentConverter {
/**
* [HttpClientCall] associated with session.
*/
public val call: HttpClientCall

/**
* Converter for web socket session, if plugin [WebSockets] is installed
*/
public override val converter: WebsocketContentConverter?
get() = call.client.pluginOrNull(WebSockets)?.contentConverter
}

/**
Expand All @@ -38,8 +44,10 @@ internal class DelegatingClientWebSocketSession(
/**
* Converter for web socket session
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
@Deprecated("Kept for binary compatibility", level = DeprecationLevel.HIDDEN)
public val DefaultClientWebSocketSession.converter: WebsocketContentConverter?
get() = call.client.pluginOrNull(WebSockets)?.contentConverter
get() = converter

/**
* Serializes [data] to a frame and enqueues this frame.
Expand Down
2 changes: 1 addition & 1 deletion ktor-client/ktor-client-tests/build.gradle.kts
Expand Up @@ -7,7 +7,7 @@ import test.server.*
description = "Common tests for client"

plugins {
id("kotlinx-serialization")
kotlin("plugin.serialization")
}

apply<TestServerPlugin>()
Expand Down
Expand Up @@ -10,11 +10,15 @@ import io.ktor.client.plugins.websocket.*
import io.ktor.client.tests.utils.*
import io.ktor.http.*
import io.ktor.serialization.*
import io.ktor.serialization.kotlinx.*
import io.ktor.test.dispatcher.*
import io.ktor.util.reflect.*
import io.ktor.utils.io.charsets.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.serialization.*
import kotlinx.serialization.Serializer
import kotlinx.serialization.json.*
import kotlin.test.*

internal val ENGINES_WITHOUT_WS = listOf("Android", "Apache", "Curl")
Expand Down Expand Up @@ -146,6 +150,29 @@ class WebSocketTest : ClientLoader() {
}
}

@Serializer(forClass = Data::class)
object DataSerializer

@Test
fun testWebSocketSerializationWithCustomSerializer() = clientTests(ENGINES_WITHOUT_WS) {
config {
WebSockets {
contentConverter = KotlinxWebsocketSerializationConverter(Json)
}
}

test { client ->
client.webSocket("$TEST_WEBSOCKET_SERVER/websockets/echo") {
repeat(TEST_SIZE) {
val originalData = Data("hello")
sendSerialized(originalData, DataSerializer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the use case of passing serializer here? can't it be registered in module or through @UseSerializers annotation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not work, if you need to serialize 3rd classes without the possibility to change them. @UseSerializers requires an annotation, and the SerializersModule for generic classes requires @contextual annotation too, according to the docs. It is also not possible to serialize Any with the overloads only, but it is with a custom serializer. That's the problem :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think something like this may be useful for regular requests as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, isn't it currently possible? I tried automatic content negation never before, with web sockets it is my first usage :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add it in another PR and focus this PR on Websockets only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW: have you tried to register external serializer for 3rd party class via SerializersModule via contextual call?. According to code it should just work by fetching serialiaer by KClass
Or can you at least provide an example of what you want to archive, and it fails

Copy link
Contributor Author

@hfhbd hfhbd Jul 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deleted my old reproducer and at the moment reproducing the case does not work. But adding all possibilities via contextual or polymorphic to the serializer module is annoying, so passing the serializer directly at usage is nicer IMHO. And the implementation is quite forward for web sockets.

private val json = Json {
    serializersModule = SerializersModule {
        val s = Main.M.serializer(Int.serializer()) as KSerializer<Main.M<*>>
        polymorphic(Main::class, Main.M::class, s)
    }
}

fun main() {
    val main = Main.M(42)
    println(json.encodeToString(main))

    println(json.encodeToString(Main.M.serializer(String.serializer()), Main.M("foo")))
}

@Serializable
sealed class Main {
    @Serializable
    data class M<S>(val value: S) : Main()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that both ways are little annoying :)
But, even it's quite forward for websockets, looks reasonable to add the same to http endpoints, but there it will be not such easy.

And the main my concern about, why it's not good - calling this method will fail, if there is non kx.serialization converter.
Or f.e. if there will be some custom converter, which delegates to kotlinx.serialization converter, but uses another encoding for some types. So using serializersModule will work in both cases, but providing explicit serializer will work only in case of plain kx.serialization converter - such an inconsistency isn't so good in my opionion.

And in this specific use case, may be it will be much easier to not use this plugin at all, and do serialization handling manually, or write a simple plugin for your use case? :)

val actual = receiveDeserialized(DataSerializer)
assertEquals(originalData, actual)
}
}
}
}

@Test
fun testSerializationWithNoConverter() = clientTests(ENGINES_WITHOUT_WS) {
config {
Expand Down
Expand Up @@ -2,6 +2,7 @@ public abstract interface class io/ktor/server/websocket/DefaultWebSocketServerS
}

public final class io/ktor/server/websocket/DefaultWebSocketServerSession$DefaultImpls {
public static fun getConverter (Lio/ktor/server/websocket/DefaultWebSocketServerSession;)Lio/ktor/serialization/WebsocketContentConverter;
public static fun send (Lio/ktor/server/websocket/DefaultWebSocketServerSession;Lio/ktor/websocket/Frame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -42,17 +43,19 @@ public final class io/ktor/server/websocket/RoutingKt {
public static synthetic fun webSocketRaw$default (Lio/ktor/server/routing/Route;Ljava/lang/String;ZLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
}

public abstract interface class io/ktor/server/websocket/WebSocketServerSession : io/ktor/websocket/WebSocketSession {
public abstract interface class io/ktor/server/websocket/WebSocketServerSession : io/ktor/serialization/WebSocketSessionWithContentConverter {
public abstract fun getCall ()Lio/ktor/server/application/ApplicationCall;
public abstract fun getConverter ()Lio/ktor/serialization/WebsocketContentConverter;
}

public final class io/ktor/server/websocket/WebSocketServerSession$DefaultImpls {
public static fun getConverter (Lio/ktor/server/websocket/WebSocketServerSession;)Lio/ktor/serialization/WebsocketContentConverter;
public static fun send (Lio/ktor/server/websocket/WebSocketServerSession;Lio/ktor/websocket/Frame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/ktor/server/websocket/WebSocketServerSessionKt {
public static final fun getApplication (Lio/ktor/server/websocket/WebSocketServerSession;)Lio/ktor/server/application/Application;
public static final fun getConverter (Lio/ktor/server/websocket/WebSocketServerSession;)Lio/ktor/serialization/WebsocketContentConverter;
public static final synthetic fun getConverter (Lio/ktor/server/websocket/WebSocketServerSession;)Lio/ktor/serialization/WebsocketContentConverter;
}

public final class io/ktor/server/websocket/WebSocketUpgrade : io/ktor/http/content/OutgoingContent$ProtocolUpgrade {
Expand Down
@@ -1,5 +1,9 @@
description = ""

plugins {
kotlin("plugin.serialization")
}

kotlin.sourceSets {
jvmAndNixMain {
dependencies {
Expand All @@ -11,6 +15,7 @@ kotlin.sourceSets {
jvmAndNixTest {
dependencies {
api(project(":ktor-server:ktor-server-plugins:ktor-server-content-negotiation"))
api(project(":ktor-shared:ktor-serialization:ktor-serialization-kotlinx:ktor-serialization-kotlinx-json"))
api(project(":ktor-server:ktor-server-cio"))
}
}
Expand Down
Expand Up @@ -14,11 +14,17 @@ import io.ktor.websocket.serialization.*
/**
* Represents a server-side web socket session
*/
public interface WebSocketServerSession : WebSocketSession {
public interface WebSocketServerSession : WebSocketSessionWithContentConverter {
/**
* Associated received [call] that originating this session
*/
public val call: ApplicationCall

/**
* Converter for web socket session, if plugin [WebSockets] is installed
*/
public override val converter: WebsocketContentConverter?
get() = application.plugin(WebSockets).contentConverter
}

/**
Expand All @@ -36,8 +42,10 @@ public val WebSocketServerSession.application: Application get() = call.applicat
/**
* Converter for web socket session
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
@Deprecated("Kept for binary compatibility", level = DeprecationLevel.HIDDEN)
public val WebSocketServerSession.converter: WebsocketContentConverter?
get() = application.plugin(WebSockets).contentConverter
get() = converter

/**
* Serializes [data] to a frame and enqueues this frame.
Expand Down
Expand Up @@ -4,11 +4,62 @@

package io.ktor.tests.websocket

import io.ktor.serialization.kotlinx.*
import io.ktor.server.application.*
import io.ktor.server.cio.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.util.*
import io.ktor.websocket.*
import kotlinx.serialization.*
import kotlinx.serialization.Serializer
import kotlinx.serialization.json.*
import kotlin.test.*

@InternalAPI
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this annotation is needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I use writeFrame(Frame.Close()), which I copied from other tests.

class CIOWebSocketTest : WebSocketEngineSuite<CIOApplicationEngine, CIOApplicationEngine.Configuration>(CIO) {
init {
enableSsl = false
enableHttp2 = false
}

override fun plugins(application: Application, routingConfigurer: Routing.() -> Unit) {
application.install(WebSockets) {
contentConverter = KotlinxWebsocketSerializationConverter(Json)
}
super.plugins(application, routingConfigurer)
}

data class Data(val s: Int)

@OptIn(ExperimentalSerializationApi::class)
@Serializer(forClass = Data::class)
object DataSerializer

@Test
fun testWebSocketCustomSerializer() = runTest {
createAndStartServer {
webSocket("/") {
val data = receiveDeserialized(DataSerializer)
sendSerialized(data, DataSerializer)
}
}

useSocket {
negotiateHttpWebSocket()

val data = Data(42)
output.writeFrame(Frame.Text(Json.encodeToString(DataSerializer, data)), masking = false)
output.flush()

val frame = input.readFrame(Long.MAX_VALUE, 0)
assertIs<Frame.Text>(frame)
val incomingData = Json.decodeFromString(DataSerializer, frame.readText())
assertEquals(data, incomingData)

output.writeFrame(Frame.Close(), false)
output.flush()
assertCloseFrame()
}
}
}
Expand Up @@ -34,7 +34,7 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
override val timeout = 30.seconds

override fun plugins(application: Application, routingConfigurer: Routing.() -> Unit) {
application.install(WebSockets)
application.pluginOrNull(WebSockets) ?: application.install(WebSockets)
super.plugins(application, routingConfigurer)
}

Expand Down Expand Up @@ -578,7 +578,7 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
}
}

private suspend fun Connection.negotiateHttpWebSocket() {
internal suspend fun Connection.negotiateHttpWebSocket() {
// send upgrade request
output.apply {
writeFully(
Expand Down Expand Up @@ -608,7 +608,7 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
assertEquals("websocket", headers[HttpHeaders.Upgrade])
}

private suspend fun Connection.assertCloseFrame(
internal suspend fun Connection.assertCloseFrame(
closeCode: Short = CloseReason.Codes.NORMAL.code,
replyCloseFrame: Boolean = true
) {
Expand Down Expand Up @@ -665,7 +665,7 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
}
}

private suspend inline fun useSocket(block: Connection.() -> Unit) {
internal suspend inline fun useSocket(block: Connection.() -> Unit) {
SelectorManager().use {
aSocket(it).tcp().connect("localhost", port) {
noDelay = true
Expand Down
16 changes: 16 additions & 0 deletions ktor-shared/ktor-serialization/api/ktor-serialization.api
Expand Up @@ -29,6 +29,22 @@ public final class io/ktor/serialization/JsonConvertException : io/ktor/serializ
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public abstract interface class io/ktor/serialization/SerializableWebSocketSession : io/ktor/websocket/WebSocketSession {
public abstract fun getConverter ()Lio/ktor/serialization/WebsocketContentConverter;
}

public final class io/ktor/serialization/SerializableWebSocketSession$DefaultImpls {
public static fun send (Lio/ktor/serialization/SerializableWebSocketSession;Lio/ktor/websocket/Frame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/ktor/serialization/WebSocketSessionWithContentConverter : io/ktor/websocket/WebSocketSession {
public abstract fun getConverter ()Lio/ktor/serialization/WebsocketContentConverter;
}

public final class io/ktor/serialization/WebSocketSessionWithContentConverter$DefaultImpls {
public static fun send (Lio/ktor/serialization/WebSocketSessionWithContentConverter;Lio/ktor/websocket/Frame;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public class io/ktor/serialization/WebsocketContentConvertException : io/ktor/serialization/ContentConvertException {
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
@@ -0,0 +1,14 @@
/*
* Copyright 2014-2022 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.serialization

import io.ktor.websocket.*

public interface WebSocketSessionWithContentConverter : WebSocketSession {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW: why this interface is needed, for me it's a little strange TBH
I see why it's needed, as converter for client and server are retrieved in different ways.
But IMO, even if add explicit serializers support, better to just add extensions in server and client modules for serialization with explicit serializer parameter as with current (

public suspend inline fun <reified T> DefaultClientWebSocketSession.sendSerialized(data: T) {
and
public suspend inline fun <reified T> WebSocketServerSession.sendSerialized(data: T) {
)

Copy link
Contributor Author

@hfhbd hfhbd Jul 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but the problem is the classpath. KSerializer is defined in kotlinx-serialiation, which is only added as dependency in ktor-shared/ktor-serialization-kotlinx. To create an extension function in the client web sockets it needs to resolve KSerializer. But the actual serialization lib (kotlinx.serialization) is an implementation detail, and not available in the core module (this is the same problem with client.get { body(foo, FooSerializer) etc.)

One workaround: adding kotlinx.serialization core to the dependencies of client-core /server-core

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, now I see.

Now I think, that may be some additional serialization plugin should be created for kotlinx.serialization specific usage, which will support all it's cool things, like explicit serializers provider, and reduced overhead of usage reflection over typeOf serialization retrieval. But not sure if it's really needed.

/**
* Converter for web socket session, if plugin [WebSockets] is installed
*/
public val converter: WebsocketContentConverter?
}
Expand Up @@ -16,3 +16,8 @@ public final class io/ktor/serialization/kotlinx/KotlinxWebsocketSerializationCo
public fun serialize (Ljava/nio/charset/Charset;Lio/ktor/util/reflect/TypeInfo;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/ktor/serialization/kotlinx/KotlinxWebsocketSerializationConverterKt {
public static final fun receiveDeserialized (Lio/ktor/serialization/WebSocketSessionWithContentConverter;Lkotlinx/serialization/DeserializationStrategy;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun sendSerialized (Lio/ktor/serialization/WebSocketSessionWithContentConverter;Ljava/lang/Object;Lkotlinx/serialization/SerializationStrategy;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Up @@ -38,7 +38,6 @@ internal abstract class KotlinxSerializationBase<T>(
}

internal open class SerializationParameters(
open val format: SerialFormat,
open val value: Any?,
open val typeInfo: TypeInfo,
open val charset: Charset
Expand All @@ -47,9 +46,8 @@ internal open class SerializationParameters(
}

internal class SerializationNegotiationParameters(
override val format: SerialFormat,
override val value: Any?,
override val typeInfo: TypeInfo,
override val charset: Charset,
val contentType: ContentType
) : SerializationParameters(format, value, typeInfo, charset)
) : SerializationParameters(value, typeInfo, charset)