Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in FlowControlHandlerTest that could lead to a NPE #12457

Merged
merged 1 commit into from Jun 10, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -44,6 +44,7 @@
import java.net.SocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -215,25 +216,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
@Test
public void testFlowAutoReadOn() throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
final Exchanger<Channel> peerRef = new Exchanger<Channel>();

ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, SECONDS);
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
latch.countDown();
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(true, flow, handler);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, SECONDS);

// Write the message
client.writeAndFlush(newOneMessage())
.syncUninterruptibly();

// We should receive 3 messages
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down Expand Up @@ -292,7 +311,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(true, flow, handler);
Channel client = newClient(server.localAddress());
try {
Expand All @@ -314,7 +333,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte
peer.config().setAutoRead(true);
setAutoReadLatch2.countDown();
assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down Expand Up @@ -348,7 +373,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(false, flow, handler);
Channel client = newClient(server.localAddress());
try {
Expand All @@ -370,7 +395,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// channelRead(3)
peer.read();
assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down Expand Up @@ -401,7 +432,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
};

FlowControlHandler flow = new FlowControlHandler();
final FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(false, flow, handler);
Channel client = newClient(server.localAddress());
try {
Expand All @@ -415,7 +446,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// channelRead(1)
peer.read();
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());

assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());

Throwable cause = causeRef.get();
if (cause != null) {
Expand Down Expand Up @@ -485,11 +522,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {

@Test
public void testRemoveFlowControl() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();

final CountDownLatch latch = new CountDownLatch(3);

ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, SECONDS);
//do the first read
ctx.read();
super.channelActive(ctx);
Expand All @@ -501,7 +541,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
};

FlowControlHandler flow = new FlowControlHandler() {
final FlowControlHandler flow = new FlowControlHandler() {
private int num;
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
Expand Down Expand Up @@ -530,12 +570,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel server = newServer(false /* no auto read */, flow, handler, tail);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, SECONDS);

// Write one message
client.writeAndFlush(newOneMessage()).sync();

// We should receive 3 messages
assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());
assertTrue(peer.eventLoop().submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return flow.isQueueEmpty();
}
}).get());
} finally {
client.close();
server.close();
Expand Down