Skip to content

Commit

Permalink
Fix race in FlowControlHandlerTest that could lead to a NPE (#12457)
Browse files Browse the repository at this point in the history
Motivation:

How we accessed the internal state of the handler was not thread safe and so we could see a NPE during testing like:

```
2022-06-09T10:27:09.1309221Z java.lang.NullPointerException: Cannot invoke "io.netty5.handler.flow.FlowControlHandler$RecyclableArrayDeque.isEmpty()" because "this.queue" is null
2022-06-09T10:27:09.1310056Z 	at io.netty5.handler.flow.FlowControlHandler.isQueueEmpty(FlowControlHandler.java:90)
2022-06-09T10:27:09.1310829Z 	at io.netty5.handler.flow.FlowControlHandlerTest.testFlowAutoReadOff(FlowControlHandlerTest.java:370)
2022-06-09T10:27:09.1311575Z 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-06-09T10:27:09.1312301Z 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
2022-06-09T10:27:09.1313093Z 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-06-09T10:27:09.1313760Z 	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
2022-06-09T10:27:09.1314400Z 	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
2022-06-09T10:27:09.1315137Z 	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
2022-06-09T10:27:09.1315983Z 	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-06-09T10:27:09.1316828Z 	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
2022-06-09T10:27:09.1317641Z 	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
2022-06-09T10:27:09.1318493Z 	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
2022-06-09T10:27:09.1319348Z 	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
2022-06-09T10:27:09.1320165Z 	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
2022-06-09T10:27:09.1321012Z 	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
2022-06-09T10:27:09.1322136Z 	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
2022-06-09T10:27:09.1323045Z 	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
2022-06-09T10:27:09.1323943Z 	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
2022-06-09T10:27:09.1324743Z 	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
2022-06-09T10:27:09.1325482Z 	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
2022-06-09T10:27:09.1326315Z 	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
2022-06-09T10:27:09.1327204Z 	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-06-09T10:27:09.1328123Z 	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
2022-06-09T10:27:09.1329864Z 	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
2022-06-09T10:27:09.1330733Z 	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
2022-06-09T10:27:09.1331591Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
2022-06-09T10:27:09.1332454Z 	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-06-09T10:27:09.1333319Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
2022-06-09T10:27:09.1334081Z 	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
2022-06-09T10:27:09.1334851Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
2022-06-09T10:27:09.1335715Z 	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-06-09T10:27:09.1336583Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
2022-06-09T10:27:09.1337418Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
2022-06-09T10:27:09.1338067Z 	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
2022-06-09T10:27:09.1338985Z 	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
2022-06-09T10:27:09.1340052Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
2022-06-09T10:27:09.1340908Z 	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-06-09T10:27:09.1341965Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
2022-06-09T10:27:09.1342739Z 	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
2022-06-09T10:27:09.1343508Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
2022-06-09T10:27:09.1344444Z 	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-06-09T10:27:09.1345330Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
2022-06-09T10:27:09.1346176Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
2022-06-09T10:27:09.1346868Z 	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
2022-06-09T10:27:09.1347789Z 	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
2022-06-09T10:27:09.1348854Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
2022-06-09T10:27:09.1349822Z 	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-06-09T10:27:09.1350695Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
2022-06-09T10:27:09.1351529Z 	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
2022-06-09T10:27:09.1352298Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
2022-06-09T10:27:09.1353161Z 	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-06-09T10:27:09.1354049Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
2022-06-09T10:27:09.1354894Z 	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
2022-06-09T10:27:09.1355997Z 	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
2022-06-09T10:27:09.1357115Z 	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
2022-06-09T10:27:09.1358070Z 	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
2022-06-09T10:27:09.1358898Z 	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
2022-06-09T10:27:09.1359628Z 	at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
2022-06-09T10:27:09.1360424Z 	at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
2022-06-09T10:27:09.1361216Z 	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
2022-06-09T10:27:09.1361937Z 	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
2022-06-09T10:27:09.1362790Z 	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
2022-06-09T10:27:09.1363688Z 	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
2022-06-09T10:27:09.1364571Z 	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
2022-06-09T10:27:09.1365399Z 	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
2022-06-09T10:27:09.1366134Z 	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
2022-06-09T10:27:09.1366804Z 	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
2022-06-09T10:27:09.1375974Z
```

Modifications:

Dispatch the EventLoop before accessing the state

Result:

No more NPE possible
  • Loading branch information
normanmaurer committed Jun 10, 2022
1 parent bc085b0 commit db8d5c5
Showing 1 changed file with 58 additions and 10 deletions.
Expand Up @@ -44,6 +44,7 @@
import java.net.SocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -215,25 +216,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
@Test
public void testFlowAutoReadOn() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
final Exchanger<Channel> peerRef = new Exchanger<Channel>();

ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, SECONDS);
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
latch.countDown();
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(true, flow, handler);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, SECONDS);

// Write the message
client.writeAndFlush(newOneMessage())
.syncUninterruptibly();

// We should receive 3 messages
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down Expand Up @@ -292,7 +311,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(true, flow, handler);
Channel client = newClient(server.localAddress());
try {
Expand All @@ -314,7 +333,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte
peer.config().setAutoRead(true);
setAutoReadLatch2.countDown();
assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down Expand Up @@ -348,7 +373,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(false, flow, handler);
Channel client = newClient(server.localAddress());
try {
Expand All @@ -370,7 +395,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// channelRead(3)
peer.read();
assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down Expand Up @@ -401,7 +432,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(false, flow, handler);
Channel client = newClient(server.localAddress());
try {
Expand All @@ -415,7 +446,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// channelRead(1)
peer.read();
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());

Throwable cause = causeRef.get();
if (cause != null) {
Expand Down Expand Up @@ -485,11 +522,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {

@Test
public void testRemoveFlowControl() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();

final CountDownLatch latch = new CountDownLatch(3);

ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, SECONDS);
//do the first read
ctx.read();
super.channelActive(ctx);
Expand All @@ -501,7 +541,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
};

FlowControlHandler flow = new FlowControlHandler() {
final FlowControlHandler flow = new FlowControlHandler() {
private int num;
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
Expand Down Expand Up @@ -530,12 +570,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel server = newServer(false /* no auto read */, flow, handler, tail);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, SECONDS);

// Write one message
client.writeAndFlush(newOneMessage()).sync();

// We should receive 3 messages
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());
assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down

0 comments on commit db8d5c5

Please sign in to comment.