/
WebSocketGraphQLClient.kt
270 lines (242 loc) · 10 KB
/
WebSocketGraphQLClient.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
/*
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.graphql.dgs.client
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonTypeRef
import com.netflix.graphql.types.subscription.*
import graphql.GraphQLException
import org.springframework.web.reactive.socket.WebSocketMessage
import org.springframework.web.reactive.socket.WebSocketSession
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient
import org.springframework.web.reactive.socket.client.WebSocketClient
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.util.concurrent.Queues
import java.net.URI
import java.time.Duration
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
/**
* Reactive client implementation using WebSockets and the subscriptions-transport-ws protocol:
* https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
*/
class WebSocketGraphQLClient(
    private val client: OperationMessageWebSocketClient,
    private val acknowledgementTimeout: Duration = DEFAULT_ACKNOWLEDGEMENT_TIMEOUT
) : ReactiveGraphQLClient {

    /** Wraps [client] in an [OperationMessageWebSocketClient] targeting [url]. */
    constructor(
        url: String,
        client: WebSocketClient,
        acknowledgementTimeout: Duration
    ) : this(OperationMessageWebSocketClient(url, client), acknowledgementTimeout)

    /** Same as the three-argument form, using the default acknowledgement timeout. */
    constructor(url: String, client: WebSocketClient) : this(url, client, DEFAULT_ACKNOWLEDGEMENT_TIMEOUT)

    /** Connects to [url] through a [ReactorNettyWebSocketClient] with the default acknowledgement timeout. */
    constructor(url: String) : this(url, ReactorNettyWebSocketClient())

    /** Uses an already constructed [client] with the default acknowledgement timeout. */
    constructor(client: OperationMessageWebSocketClient) : this(client, DEFAULT_ACKNOWLEDGEMENT_TIMEOUT)

    // Source of the per-subscription ids sent in GQL_START / GQL_STOP messages.
    private val subscriptionCount = AtomicLong(0L)

    // `connection` holds the Disposable obtained by subscribing to client.connect(); it stays null until the
    // first handshake has been attempted.
    // `handshake` is re-evaluated on every subscription: when no connection has been recorded, or the recorded
    // one is disposed, a new connection is opened and the handshake (GQL_CONNECTION_INIT followed by an expected
    // GQL_CONNECTION_ACK) is performed again; otherwise it completes immediately without doing anything.
    // TODO: This functionality can be achieved more easily with Mono::cacheInvalidateIf, which is available from
    //  reactor-core v3.4.8: https://github.com/reactor/reactor-core/releases/tag/v3.4.8
    private val connection = AtomicReference<Disposable?>(null)
    private val handshake = Mono.defer {
        if (connectionIsStale()) doHandshake() else Mono.empty()
    }

    /** Executes [query] with [variables] and no operation name. */
    override fun reactiveExecuteQuery(
        query: String,
        variables: Map<String, Any>,
    ): Flux<GraphQLResponse> = reactiveExecuteQuery(query, variables, null)

    /**
     * Starts the operation once the handshake has completed and streams the responses belonging to it.
     *
     * @param query The GraphQL document to execute
     * @param variables Variables passed along with the query
     * @param operationName Optional name of the operation to run
     * @return Flux of responses; it ends when a GQL_COMPLETE message for this operation is received
     */
    override fun reactiveExecuteQuery(
        query: String,
        variables: Map<String, Any>,
        operationName: String?,
    ): Flux<GraphQLResponse> {
        // Every call gets its own id so that its messages can be told apart from those of other operations.
        val id = subscriptionCount.incrementAndGet().toString()
        val start = OperationMessage(
            GQL_START,
            QueryPayload(variables, emptyMap(), operationName, query),
            id
        )
        val stop = OperationMessage(GQL_STOP, null, id)

        return handshake
            // The start message is only queued after the handshake has succeeded.
            .doOnSuccess { client.send(start) }
            .thenMany(
                client.receive()
                    // Only look at messages addressed to this operation.
                    .filter { it.id == id }
                    .takeUntil { it.type == GQL_COMPLETE }
                    // Tell the server to stop the operation if the caller cancels.
                    .doOnCancel { client.send(stop) }
                    .flatMap { handleMessage(it) }
            )
    }

    /** True when no connection has been recorded yet or the recorded one has been disposed. */
    private fun connectionIsStale(): Boolean {
        val current = connection.get()
        return current == null || current.isDisposed
    }

    /**
     * Opens a connection, sends GQL_CONNECTION_INIT and waits for the first message from the server.
     *
     * The returned Mono completes when that message is a GQL_CONNECTION_ACK, fails with a [GraphQLException]
     * when it is anything else, and is subject to [acknowledgementTimeout].
     */
    private fun doHandshake(): Mono<Void> = Mono.defer {
        connection.set(client.connect().subscribe())
        client.send(CONNECTION_INIT_MESSAGE)
        client.receive()
            .take(1)
            .map { message ->
                if (message.type != GQL_CONNECTION_ACK) {
                    throw GraphQLException("Acknowledgement expected from server, received $message")
                }
                message
            }
            .timeout(acknowledgementTimeout)
            .then()
    }

    /**
     * Translates a single protocol message into zero or one [GraphQLResponse].
     *
     * @throws GraphQLException for GQL_CONNECTION_ERROR / GQL_ERROR messages and for any unrecognised type
     */
    @Suppress("BlockingMethodInNonBlockingContext")
    private fun handleMessage(message: OperationMessage): Flux<GraphQLResponse> =
        when (message.type) {
            // These message types produce no response.
            GQL_CONNECTION_ACK, GQL_CONNECTION_KEEP_ALIVE, GQL_COMPLETE ->
                Flux.empty()

            // The payload is serialised back to JSON and wrapped in a GraphQLResponse.
            GQL_DATA ->
                Flux.just(GraphQLResponse(MAPPER.writeValueAsString(message.payload)))

            // Errors reported by the server become exceptions. GraphQL execution errors are not handled
            // here; they travel inside the GQL_DATA payload.
            GQL_CONNECTION_ERROR, GQL_ERROR ->
                throw GraphQLException(message.payload.toString())

            // Anything else is not handled by this client and is reported as an exception.
            else ->
                throw GraphQLException(
                    "Unable to handle message of type " +
                        "${message.type}. Full message: $message"
                )
        }

    companion object {
        private val DEFAULT_ACKNOWLEDGEMENT_TIMEOUT = Duration.ofSeconds(30)
        private val CONNECTION_INIT_MESSAGE = OperationMessage(GQL_CONNECTION_INIT, null, null)
        private val MAPPER = jacksonObjectMapper()
    }
}
/**
* Wrapper around a WebSocketClient for sending/receiving OperationMessages
*/
class OperationMessageWebSocketClient(
    private val url: String,
    private val client: WebSocketClient
) {
    // The sinks act as buffers between callers and the WebSocket session:
    //  - incomingSink: messages decoded from the server, waiting to be consumed through receive()
    //  - outgoingSink: messages queued by send(), waiting to be written to the session
    //  - errorSink:    failures of the exchange, surfaced to consumers of receive()
    // The `false` flag prevents a sink from auto-cancelling on the completion of a single subscriber, see:
    // https://stackoverflow.com/questions/66671636/why-is-sinks-many-multicast-onbackpressurebuffer-completing-after-one-of-t
    private val incomingSink = Sinks.many().multicast()
        .onBackpressureBuffer<OperationMessage>(Queues.SMALL_BUFFER_SIZE, false)
    private val outgoingSink = Sinks.many().multicast()
        .onBackpressureBuffer<OperationMessage>(Queues.SMALL_BUFFER_SIZE, false)
    private val errorSink = Sinks.many().multicast()
        .onBackpressureBuffer<GraphQLException>(Queues.SMALL_BUFFER_SIZE, false)

    /**
     * Builds the Mono that opens the WebSocket connection to [url]; nothing is executed until it is subscribed.
     * @return Mono produced by the underlying client for the exchange with the server
     */
    fun connect(): Mono<Void> = Mono.defer {
        client.execute(URI(url)) { session -> exchange(session) }
    }

    /**
     * Queue a message for the server. The message is placed in the outgoing sink and written to the
     * session by the exchange; an emission that the sink does not accept results in an exception.
     * @param message The OperationMessage to send
     */
    fun send(message: OperationMessage) {
        val emission = outgoingSink.tryEmitNext(message)
        emission.orThrow()
    }

    /**
     * Stream messages from the server. Exceptions published on the error sink are rethrown into the
     * stream, terminating it. This method only reads from the sinks; it does not open a connection
     * itself (see [connect]).
     * @return Flux of OperationMessages
     */
    fun receive(): Flux<OperationMessage> =
        incomingSink.asFlux().mergeWith(errorSink.asFlux().map { throw it })

    /**
     * Wires the session to the sinks: decoded server messages are pushed into the incoming sink and
     * the contents of the outgoing sink are serialised and written to the session.
     */
    private fun exchange(session: WebSocketSession): Mono<Void> {
        // Server -> client: decode and hand over to the incoming sink (the emit result is not inspected).
        val inbound = session.receive()
            .map { decodeMessage(it) }
            .doOnNext { incomingSink.tryEmitNext(it) }

        // Client -> server: serialise whatever is queued in the outgoing sink.
        val outbound = session.send(
            outgoingSink.asFlux().map { createMessage(session, it) }
        )

        return Flux.merge(inbound, outbound)
            .then()
            // A failure of the exchange is wrapped and published to consumers of receive().
            .doOnError { errorSink.tryEmitNext(GraphQLException(it)).orThrow() }
            // NOTE(review): this callback is registered in addition to doOnError, so on the error path two
            // exceptions appear to be queued on the error sink - confirm that this is intended.
            .doAfterTerminate {
                errorSink.tryEmitNext(GraphQLException("Server closed the connection unexpectedly")).orThrow()
            }
    }

    /** Serialises [message] to JSON and wraps it in a text frame created by [session]. */
    private fun createMessage(
        session: WebSocketSession,
        message: OperationMessage
    ): WebSocketMessage = session.textMessage(MAPPER.writeValueAsString(message))

    /** Parses the text payload of [message] into an [OperationMessage]. */
    private fun decodeMessage(message: WebSocketMessage): OperationMessage =
        MAPPER.readValue(message.payloadAsText, jacksonTypeRef<OperationMessage>())

    companion object {
        private val MAPPER = jacksonObjectMapper()
    }
}