Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose subscription ID to subscription data fetchers #999

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions graphql-dgs-client/dependencies.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import graphql.language.TypeName
import graphql.schema.idl.TypeDefinitionRegistry
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.web.server.LocalServerPort
Expand All @@ -42,11 +41,8 @@ import reactor.test.StepVerifier
)
internal class SSESubscriptionGraphQLClientTest {

val logger = LoggerFactory.getLogger(SSESubscriptionGraphQLClient::class.java)

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
@LocalServerPort
lateinit var port: Integer
var port: Int? = null

@Test
fun `A successful subscription should publish ticks`() {
Expand Down
12 changes: 12 additions & 0 deletions graphql-dgs-subscriptions-sse-autoconfigure/dependencies.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions graphql-dgs-subscriptions-sse/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ dependencies {
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework:spring-web")
implementation("org.springframework:spring-webmvc")
implementation("io.projectreactor:reactor-core")

testImplementation("io.projectreactor:reactor-test")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.boot:spring-boot-starter-tomcat")
}
18 changes: 18 additions & 0 deletions graphql-dgs-subscriptions-sse/dependencies.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@ import com.netflix.graphql.types.subscription.QueryPayload
import com.netflix.graphql.types.subscription.SSEDataPayload
import graphql.ExecutionResult
import graphql.InvalidSyntaxError
import graphql.language.OperationDefinition
import graphql.parser.InvalidSyntaxException
import graphql.parser.Parser
import graphql.validation.ValidationError
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.ServerSentEvent
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import org.springframework.web.server.ServerErrorException
import org.springframework.web.server.ServerWebInputException
import reactor.core.publisher.Flux
import java.nio.charset.StandardCharsets
import java.util.*
import java.util.Base64
import java.util.UUID
import com.netflix.graphql.types.subscription.Error as SseError

/**
* This class is defined as "open" only for proxy/aop use cases. It is not considered part of the API, and backwards compatibility is not guaranteed.
Expand All @@ -44,119 +49,74 @@ import java.util.*
@RestController
open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor) {

@RequestMapping("/subscriptions", produces = ["text/event-stream"])
fun subscriptionWithId(@RequestParam("query") queryBase64: String): ResponseEntity<SseEmitter> {
val emitter = SseEmitter(-1)
val sessionId = UUID.randomUUID().toString()
@RequestMapping("/subscriptions", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun subscriptionWithId(@RequestParam("query") queryBase64: String): Flux<ServerSentEvent<String>> {
val query = try {
String(Base64.getDecoder().decode(queryBase64), StandardCharsets.UTF_8)
} catch (ex: IllegalArgumentException) {
emitter.send("Error decoding base64 encoded query")
emitter.complete()
return ResponseEntity.badRequest().body(emitter)
throw ServerWebInputException("Error decoding base64-encoded query")
}

val queryPayload = try {
mapper.readValue(query, QueryPayload::class.java)
} catch (ex: Exception) {
emitter.send("Error parsing query: ${ex.message}")
emitter.complete()
return ResponseEntity.badRequest().body(emitter)
throw ServerWebInputException("Error parsing query: ${ex.message}")
}

if (!isSubscriptionQuery(queryPayload.query)) {
throw ServerWebInputException("Invalid query. operation type is not a subscription")
}

val executionResult: ExecutionResult = dgsQueryExecutor.execute(queryPayload.query, queryPayload.variables)
if (executionResult.errors.isNotEmpty()) {
return if (
executionResult.errors.asSequence().filterIsInstance<ValidationError>().any() ||
executionResult.errors.asSequence().filterIsInstance<InvalidSyntaxError>().any()
) {
val errorMessage = "Subscription query failed to validate: ${executionResult.errors.joinToString(", ")}"
emitter.send(errorMessage)
emitter.complete()
ResponseEntity.badRequest().body(emitter)
val errorMessage = if (executionResult.errors.any { error -> error is ValidationError || error is InvalidSyntaxError }) {
"Subscription query failed to validate: ${executionResult.errors.joinToString()}"
} else {
val errorMessage = "Error executing subscription query: ${executionResult.errors.joinToString(", ")}"
logger.error(errorMessage)
emitter.send(errorMessage)
emitter.complete()
ResponseEntity.status(500).body(emitter)
"Error executing subscription query: ${executionResult.errors.joinToString()}"
}
}

val subscriber = object : Subscriber<ExecutionResult> {
lateinit var subscription: Subscription

override fun onSubscribe(s: Subscription) {
logger.info("Started subscription with id {} for request {}", sessionId, queryPayload)
subscription = s
s.request(1)
}

override fun onNext(t: ExecutionResult) {
val event = SseEmitter.event()
.data(
mapper.writeValueAsString(
SSEDataPayload(data = t.getData(), errors = t.errors, subId = sessionId)
),
MediaType.APPLICATION_JSON
).id(UUID.randomUUID().toString())
emitter.send(event)

subscription.request(1)
}

override fun onError(t: Throwable) {
logger.error("Error on subscription {}", sessionId, t)
val event = SseEmitter.event()
.data(
mapper.writeValueAsString(
SSEDataPayload(
data = null,
errors = listOf(Error(t.message)),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line was actually a bug. Error here is actually a typealias for java.lang.Error, so we'd end up sending back a full stack trace. I changed it to use com.netflix.graphql.types.subscription.Error, which just has a message field instead, since I think that is what was intended. Maybe the name of the class should be changed so that it doesn't so easily conflict with the kotlin Error typealias?

subId = sessionId
)
),
MediaType.APPLICATION_JSON
)

emitter.send(event)
emitter.completeWithError(t)
}

override fun onComplete() {
emitter.complete()
}
}

emitter.onError {
logger.warn("Subscription {} had a connection error", sessionId)
subscriber.subscription.cancel()
}

emitter.onTimeout {
logger.warn("Subscription {} timed out", sessionId)
subscriber.subscription.cancel()
logger.error(errorMessage)
throw ServerWebInputException(errorMessage)
}

val publisher = try {
executionResult.getData<Publisher<ExecutionResult>>()
} catch (ex: ClassCastException) {
return if (query.contains("subscription")) {
logger.error("Invalid return type for subscription datafetcher. A subscription datafetcher must return a Publisher<ExecutionResult>. The query was $query", ex)
emitter.send("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?")
emitter.complete()
ResponseEntity.status(500).body(emitter)
} else {
logger.warn("Invalid return type for subscription datafetcher. The query sent doesn't appear to be a subscription query: $query", ex)
emitter.send("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?")
emitter.complete()
ResponseEntity.badRequest().body(emitter)
}
} catch (exc: ClassCastException) {
logger.error(
"Invalid return type for subscription datafetcher. A subscription datafetcher must return a Publisher<ExecutionResult>. The query was {}",
query, exc
)
throw ServerErrorException("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?", exc)
}

publisher.subscribe(subscriber)
val subscriptionId = UUID.randomUUID().toString()
return Flux.from(publisher)
.contextWrite { ctx -> ctx.put("subscriptionId", subscriptionId) }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can move this key to a constant somewhere

.map {
val payload = SSEDataPayload(data = it.getData(), errors = it.errors, subId = subscriptionId)
ServerSentEvent.builder(mapper.writeValueAsString(payload))
.id(UUID.randomUUID().toString())
.build()
}.onErrorResume { exc ->
logger.warn("An exception occurred on subscription {}", subscriptionId, exc)
val errorMessage = exc.message ?: "An exception occurred"
val payload = SSEDataPayload(data = null, errors = listOf(SseError(errorMessage)), subId = subscriptionId)
Flux.just(
ServerSentEvent.builder(mapper.writeValueAsString(payload))
.id(UUID.randomUUID().toString())
.build()
)
}
}

return ResponseEntity.ok(emitter)
private fun isSubscriptionQuery(query: String): Boolean {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the endpoint to explicitly check if the query is a subscription query before attempting to execute it, which seemed like a good idea to me.

val document = try {
Parser().parseDocument(query)
} catch (exc: InvalidSyntaxException) {
return false
}
val definitions = document.getDefinitionsOfType(OperationDefinition::class.java)
return definitions.isNotEmpty() &&
definitions.all { def -> def.operation == OperationDefinition.Operation.SUBSCRIPTION }
}

companion object {
Expand Down