From 58fe5b00528c44f7db46ec6a7af0785adc2e05dc Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 10 Jun 2022 19:23:09 +0200 Subject: [PATCH] Fix race in FlowControlHandlerTest that could lead to a NPE (#12457) Motivation: How we accessed the internal state of the handler was not thread safe and so we could see a NPE during testing like: ``` 2022-06-09T10:27:09.1309221Z java.lang.NullPointerException: Cannot invoke "io.netty5.handler.flow.FlowControlHandler$RecyclableArrayDeque.isEmpty()" because "this.queue" is null 2022-06-09T10:27:09.1310056Z at io.netty5.handler.flow.FlowControlHandler.isQueueEmpty(FlowControlHandler.java:90) 2022-06-09T10:27:09.1310829Z at io.netty5.handler.flow.FlowControlHandlerTest.testFlowAutoReadOff(FlowControlHandlerTest.java:370) 2022-06-09T10:27:09.1311575Z at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2022-06-09T10:27:09.1312301Z at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) 2022-06-09T10:27:09.1313093Z at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2022-06-09T10:27:09.1313760Z at java.base/java.lang.reflect.Method.invoke(Method.java:568) 2022-06-09T10:27:09.1314400Z at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) 2022-06-09T10:27:09.1315137Z at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) 2022-06-09T10:27:09.1315983Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) 2022-06-09T10:27:09.1316828Z at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) 2022-06-09T10:27:09.1317641Z at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) 2022-06-09T10:27:09.1318493Z at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) 2022-06-09T10:27:09.1319348Z at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) 2022-06-09T10:27:09.1320165Z at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) 2022-06-09T10:27:09.1321012Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) 2022-06-09T10:27:09.1322136Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) 2022-06-09T10:27:09.1323045Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) 2022-06-09T10:27:09.1323943Z at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) 2022-06-09T10:27:09.1324743Z at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) 2022-06-09T10:27:09.1325482Z at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) 2022-06-09T10:27:09.1326315Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) 2022-06-09T10:27:09.1327204Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1328123Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) 2022-06-09T10:27:09.1329864Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) 2022-06-09T10:27:09.1330733Z at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) 2022-06-09T10:27:09.1331591Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) 2022-06-09T10:27:09.1332454Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1333319Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1334081Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1334851Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1335715Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1336583Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1337418Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1338067Z at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) 2022-06-09T10:27:09.1338985Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) 2022-06-09T10:27:09.1340052Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) 2022-06-09T10:27:09.1340908Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1341965Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1342739Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1343508Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1344444Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1345330Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1346176Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1346868Z at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) 2022-06-09T10:27:09.1347789Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) 2022-06-09T10:27:09.1348854Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) 2022-06-09T10:27:09.1349822Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1350695Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) 2022-06-09T10:27:09.1351529Z at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 2022-06-09T10:27:09.1352298Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) 2022-06-09T10:27:09.1353161Z at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) 2022-06-09T10:27:09.1354049Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) 2022-06-09T10:27:09.1354894Z at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) 2022-06-09T10:27:09.1355997Z at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) 2022-06-09T10:27:09.1357115Z at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) 2022-06-09T10:27:09.1358070Z at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) 2022-06-09T10:27:09.1358898Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220) 2022-06-09T10:27:09.1359628Z at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188) 2022-06-09T10:27:09.1360424Z at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202) 2022-06-09T10:27:09.1361216Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181) 2022-06-09T10:27:09.1361937Z at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128) 2022-06-09T10:27:09.1362790Z at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150) 2022-06-09T10:27:09.1363688Z at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124) 2022-06-09T10:27:09.1364571Z at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) 2022-06-09T10:27:09.1365399Z at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) 2022-06-09T10:27:09.1366134Z at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) 2022-06-09T10:27:09.1366804Z at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) 2022-06-09T10:27:09.1375974Z ``` Modifications: Dispatch the EventLoop before accessing the state Result: No more NPE possible --- .../handler/flow/FlowControlHandlerTest.java | 44 ++++++++++++++----- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/handler/src/test/java/io/netty5/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty5/handler/flow/FlowControlHandlerTest.java index 690a69e0c63..1491f9049ff 100644 --- a/handler/src/test/java/io/netty5/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty5/handler/flow/FlowControlHandlerTest.java @@ -212,25 +212,37 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { @Test public void testFlowAutoReadOn() throws Exception { final CountDownLatch latch = new CountDownLatch(3); + final Exchanger peerRef = new Exchanger(); ChannelHandler handler = new ChannelHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, SECONDS); + ctx.fireChannelActive(); + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { + Resource.dispose(msg); latch.countDown(); } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(true, flow, handler); Channel client = newClient(server.localAddress()); try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + // Write the message client.writeAndFlush(newOneMessage()) .syncUninterruptibly(); // We should receive 3 messages assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.executor().submit(flow::isQueueEmpty).get()); } finally { client.close(); server.close(); @@ -289,7 +301,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(true, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -311,7 +323,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte peer.config().setAutoRead(true); setAutoReadLatch2.countDown(); assertTrue(msgRcvLatch3.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.executor().submit(flow::isQueueEmpty).get()); } finally { client.close(); server.close(); @@ -345,7 +358,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(false, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -367,7 +380,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { // channelRead(3) peer.read(); assertTrue(msgRcvLatch3.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.executor().submit(flow::isQueueEmpty).get()); } finally { client.close(); server.close(); @@ -398,7 +412,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(false, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -412,7 +426,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // channelRead(1) peer.read(); assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.executor().submit(flow::isQueueEmpty).get()); Throwable cause = causeRef.get(); if (cause != null) { @@ -482,15 +497,19 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { @Test public void testRemoveFlowControl() throws Exception { + final Exchanger peerRef = new Exchanger(); + final CountDownLatch latch = new CountDownLatch(3); ChannelHandler handler = new ChannelHandler() { @Override - public void channelActive(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, SECONDS); //do the first read ctx.read(); ctx.fireChannelActive(); } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { latch.countDown(); @@ -498,7 +517,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } }; - FlowControlHandler flow = new FlowControlHandler() { + final FlowControlHandler flow = new FlowControlHandler() { private int num; @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { @@ -527,12 +546,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel server = newServer(false /* no auto read */, flow, handler, tail); Channel client = newClient(server.localAddress()); try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + // Write one message client.writeAndFlush(newOneMessage()).sync(); // We should receive 3 messages assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + assertTrue(peer.executor().submit(flow::isQueueEmpty).get()); } finally { client.close(); server.close();