Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update documentation to explain streaming vs collecting and error handling with Flux<T> return values #32630

Open
Airidas36 opened this issue Apr 12, 2024 · 5 comments
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) type: documentation A documentation task
Milestone

Comments

@Airidas36
Copy link

Affects: 6.1.5

Description

@RestController which is producing Flux<T> as MediaType.APPLICATION_JSON_VALUE will fail to encode an error, handled by @RestControllerAdvice, if the Flux contains multiple signals starting with the first one onNext followed by onError(s) after it.
Such signal arrangement in the Flux will result in a stacktrace that looks like this as the AbstractJackson2Encoder will fail to encode the error because the ServerHttpResponse was already committed and partially transferred to the client (the first item from the Flux with onNext):

2024-04-13T01:08:17.665+03:00 ERROR 8668 --- [demo] [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : [7ac50042-1] Error [java.lang.UnsupportedOperationException] for HTTP GET "/failEncoder", but ServerHttpResponse already committed (200 OK)
2024-04-13T01:08:17.666+03:00 ERROR 8668 --- [demo] [ctor-http-nio-2] r.n.http.server.HttpServerOperations     : [7ac50042-1, L:/127.0.0.1:8080 - R:/127.0.0.1:52702] Error finishing response. Closing connection

java.lang.UnsupportedOperationException: null
	at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.1.5.jar:6.1.5]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ Exception handler com.example.demo.ExceptionHandlers#handleAllErrors(Exception), error="FAIL state encountered" [DispatcherHandler]
	*__checkpoint ⇢ HTTP GET "/failEncoder" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.1.5.jar:6.1.5]
		at org.springframework.http.HttpHeaders.setContentLength(HttpHeaders.java:967) ~[spring-web-6.1.5.jar:6.1.5]
		at org.springframework.http.codec.EncoderHttpMessageWriter.lambda$write$1(EncoderHttpMessageWriter.java:135) ~[spring-web-6.1.5.jar:6.1.5]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2842) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2573) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest(MonoSingle.java:103) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$MonoInnerProducerBase.request(Operators.java:2909) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:85) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:63) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.6.4.jar:3.6.4]
		at org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:418) ~[spring-web-6.1.5.jar:6.1.5]
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:208) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:352) ~[reactor-netty-core-1.1.17.jar:1.1.17]
		at reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:434) ~[reactor-netty-core-1.1.17.jar:1.1.17]
		at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:598) ~[reactor-netty-core-1.1.17.jar:1.1.17]
		at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:118) ~[reactor-netty-core-1.1.17.jar:1.1.17]
		at io.netty.util.concurrent.PromiseCombiner.tryPromise(PromiseCombiner.java:170) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.PromiseCombiner.access$600(PromiseCombiner.java:35) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.PromiseCombiner$1.operationComplete0(PromiseCombiner.java:62) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.PromiseCombiner$1.operationComplete(PromiseCombiner.java:44) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:748) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:303) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:383) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:438) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:355) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:937) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:800) ~[reactor-netty-core-1.1.17.jar:1.1.17]
		at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
		at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Testing with Mozilla Firefox client

failEncoder
failEncoderMultiple

Testing with Postman (v9.25.2)

postman_error

Steps to reproduce

Demo project with reproducible issue

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Apr 12, 2024
@Airidas36 Airidas36 changed the title AbstractJackson2Encoder fails to encode entity from exception handler because of committed ServerHttpResponse if Flux contains onError signal(s) after onNext/onComplete AbstractJackson2Encoder fails to encode entity from exception handler because of committed ServerHttpResponse if Flux contains onError signal(s) after onNext Apr 12, 2024
@bclozel
Copy link
Member

bclozel commented Apr 13, 2024

What you have described is the expected behavior. If the response has been committed already, headers and partial body might have been sent to the client over the network already.

Are you requesting a change of behavior here? Can you explain what it is?

@bclozel bclozel added status: waiting-for-feedback We need additional information before we can continue in: web Issues in web modules (web, webmvc, webflux, websocket) labels Apr 13, 2024
@Airidas36
Copy link
Author

What you have described is the expected behavior. If the response has been committed already, headers and partial body might have been sent to the client over the network already.

Are you requesting a change of behavior here? Can you explain what it is?

Previously, AbstractJackson2Encoder would accumulate the Flux signals behind the scenes itself by calling .collectList(), and serializing the data. If I specify that I produce a non-streaming media type of application/json and have a finite amount of flux items, I would expect the framework to handle serialization, or error propagation from the Flux if it contained any onError. But it seems that the contents of Flux are streamed anyways, even if application/json is specified and fails upon the first encountered onError signal if onNext was present before.

As I see, having a method in the controller with a return type Flux<T> and non-streaming media type is only supported for cases when no subsequent onError signals are present in the Flux. I think it should documented that this approach should only be used for cases when no onError signals are present, or the user should handle them himself before sending it over to the encoder, or just state that Mono<List<T>> is recommended for producing non-streaming types.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Apr 13, 2024
@rstoyanchev
Copy link
Contributor

We did switch from collecting into a List to streaming. This reduces memory usage but also provides control to collect via Fux#collectToList() if needed while with collecting by default there is no way to stream. I think it is a good idea to add something to the documentation that explains this so I'll turn this into a documentation issue.

@rstoyanchev rstoyanchev added type: documentation A documentation task and removed status: waiting-for-triage An issue we've not yet triaged or decided on status: feedback-provided Feedback has been provided labels Apr 18, 2024
@rstoyanchev rstoyanchev added this to the 6.1.7 milestone Apr 18, 2024
@rstoyanchev rstoyanchev changed the title AbstractJackson2Encoder fails to encode entity from exception handler because of committed ServerHttpResponse if Flux contains onError signal(s) after onNext Update documentation to explain streaming vs collecting and error handling with Flux<T> return values Apr 18, 2024
@Airidas36
Copy link
Author

We did switch from collecting into a List to streaming. This reduces memory usage but also provides control to collect via Fux#collectToList() if needed while with collecting by default there is no way to stream. I think it is a good idea to add something to the documentation that explains this so I'll turn this into a documentation issue.

But wouldn't it make sense to approach encoding based on the media type that is supposed to be produced? As a user, if my intention is to return an application/json, I would expect the Flux is collected to a List and any error signals are thrown up the stack. Streaming of course would make sense if I'm producing application/x-ndjson.

@rstoyanchev
Copy link
Contributor

The trade-off is additional memory to aggregate, and if we aggregate by default, there is no way to get it to behave whichever way you prefer. This way, you can choose to aggregate or not. I suppose we could make this configurable in some way, if you would be okay to switch it wholesale.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) type: documentation A documentation task
Projects
None yet
Development

No branches or pull requests

4 participants