From ab084e5cbba2cd24489138458034f0152fc25a78 Mon Sep 17 00:00:00 2001 From: Bernardo Gomez Palacio Date: Tue, 2 Nov 2021 13:27:27 -0700 Subject: [PATCH] Fix SSE Subscriptions Data Type. SSE Subscriptions expect the Data Type to have `subscription_data`. --- .../types/subscription/OperationMessage.kt | 14 ++++++++ .../sse/DgsSSESubscriptionHandler.kt | 35 ++++++++++++------- .../sse/DgsSSESubscriptionHandlerTest.kt | 11 +++--- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt b/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt index 911d9c2aa..5f7845bbb 100644 --- a/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt +++ b/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt @@ -33,6 +33,9 @@ const val GQL_COMPLETE = "complete" const val GQL_CONNECTION_TERMINATE = "connection_terminate" const val GQL_CONNECTION_KEEP_ALIVE = "ka" +/** Used only when expressing the data type for SSE Subscriptions. */ +const val SSE_GQL_SUBSCRIPTION_DATA = "subscription_data" + data class OperationMessage( @JsonProperty("type") val type: String, @@ -67,6 +70,17 @@ data class DataPayload( val errors: List? = emptyList() ) : MessagePayload +data class SSEDataPayload( + @JsonProperty("data") + val data: Any?, + @JsonProperty("errors") + val errors: List? = emptyList(), + @JsonProperty("subId") + val subId: String, + @JsonProperty("type") + val type: String = SSE_GQL_SUBSCRIPTION_DATA +) : MessagePayload + data class QueryPayload( @JsonProperty("variables") val variables: Map? = emptyMap(), diff --git a/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt b/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt index ba51f25ae..610a5a42c 100644 --- a/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt +++ b/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt @@ -16,10 +16,10 @@ package com.netflix.graphql.dgs.subscriptions.sse -import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.netflix.graphql.dgs.DgsQueryExecutor -import com.netflix.graphql.types.subscription.DataPayload +import com.netflix.graphql.types.subscription.QueryPayload +import com.netflix.graphql.types.subscription.SSEDataPayload import graphql.ExecutionResult import graphql.InvalidSyntaxError import graphql.validation.ValidationError @@ -66,7 +66,10 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor val executionResult: ExecutionResult = dgsQueryExecutor.execute(queryPayload.query, queryPayload.variables) if (executionResult.errors.isNotEmpty()) { - return if (executionResult.errors.asSequence().filterIsInstance().any() || executionResult.errors.asSequence().filterIsInstance().any()) { + return if ( + executionResult.errors.asSequence().filterIsInstance().any() || + executionResult.errors.asSequence().filterIsInstance().any() + ) { val errorMessage = "Subscription query failed to validate: ${executionResult.errors.joinToString(", ")}" emitter.send(errorMessage) emitter.complete() @@ -91,8 +94,12 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor override fun onNext(t: ExecutionResult) { val event = SseEmitter.event() - .data(mapper.writeValueAsString(DataPayload(t.getData(), t.errors)), MediaType.APPLICATION_JSON) - .id(UUID.randomUUID().toString()) + .data( + mapper.writeValueAsString( + SSEDataPayload(data = t.getData(), errors = t.errors, subId = sessionId) + ), + MediaType.APPLICATION_JSON + ).id(UUID.randomUUID().toString()) emitter.send(event) subscription.request(1) @@ -101,7 +108,16 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor override fun onError(t: Throwable) { logger.error("Error on subscription {}", sessionId, t) val event = SseEmitter.event() - .data(mapper.writeValueAsString(DataPayload(null, listOf(Error(t.message)))), MediaType.APPLICATION_JSON) + .data( + mapper.writeValueAsString( + SSEDataPayload( + data = null, + errors = listOf(Error(t.message)), + subId = sessionId + ) + ), + MediaType.APPLICATION_JSON + ) emitter.send(event) emitter.completeWithError(t) @@ -143,13 +159,6 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor return ResponseEntity.ok(emitter) } - data class QueryPayload( - @JsonProperty("variables") val variables: Map = emptyMap(), - @JsonProperty("extensions") val extensions: Map = emptyMap(), - @JsonProperty("operationName") val operationName: String?, - @JsonProperty("query") val query: String - ) - companion object { private val mapper = jacksonObjectMapper() private val logger: Logger = LoggerFactory.getLogger(DgsSSESubscriptionHandler::class.java) diff --git a/graphql-dgs-subscriptions-sse/src/test/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandlerTest.kt b/graphql-dgs-subscriptions-sse/src/test/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandlerTest.kt index dd46a8a61..e2207128b 100644 --- a/graphql-dgs-subscriptions-sse/src/test/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandlerTest.kt +++ b/graphql-dgs-subscriptions-sse/src/test/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandlerTest.kt @@ -18,6 +18,7 @@ package com.netflix.graphql.dgs.subscriptions.sse import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.netflix.graphql.dgs.DgsQueryExecutor +import com.netflix.graphql.types.subscription.QueryPayload import graphql.ExecutionResult import graphql.GraphqlErrorBuilder import graphql.validation.ValidationError @@ -45,7 +46,7 @@ internal class DgsSSESubscriptionHandlerTest { fun queryError() { val query = "subscription { stocks { name, price }}" - val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query) + val queryPayload = QueryPayload(operationName = "MySubscription", query = query) val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload)) every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock @@ -71,7 +72,7 @@ internal class DgsSSESubscriptionHandlerTest { fun queryValidationError() { val query = "subscription { stocks { name, price }}" - val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query) + val queryPayload = QueryPayload(operationName = "MySubscription", query = query) val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload)) every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock @@ -95,7 +96,7 @@ internal class DgsSSESubscriptionHandlerTest { fun notAPublisherServerError() { val query = "subscription { stocks { name, price }}" - val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query) + val queryPayload = QueryPayload(operationName = "MySubscription", query = query) val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload)) every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock @@ -110,7 +111,7 @@ internal class DgsSSESubscriptionHandlerTest { fun notAPublisherClientError() { // Not a subscription query val query = "query { stocks { name, price }}" - val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query) + val queryPayload = QueryPayload(operationName = "MySubscription", query = query) val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload)) every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock @@ -125,7 +126,7 @@ internal class DgsSSESubscriptionHandlerTest { @Suppress("ReactiveStreamsUnusedPublisher") fun success() { val query = "query { stocks { name, price }}" - val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query) + val queryPayload = QueryPayload(operationName = "MySubscription", query = query) val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload)) val nestedExecutionResult = mockk()