From 1ff4f9fbbe7f4bf91bebf06677ea0cd834a6088b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 10 Jun 2022 19:23:09 +0200 Subject: [PATCH] Fix race in FlowControlHandlerTest that could lead to a NPE (#12457) Motivation: How we accessed the internal state of the handler was not thread safe and so we could see a NPE during testing like: ``` 2022-06-09T10:27:09.1309221Z java.lang.NullPointerException: Cannot invoke "io.netty5.handler.flow.FlowControlHandler$RecyclableArrayDeque.isEmpty()" because "this.queue" is null 2022-06-09T10:27:09.1310056Z at io.netty5.handler.flow.FlowControlHandler.isQueueEmpty(FlowControlHandler.java:90) 2022-06-09T10:27:09.1310829Z at io.netty5.handler.flow.FlowControlHandlerTest.testFlowAutoReadOff(FlowControlHandlerTest.java:370) 2022-06-09T10:27:09.1311575Z at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2022-06-09T10:27:09.1312301Z at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) 2022-06-09T10:27:09.1313093Z at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2022-06-09T10:27:09.1313760Z at java.base/java.lang.reflect.Method.invoke(Method.java:568) 2022-06-09T10:27:09.1314400Z at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) 2022-06-09T10:27:09.1315137Z at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) 2022-06-09T10:27:09.1315983Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) 2022-06-09T10:27:09.1316828Z at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) 2022-06-09T10:27:09.1317641Z at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) 2022-06-09T10:27:09.1318493Z at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) 2022-06-09T10:27:09.1319348Z at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) 2022-06-09T10:27:09.1320165Z at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) 2022-06-09T10:27:09.1321012Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) 2022-06-09T10:27:09.1322136Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) 2022-06-09T10:27:09.1323045Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) 2022-06-09T10:27:09.1323943Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) 2022-06-09T10:27:09.1324743Z at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) 2022-06-09T10:27:09.1325482Z at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) 2022-06-09T10:27:09.1326315Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) 2022-06-09T10:27:09.1327204Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1328123Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) 2022-06-09T10:27:09.1329864Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) 2022-06-09T10:27:09.1330733Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) 2022-06-09T10:27:09.1331591Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) 2022-06-09T10:27:09.1332454Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1333319Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1334081Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1334851Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1335715Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1336583Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1337418Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1338067Z at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) 2022-06-09T10:27:09.1338985Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) 2022-06-09T10:27:09.1340052Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) 2022-06-09T10:27:09.1340908Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1341965Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1342739Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1343508Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1344444Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1345330Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1346176Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1346868Z at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) 2022-06-09T10:27:09.1347789Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) 2022-06-09T10:27:09.1348854Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) 2022-06-09T10:27:09.1349822Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1350695Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1351529Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1352298Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1353161Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1354049Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1354894Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1355997Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) 2022-06-09T10:27:09.1357115Z at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) 2022-06-09T10:27:09.1358070Z at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) 2022-06-09T10:27:09.1358898Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220) 2022-06-09T10:27:09.1359628Z at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188) 2022-06-09T10:27:09.1360424Z at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202) 2022-06-09T10:27:09.1361216Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181) 2022-06-09T10:27:09.1361937Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128) 2022-06-09T10:27:09.1362790Z at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150) 2022-06-09T10:27:09.1363688Z at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124) 2022-06-09T10:27:09.1364571Z at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) 2022-06-09T10:27:09.1365399Z at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) 2022-06-09T10:27:09.1366134Z at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) 2022-06-09T10:27:09.1366804Z at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) 2022-06-09T10:27:09.1375974Z ``` Modifications: Dispatch the EventLoop before accessing the state Result: No more NPE possible --- .../handler/flow/FlowControlHandlerTest.java | 68 ++++++++++++++++--- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index 6e5d4c5348b..c1ef8e6dfbf 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -44,6 +44,7 @@ import java.net.SocketAddress; import java.util.List; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; import java.util.concurrent.LinkedBlockingQueue; @@ -215,25 +216,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { @Test public void testFlowAutoReadOn() throws Exception { final CountDownLatch latch = new CountDownLatch(3); + final Exchanger peerRef = new Exchanger(); ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, SECONDS); + super.channelActive(ctx); + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); latch.countDown(); } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(true, flow, handler); Channel client = newClient(server.localAddress()); try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + // Write the message client.writeAndFlush(newOneMessage()) .syncUninterruptibly(); // We should receive 3 messages assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close(); @@ -292,7 +311,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(true, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -314,7 +333,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte peer.config().setAutoRead(true); setAutoReadLatch2.countDown(); assertTrue(msgRcvLatch3.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close(); @@ -348,7 +373,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(false, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -370,7 +395,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { // channelRead(3) peer.read(); assertTrue(msgRcvLatch3.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close(); @@ -401,7 +432,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(false, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -415,7 +446,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // channelRead(1) peer.read(); assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); Throwable cause = causeRef.get(); if (cause != null) { @@ -485,11 +522,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { @Test public void testRemoveFlowControl() throws Exception { + final Exchanger peerRef = new Exchanger(); + final CountDownLatch latch = new CountDownLatch(3); ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, SECONDS); //do the first read ctx.read(); super.channelActive(ctx); @@ -501,7 +541,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } }; - FlowControlHandler flow = new FlowControlHandler() { + final FlowControlHandler flow = new FlowControlHandler() { private int num; @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { @@ -530,12 +570,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel server = newServer(false /* no auto read */, flow, handler, tail); Channel client = newClient(server.localAddress()); try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + // Write one message client.writeAndFlush(newOneMessage()).sync(); // We should receive 3 messages assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close();