diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index 6e5d4c5348b..c1ef8e6dfbf 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -44,6 +44,7 @@ import java.net.SocketAddress; import java.util.List; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; import java.util.concurrent.LinkedBlockingQueue; @@ -215,25 +216,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { @Test public void testFlowAutoReadOn() throws Exception { final CountDownLatch latch = new CountDownLatch(3); + final Exchanger peerRef = new Exchanger(); ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, SECONDS); + super.channelActive(ctx); + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); latch.countDown(); } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(true, flow, handler); Channel client = newClient(server.localAddress()); try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + // Write the message client.writeAndFlush(newOneMessage()) .syncUninterruptibly(); // We should receive 3 messages assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close(); @@ -292,7 +311,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(true, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -314,7 +333,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte peer.config().setAutoRead(true); setAutoReadLatch2.countDown(); assertTrue(msgRcvLatch3.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close(); @@ -348,7 +373,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(false, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -370,7 +395,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { // channelRead(3) peer.read(); assertTrue(msgRcvLatch3.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close(); @@ -401,7 +432,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } }; - FlowControlHandler flow = new FlowControlHandler(); + final FlowControlHandler flow = new FlowControlHandler(); Channel server = newServer(false, flow, handler); Channel client = newClient(server.localAddress()); try { @@ -415,7 +446,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // channelRead(1) peer.read(); assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); Throwable cause = causeRef.get(); if (cause != null) { @@ -485,11 +522,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { @Test public void testRemoveFlowControl() throws Exception { + final Exchanger peerRef = new Exchanger(); + final CountDownLatch latch = new CountDownLatch(3); ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + peerRef.exchange(ctx.channel(), 1L, SECONDS); //do the first read ctx.read(); super.channelActive(ctx); @@ -501,7 +541,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } }; - FlowControlHandler flow = new FlowControlHandler() { + final FlowControlHandler flow = new FlowControlHandler() { private int num; @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { @@ -530,12 +570,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel server = newServer(false /* no auto read */, flow, handler, tail); Channel client = newClient(server.localAddress()); try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + // Write one message client.writeAndFlush(newOneMessage()).sync(); // We should receive 3 messages assertTrue(latch.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); + assertTrue(peer.eventLoop().submit(new Callable() { + @Override + public Boolean call() { + return flow.isQueueEmpty(); + } + }).get()); } finally { client.close(); server.close();