Skip to content

Commit

Permalink
Merge pull request #721 from Netflix/bugfix/sse-subscriptions
Browse files Browse the repository at this point in the history
Fix SSE Subscriptions Data Type.
  • Loading branch information
berngp committed Nov 2, 2021
2 parents 7b6513b + ab084e5 commit 08f92af
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
Expand Up @@ -33,6 +33,9 @@ const val GQL_COMPLETE = "complete"
const val GQL_CONNECTION_TERMINATE = "connection_terminate"
const val GQL_CONNECTION_KEEP_ALIVE = "ka"

/** Used only when expressing the data type for SSE Subscriptions. */
const val SSE_GQL_SUBSCRIPTION_DATA = "subscription_data"

data class OperationMessage(
@JsonProperty("type")
val type: String,
Expand Down Expand Up @@ -67,6 +70,17 @@ data class DataPayload(
val errors: List<Any>? = emptyList()
) : MessagePayload

data class SSEDataPayload(
@JsonProperty("data")
val data: Any?,
@JsonProperty("errors")
val errors: List<Any>? = emptyList(),
@JsonProperty("subId")
val subId: String,
@JsonProperty("type")
val type: String = SSE_GQL_SUBSCRIPTION_DATA
) : MessagePayload

data class QueryPayload(
@JsonProperty("variables")
val variables: Map<String, Any>? = emptyMap(),
Expand Down
Expand Up @@ -16,10 +16,10 @@

package com.netflix.graphql.dgs.subscriptions.sse

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.netflix.graphql.dgs.DgsQueryExecutor
import com.netflix.graphql.types.subscription.DataPayload
import com.netflix.graphql.types.subscription.QueryPayload
import com.netflix.graphql.types.subscription.SSEDataPayload
import graphql.ExecutionResult
import graphql.InvalidSyntaxError
import graphql.validation.ValidationError
Expand Down Expand Up @@ -66,7 +66,10 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor

val executionResult: ExecutionResult = dgsQueryExecutor.execute(queryPayload.query, queryPayload.variables)
if (executionResult.errors.isNotEmpty()) {
return if (executionResult.errors.asSequence().filterIsInstance<ValidationError>().any() || executionResult.errors.asSequence().filterIsInstance<InvalidSyntaxError>().any()) {
return if (
executionResult.errors.asSequence().filterIsInstance<ValidationError>().any() ||
executionResult.errors.asSequence().filterIsInstance<InvalidSyntaxError>().any()
) {
val errorMessage = "Subscription query failed to validate: ${executionResult.errors.joinToString(", ")}"
emitter.send(errorMessage)
emitter.complete()
Expand All @@ -91,8 +94,12 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor

override fun onNext(t: ExecutionResult) {
val event = SseEmitter.event()
.data(mapper.writeValueAsString(DataPayload(t.getData(), t.errors)), MediaType.APPLICATION_JSON)
.id(UUID.randomUUID().toString())
.data(
mapper.writeValueAsString(
SSEDataPayload(data = t.getData(), errors = t.errors, subId = sessionId)
),
MediaType.APPLICATION_JSON
).id(UUID.randomUUID().toString())
emitter.send(event)

subscription.request(1)
Expand All @@ -101,7 +108,16 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor
override fun onError(t: Throwable) {
logger.error("Error on subscription {}", sessionId, t)
val event = SseEmitter.event()
.data(mapper.writeValueAsString(DataPayload(null, listOf(Error(t.message)))), MediaType.APPLICATION_JSON)
.data(
mapper.writeValueAsString(
SSEDataPayload(
data = null,
errors = listOf(Error(t.message)),
subId = sessionId
)
),
MediaType.APPLICATION_JSON
)

emitter.send(event)
emitter.completeWithError(t)
Expand Down Expand Up @@ -143,13 +159,6 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor
return ResponseEntity.ok(emitter)
}

data class QueryPayload(
@JsonProperty("variables") val variables: Map<String, Any> = emptyMap(),
@JsonProperty("extensions") val extensions: Map<String, Any> = emptyMap(),
@JsonProperty("operationName") val operationName: String?,
@JsonProperty("query") val query: String
)

companion object {
private val mapper = jacksonObjectMapper()
private val logger: Logger = LoggerFactory.getLogger(DgsSSESubscriptionHandler::class.java)
Expand Down
Expand Up @@ -18,6 +18,7 @@ package com.netflix.graphql.dgs.subscriptions.sse

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.netflix.graphql.dgs.DgsQueryExecutor
import com.netflix.graphql.types.subscription.QueryPayload
import graphql.ExecutionResult
import graphql.GraphqlErrorBuilder
import graphql.validation.ValidationError
Expand Down Expand Up @@ -45,7 +46,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun queryError() {

val query = "subscription { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -71,7 +72,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun queryValidationError() {

val query = "subscription { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -95,7 +96,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun notAPublisherServerError() {

val query = "subscription { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -110,7 +111,7 @@ internal class DgsSSESubscriptionHandlerTest {
fun notAPublisherClientError() {
// Not a subscription query
val query = "query { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

every { dgsQueryExecutor.execute(query, any()) } returns executionResultMock
Expand All @@ -125,7 +126,7 @@ internal class DgsSSESubscriptionHandlerTest {
@Suppress("ReactiveStreamsUnusedPublisher")
fun success() {
val query = "query { stocks { name, price }}"
val queryPayload = DgsSSESubscriptionHandler.QueryPayload(operationName = "MySubscription", query = query)
val queryPayload = QueryPayload(operationName = "MySubscription", query = query)
val base64 = Base64.getEncoder().encodeToString(jacksonObjectMapper().writeValueAsBytes(queryPayload))

val nestedExecutionResult = mockk<ExecutionResult>()
Expand Down

0 comments on commit 08f92af

Please sign in to comment.