Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure active connections is decremented #2335

Conversation

pderop
Copy link
Member

@pderop pderop commented Jun 28, 2022

This PR is proposing an additional improvement for the original PR #2237.

While debugging another problem in HttpMetricsHandlerTest, I found that when the AbstractHttpServerMetricsHandler.write method is getting an exception thrown by recordWrite, then the recordInactiveConnection is never called, see here.

So, this is too bad because the long adder that is incremented from here is then never decremented by the missed call to recordInactiveConnection, resulting in leaving the number of active connections to an inconsistent value.

So, in HttpMetricsHandlerTest, when the testRecordingFailsServerSide or the testRecordingFailsClientSide are executed, and if the testServerConnectionsMicrometer is executed after, then it will fail because the number of active connections is left in a wrong state (after the testRecordingFailsServerSide or testRecordingFailsClientSide is executed):

08:32:38.898 [reactor-http-nio-5] ERROR r.n.http.server.HttpServerOperations - [95393cb0-1, L:/127.0.0.1:58225 - R:/127.0.0.1:58226] Error finishing response. Closing connection
org.opentest4j.AssertionFailedError: 
expected: 1.0
 but was: 5.0
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at reactor.netty.http.HttpMetricsHandlerTests.checkGauge(HttpMetricsHandlerTests.java:895)
	at reactor.netty.http.HttpMetricsHandlerTests.checkServerConnectionsMicrometer(HttpMetricsHandlerTests.java:738)
	at reactor.netty.http.HttpMetricsHandlerTests.lambda$setUp$7(HttpMetricsHandlerTests.java:144)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:595)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.AbstractHttpServerMetricsHandler.channelRead(AbstractHttpServerMetricsHandler.java:180)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:750)
...
java.lang.AssertionError: expectation "expectNext(Hello World!)" failed (expected: onNext(Hello World!); actual: onError(reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response))
	at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
	at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
	at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
	at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
	at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue$10(DefaultStepVerifierBuilder.java:509)
	at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2288)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1528)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1476)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1129)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:210)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:265)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:119)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onError(FluxHandleFuseable.java:226)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onError(MonoCollectList.java:114)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:255)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
	at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:450)
	at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:488)
	at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:298)
	at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
	at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:326)
	at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at reactor.netty.channel.AbstractChannelMetricsHandler.channelInactive(AbstractChannelMetricsHandler.java:75)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:750)
	Suppressed: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response

Now, because we retry failed tests one time, then during the second round of test execution, we will only execute the testServerConnectionsMicrometer test which will succeed this time.

So, this PR slightly modify the original patch in order to ensure that we always call recordInactiveConnection from AbstractHttpServerMetricsHandler.write method even if recordWrite fails. It avoids leaving the number of active connections in an inconsistent state and it makes it easier to debug the HttpMetricsHandlerTest, without having to wait for some tests to be re-run alone , a second time.

Fixes #2187

@pderop pderop added the type/enhancement A general enhancement label Jun 28, 2022
@pderop pderop added this to the 1.0.21 milestone Jun 28, 2022
@pderop pderop self-assigned this Jun 28, 2022
@pderop pderop requested a review from violetagg June 28, 2022 07:41
@violetagg
Copy link
Member

violetagg commented Jun 28, 2022

@pderop We are still having failing tests?

@pderop
Copy link
Member Author

pderop commented Jun 28, 2022

yes, the testServerConnectionsMicrometer fails, but only during the first execution of tests (and if it is executed after testRecordingFailsServerSide or the testRecordingFailsClientSide).

but since by default we have maxRetries = 1 in build.gradle, then the second time, the testServerConnectionsMicrometer is re-executed and will succeed because the testRecordingFailsServerSide or the testRecordingFailsClientSide won't be executed before.

@pderop pderop merged commit d1b94b8 into reactor:1.0.x Jun 28, 2022
pderop added a commit that referenced this pull request Jun 28, 2022
pderop added a commit that referenced this pull request Jun 28, 2022
@pderop
Copy link
Member Author

pderop commented Jun 28, 2022

I committed this PR to 1.0.x branch with d1b94b8, then merged it to main branch with e7cf68e, and then merged it to netty5 branch using d621dfc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement A general enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants