Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify DgsSSESubscriptionHandler by returning Flux #1001

Merged
merged 1 commit into from Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions graphql-dgs-client/dependencies.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -27,7 +27,6 @@ import graphql.language.TypeName
import graphql.schema.idl.TypeDefinitionRegistry
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.web.server.LocalServerPort
Expand All @@ -42,11 +41,8 @@ import reactor.test.StepVerifier
)
internal class SSESubscriptionGraphQLClientTest {

val logger = LoggerFactory.getLogger(SSESubscriptionGraphQLClient::class.java)

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
@LocalServerPort
lateinit var port: Integer
var port: Int? = null

@Test
fun `A successful subscription should publish ticks`() {
Expand Down
12 changes: 12 additions & 0 deletions graphql-dgs-subscriptions-sse-autoconfigure/dependencies.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions graphql-dgs-subscriptions-sse/build.gradle.kts
Expand Up @@ -20,6 +20,9 @@ dependencies {
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework:spring-web")
implementation("org.springframework:spring-webmvc")
implementation("io.projectreactor:reactor-core")

testImplementation("io.projectreactor:reactor-test")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.boot:spring-boot-starter-tomcat")
}
18 changes: 18 additions & 0 deletions graphql-dgs-subscriptions-sse/dependencies.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -22,20 +22,25 @@ import com.netflix.graphql.types.subscription.QueryPayload
import com.netflix.graphql.types.subscription.SSEDataPayload
import graphql.ExecutionResult
import graphql.InvalidSyntaxError
import graphql.language.OperationDefinition
import graphql.parser.InvalidSyntaxException
import graphql.parser.Parser
import graphql.validation.ValidationError
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.ServerSentEvent
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import org.springframework.web.server.ServerErrorException
import org.springframework.web.server.ServerWebInputException
import reactor.core.publisher.Flux
import java.nio.charset.StandardCharsets
import java.util.*
import java.util.Base64
import java.util.UUID
import com.netflix.graphql.types.subscription.Error as SseError

/**
* This class is defined as "open" only for proxy/aop use cases. It is not considered part of the API, and backwards compatibility is not guaranteed.
Expand All @@ -44,119 +49,73 @@ import java.util.*
@RestController
open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor) {

@RequestMapping("/subscriptions", produces = ["text/event-stream"])
fun subscriptionWithId(@RequestParam("query") queryBase64: String): ResponseEntity<SseEmitter> {
val emitter = SseEmitter(-1)
val sessionId = UUID.randomUUID().toString()
@RequestMapping("/subscriptions", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun subscriptionWithId(@RequestParam("query") queryBase64: String): Flux<ServerSentEvent<String>> {
val query = try {
String(Base64.getDecoder().decode(queryBase64), StandardCharsets.UTF_8)
} catch (ex: IllegalArgumentException) {
emitter.send("Error decoding base64 encoded query")
emitter.complete()
return ResponseEntity.badRequest().body(emitter)
throw ServerWebInputException("Error decoding base64-encoded query")
}

val queryPayload = try {
mapper.readValue(query, QueryPayload::class.java)
} catch (ex: Exception) {
emitter.send("Error parsing query: ${ex.message}")
emitter.complete()
return ResponseEntity.badRequest().body(emitter)
throw ServerWebInputException("Error parsing query: ${ex.message}")
}

if (!isSubscriptionQuery(queryPayload.query)) {
throw ServerWebInputException("Invalid query. operation type is not a subscription")
}

val executionResult: ExecutionResult = dgsQueryExecutor.execute(queryPayload.query, queryPayload.variables)
if (executionResult.errors.isNotEmpty()) {
return if (
executionResult.errors.asSequence().filterIsInstance<ValidationError>().any() ||
executionResult.errors.asSequence().filterIsInstance<InvalidSyntaxError>().any()
) {
val errorMessage = "Subscription query failed to validate: ${executionResult.errors.joinToString(", ")}"
emitter.send(errorMessage)
emitter.complete()
ResponseEntity.badRequest().body(emitter)
val errorMessage = if (executionResult.errors.any { error -> error is ValidationError || error is InvalidSyntaxError }) {
"Subscription query failed to validate: ${executionResult.errors.joinToString()}"
} else {
val errorMessage = "Error executing subscription query: ${executionResult.errors.joinToString(", ")}"
logger.error(errorMessage)
emitter.send(errorMessage)
emitter.complete()
ResponseEntity.status(500).body(emitter)
"Error executing subscription query: ${executionResult.errors.joinToString()}"
}
}

val subscriber = object : Subscriber<ExecutionResult> {
lateinit var subscription: Subscription
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this actually wasn't thread safe, since the property wasn't volatile / access wasn't synchronized, although in practice I never saw any actual issues.


override fun onSubscribe(s: Subscription) {
logger.info("Started subscription with id {} for request {}", sessionId, queryPayload)
subscription = s
s.request(1)
}

override fun onNext(t: ExecutionResult) {
val event = SseEmitter.event()
.data(
mapper.writeValueAsString(
SSEDataPayload(data = t.getData(), errors = t.errors, subId = sessionId)
),
MediaType.APPLICATION_JSON
).id(UUID.randomUUID().toString())
emitter.send(event)

subscription.request(1)
}

override fun onError(t: Throwable) {
logger.error("Error on subscription {}", sessionId, t)
val event = SseEmitter.event()
.data(
mapper.writeValueAsString(
SSEDataPayload(
data = null,
errors = listOf(Error(t.message)),
subId = sessionId
)
),
MediaType.APPLICATION_JSON
)

emitter.send(event)
emitter.completeWithError(t)
}

override fun onComplete() {
emitter.complete()
}
}

emitter.onError {
logger.warn("Subscription {} had a connection error", sessionId)
subscriber.subscription.cancel()
}

emitter.onTimeout {
logger.warn("Subscription {} timed out", sessionId)
subscriber.subscription.cancel()
logger.error(errorMessage)
throw ServerWebInputException(errorMessage)
}

val publisher = try {
executionResult.getData<Publisher<ExecutionResult>>()
} catch (ex: ClassCastException) {
return if (query.contains("subscription")) {
logger.error("Invalid return type for subscription datafetcher. A subscription datafetcher must return a Publisher<ExecutionResult>. The query was $query", ex)
emitter.send("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?")
emitter.complete()
ResponseEntity.status(500).body(emitter)
} else {
logger.warn("Invalid return type for subscription datafetcher. The query sent doesn't appear to be a subscription query: $query", ex)
emitter.send("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?")
emitter.complete()
ResponseEntity.badRequest().body(emitter)
}
} catch (exc: ClassCastException) {
logger.error(
"Invalid return type for subscription datafetcher. A subscription datafetcher must return a Publisher<ExecutionResult>. The query was {}",
query, exc
)
throw ServerErrorException("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?", exc)
}

publisher.subscribe(subscriber)
val subscriptionId = UUID.randomUUID().toString()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the execution Id get set here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can get the ExecutionId here.

return Flux.from(publisher)
.map {
val payload = SSEDataPayload(data = it.getData(), errors = it.errors, subId = subscriptionId)
ServerSentEvent.builder(mapper.writeValueAsString(payload))
.id(UUID.randomUUID().toString())
.build()
}.onErrorResume { exc ->
logger.warn("An exception occurred on subscription {}", subscriptionId, exc)
val errorMessage = exc.message ?: "An exception occurred"
val payload = SSEDataPayload(data = null, errors = listOf(SseError(errorMessage)), subId = subscriptionId)
Flux.just(
ServerSentEvent.builder(mapper.writeValueAsString(payload))
.id(UUID.randomUUID().toString())
.build()
)
}
}

return ResponseEntity.ok(emitter)
private fun isSubscriptionQuery(query: String): Boolean {
val document = try {
Parser().parseDocument(query)
} catch (exc: InvalidSyntaxException) {
return false
}
val definitions = document.getDefinitionsOfType(OperationDefinition::class.java)
return definitions.isNotEmpty() &&
definitions.all { def -> def.operation == OperationDefinition.Operation.SUBSCRIPTION }
}

companion object {
Expand Down