Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SSE Subscriptions Data Type. #721

Merged
merged 1 commit into from Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -33,6 +33,9 @@ const val GQL_COMPLETE = "complete"
const val GQL_CONNECTION_TERMINATE = "connection_terminate"
const val GQL_CONNECTION_KEEP_ALIVE = "ka"

/** Used only when expressing the data type for SSE Subscriptions. */
const val SSE_GQL_SUBSCRIPTION_DATA = "subscription_data"

data class OperationMessage(
@JsonProperty("type")
val type: String,
Expand Down Expand Up @@ -67,6 +70,17 @@ data class DataPayload(
val errors: List<Any>? = emptyList()
) : MessagePayload

data class SSEDataPayload(
@JsonProperty("data")
val data: Any?,
@JsonProperty("errors")
val errors: List<Any>? = emptyList(),
@JsonProperty("subId")
val subId: String,
@JsonProperty("type")
val type: String = SSE_GQL_SUBSCRIPTION_DATA
) : MessagePayload

data class QueryPayload(
@JsonProperty("variables")
val variables: Map<String, Any>? = emptyMap(),
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.netflix.graphql.dgs.subscriptions.sse

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.netflix.graphql.dgs.DgsQueryExecutor
import com.netflix.graphql.types.subscription.DataPayload
import com.netflix.graphql.types.subscription.QueryPayload
import com.netflix.graphql.types.subscription.SSEDataPayload
import graphql.ExecutionResult
import graphql.InvalidSyntaxError
import graphql.validation.ValidationError
Expand Down Expand Up @@ -66,7 +66,10 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor

val executionResult: ExecutionResult = dgsQueryExecutor.execute(queryPayload.query, queryPayload.variables)
if (executionResult.errors.isNotEmpty()) {
return if (executionResult.errors.asSequence().filterIsInstance<ValidationError>().any() || executionResult.errors.asSequence().filterIsInstance<InvalidSyntaxError>().any()) {
return if (
executionResult.errors.asSequence().filterIsInstance<ValidationError>().any() ||
executionResult.errors.asSequence().filterIsInstance<InvalidSyntaxError>().any()
) {
val errorMessage = "Subscription query failed to validate: ${executionResult.errors.joinToString(", ")}"
emitter.send(errorMessage)
emitter.complete()
Expand All @@ -91,8 +94,12 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor

override fun onNext(t: ExecutionResult) {
val event = SseEmitter.event()
.data(mapper.writeValueAsString(DataPayload(t.getData(), t.errors)), MediaType.APPLICATION_JSON)
.id(UUID.randomUUID().toString())
.data(
mapper.writeValueAsString(
SSEDataPayload(data = t.getData(), errors = t.errors, subId = sessionId)
),
MediaType.APPLICATION_JSON
).id(UUID.randomUUID().toString())
emitter.send(event)

subscription.request(1)
Expand All @@ -101,7 +108,16 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor
override fun onError(t: Throwable) {
logger.error("Error on subscription {}", sessionId, t)
val event = SseEmitter.event()
.data(mapper.writeValueAsString(DataPayload(null, listOf(Error(t.message)))), MediaType.APPLICATION_JSON)
.data(
mapper.writeValueAsString(
SSEDataPayload(
data = null,
errors = listOf(Error(t.message)),
subId = sessionId
)
),
MediaType.APPLICATION_JSON
)

emitter.send(event)
emitter.completeWithError(t)
Expand Down Expand Up @@ -143,13 +159,6 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor
return ResponseEntity.ok(emitter)
}

data class QueryPayload(
@JsonProperty("variables") val variables: Map<String, Any> = emptyMap(),
@JsonProperty("extensions") val extensions: Map<String, Any> = emptyMap(),
@JsonProperty("operationName") val operationName: String?,
@JsonProperty("query") val query: String
)

companion object {
private val mapper = jacksonObjectMapper()
private val logger: Logger = LoggerFactory.getLogger(DgsSSESubscriptionHandler::class.java)
Expand Down
Expand Up @@ -18,6 +18,7 @@ package com.netflix.graphql.dgs.subscriptions.sse

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.netflix.graphql.dgs.DgsQueryExecutor
import com.netflix.graphql.types.subscription.QueryPayload
import graphql.ExecutionResult
import graphql.GraphqlErrorBuilder
import graphql.validation.ValidationError
Expand Down Expand Up @@ -45,7 +46,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun queryError() {

val query = "subscription { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -71,7 +72,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun queryValidationError() {

val query = "subscription { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -95,7 +96,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun notAPublisherServerError() {

val query = "subscription { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -110,7 +111,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun notAPublisherClientError() {
// Not a subscription query
val query = "query { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -125,7 +126,7 @@ internal class DgsSSESubscriptionHandlerTest {
@Suppress("ReactiveStreamsUnusedPublisher")
fun success() {
val query = "query { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

val nestedExecutionResult = mockk<ExecutionResult>()
Expand Down