From 4b6c28aeec3907bae17b9d04e0f3f0847cbb8b8e Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 24 Jun 2022 19:12:18 +0300 Subject: [PATCH] Adapt to the changed semantic for EmbeddedChannel (#2325) Related to Netty 5 change https://github.com/netty/netty/pull/9529 Related to #1873 --- .../netty/http/client/Http2PoolTest.java | 373 ++++++++++-------- 1 file changed, 203 insertions(+), 170 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index 070c37e4f3..53d87c0e6d 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -21,7 +21,6 @@ import io.netty5.channel.embedded.EmbeddedChannel; import io.netty5.handler.codec.http2.Http2FrameCodecBuilder; import io.netty5.handler.codec.http2.Http2MultiplexHandler; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -33,6 +32,7 @@ import reactor.netty.internal.shaded.reactor.pool.PoolConfig; import reactor.netty.internal.shaded.reactor.pool.PooledRef; import reactor.test.StepVerifier; +import reactor.util.annotation.Nullable; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -46,7 +46,6 @@ import static org.assertj.core.api.Assertions.assertThat; -@Disabled class Http2PoolTest { @Test @@ -62,9 +61,11 @@ void acquireInvalidate() { try { List> acquired = new ArrayList<>(); - http2Pool.acquire().subscribe(acquired::add); - http2Pool.acquire().subscribe(acquired::add); - http2Pool.acquire().subscribe(acquired::add); + channel.executor().execute(() -> { + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + }); channel.runPendingTasks(); @@ -73,7 +74,7 @@ void acquireInvalidate() { assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { - slot.invalidate().block(Duration.ofSeconds(1)); + channel.executor().execute(() -> slot.invalidate().block(Duration.ofSeconds(1))); } assertThat(http2Pool.activeStreams()).isEqualTo(0); @@ -81,7 +82,7 @@ void acquireInvalidate() { for (PooledRef slot : acquired) { // second invalidate() should be ignored and ACQUIRED size should remain the same - slot.invalidate().block(Duration.ofSeconds(1)); + channel.executor().execute(() -> slot.invalidate().block(Duration.ofSeconds(1))); } assertThat(http2Pool.activeStreams()).isEqualTo(0); @@ -106,9 +107,11 @@ void acquireRelease() { try { List> acquired = new ArrayList<>(); - http2Pool.acquire().subscribe(acquired::add); - http2Pool.acquire().subscribe(acquired::add); - http2Pool.acquire().subscribe(acquired::add); + channel.executor().execute(() -> { + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + }); channel.runPendingTasks(); @@ -117,7 +120,7 @@ void acquireRelease() { assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { - slot.release().block(Duration.ofSeconds(1)); + channel.executor().execute(() -> slot.release().block(Duration.ofSeconds(1))); } assertThat(http2Pool.activeStreams()).isEqualTo(0); @@ -125,7 +128,7 @@ void acquireRelease() { for (PooledRef slot : acquired) { // second release() should be ignored and ACQUIRED size should remain the same - slot.release().block(Duration.ofSeconds(1)); + channel.executor().execute(() -> slot.release().block(Duration.ofSeconds(1))); } assertThat(http2Pool.activeStreams()).isEqualTo(0); @@ -139,13 +142,10 @@ void acquireRelease() { @Test void evictClosedConnection() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); @@ -153,14 +153,15 @@ void evictClosedConnection() throws Exception { Connection connection = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection = acquired1.poolable(); + connection = acquired1.get(0).poolable(); ChannelId id1 = connection.channel().id(); CountDownLatch latch = new CountDownLatch(1); ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); @@ -173,25 +174,29 @@ void evictClosedConnection() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - acquired1.invalidate().block(); + channel.executor().execute(() -> acquired1.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - PooledRef acquired2 = http2Pool.acquire().block(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); - assertThat(acquired2).isNotNull(); + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); + + assertThat(acquired2).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection = acquired2.poolable(); + connection = acquired2.get(0).poolable(); ChannelId id2 = connection.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired2.invalidate().block(); + channel.executor().execute(() -> acquired2.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); @@ -216,14 +221,11 @@ void evictClosedConnectionMaxConnectionsNotReached_2() throws Exception { } private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build(), - new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); @@ -231,14 +233,15 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) Connection connection = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); - connection = acquired1.poolable(); + connection = acquired1.get(0).poolable(); ChannelId id1 = connection.channel().id(); CountDownLatch latch = new CountDownLatch(1); ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); @@ -251,13 +254,25 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); - PooledRef acquired2 = http2Pool.acquire().block(); - assertThat(acquired2).isNotNull(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + channelRef.set(channel); + + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); + + assertThat(acquired2).hasSize(1); + + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + channelRef.set(channel); - AtomicReference> acquired3 = new AtomicReference<>(); - http2Pool.acquire().subscribe(acquired3::set); + List> acquired3 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired3::add)); - connection = acquired2.poolable(); + assertThat(acquired3).hasSize(1); + + connection = acquired2.get(0).poolable(); ((EmbeddedChannel) connection.channel()).runPendingTasks(); assertThat(http2Pool.activeStreams()).isEqualTo(3); @@ -276,14 +291,14 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) ChannelId id2 = connection.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired1.invalidate().block(); - acquired2.invalidate().block(); + acquired1.get(0).poolable().channel().executor().execute(() -> acquired1.get(0).invalidate().block()); + acquired2.get(0).poolable().channel().executor().execute(() -> acquired2.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); - acquired3.get().invalidate().block(); + acquired3.get(0).poolable().channel().executor().execute(() -> acquired3.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); if (closeSecond) { @@ -305,13 +320,9 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) @Test void evictClosedConnectionMaxConnectionsReached() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channel))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); @@ -319,14 +330,15 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { Connection connection = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection = acquired1.poolable(); + connection = acquired1.get(0).poolable(); CountDownLatch latch = new CountDownLatch(1); ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); connection.onDispose(latch::countDown); @@ -347,7 +359,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - acquired1.invalidate().block(); + channel.executor().execute(() -> acquired1.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); @@ -363,13 +375,10 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { @Test void evictInBackgroundClosedConnection() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1) @@ -378,14 +387,15 @@ void evictInBackgroundClosedConnection() throws Exception { Connection connection = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection = acquired1.poolable(); + connection = acquired1.get(0).poolable(); ChannelId id1 = connection.channel().id(); CountDownLatch latch = new CountDownLatch(1); ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); @@ -398,7 +408,7 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - acquired1.invalidate().block(); + channel.executor().execute(() -> acquired1.get(0).invalidate().block()); http2Pool.evictInBackground(); @@ -406,19 +416,23 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(0); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - PooledRef acquired2 = http2Pool.acquire().block(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); + + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); - assertThat(acquired2).isNotNull(); + assertThat(acquired2).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection = acquired2.poolable(); + connection = acquired2.get(0).poolable(); ChannelId id2 = connection.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired2.invalidate().block(); + channel.executor().execute(() -> acquired2.get(0).invalidate().block()); http2Pool.evictInBackground(); @@ -436,13 +450,10 @@ void evictInBackgroundClosedConnection() throws Exception { @Test void evictInBackgroundMaxIdleTime() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1) @@ -452,17 +463,18 @@ void evictInBackgroundMaxIdleTime() throws Exception { Connection connection1 = null; Connection connection2 = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection1 = acquired1.poolable(); + connection1 = acquired1.get(0).poolable(); ChannelId id1 = connection1.channel().id(); - acquired1.invalidate().block(); + channel.executor().execute(() -> acquired1.get(0).invalidate().block()); Thread.sleep(15); @@ -472,19 +484,23 @@ void evictInBackgroundMaxIdleTime() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(0); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - PooledRef acquired2 = http2Pool.acquire().block(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); - assertThat(acquired2).isNotNull(); + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); + + assertThat(acquired2).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection2 = acquired2.poolable(); + connection2 = acquired2.get(0).poolable(); ChannelId id2 = connection2.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired2.invalidate().block(); + channel.executor().execute(() -> acquired2.get(0).invalidate().block()); Thread.sleep(15); @@ -508,13 +524,10 @@ void evictInBackgroundMaxIdleTime() throws Exception { @Test void evictInBackgroundMaxLifeTime() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1) @@ -524,14 +537,15 @@ void evictInBackgroundMaxLifeTime() throws Exception { Connection connection1 = null; Connection connection2 = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection1 = acquired1.poolable(); + connection1 = acquired1.get(0).poolable(); ChannelId id1 = connection1.channel().id(); Thread.sleep(10); @@ -540,7 +554,7 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - acquired1.invalidate().block(); + channel.executor().execute(() -> acquired1.get(0).invalidate().block()); http2Pool.evictInBackground(); @@ -548,19 +562,23 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(0); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - PooledRef acquired2 = http2Pool.acquire().block(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); + + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); - assertThat(acquired2).isNotNull(); + assertThat(acquired2).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection2 = acquired2.poolable(); + connection2 = acquired2.get(0).poolable(); ChannelId id2 = connection2.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired2.invalidate().block(); + channel.executor().execute(() -> acquired2.get(0).invalidate().block()); Thread.sleep(10); @@ -584,13 +602,10 @@ void evictInBackgroundMaxLifeTime() throws Exception { @Test void maxIdleTime() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); @@ -599,33 +614,38 @@ void maxIdleTime() throws Exception { Connection connection1 = null; Connection connection2 = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection1 = acquired1.poolable(); + connection1 = acquired1.get(0).poolable(); ChannelId id1 = connection1.channel().id(); - acquired1.invalidate().block(); + channel.executor().execute(() -> acquired1.get(0).invalidate().block()); Thread.sleep(15); - PooledRef acquired2 = http2Pool.acquire().block(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); - assertThat(acquired2).isNotNull(); + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); + + assertThat(acquired2).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection2 = acquired2.poolable(); + connection2 = acquired2.get(0).poolable(); ChannelId id2 = connection2.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired2.invalidate().block(); + channel.executor().execute(() -> acquired2.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); @@ -658,8 +678,10 @@ void maxIdleTimeActiveStreams() throws Exception { Connection connection2 = null; try { List> acquired = new ArrayList<>(); - http2Pool.acquire().subscribe(acquired::add); - http2Pool.acquire().subscribe(acquired::add); + channel.executor().execute(() -> { + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + }); channel.runPendingTasks(); @@ -671,7 +693,7 @@ void maxIdleTimeActiveStreams() throws Exception { connection1 = acquired.get(0).poolable(); ChannelId id1 = connection1.channel().id(); - acquired.get(0).invalidate().block(); + channel.executor().execute(() -> acquired.get(0).invalidate().block()); Thread.sleep(15); @@ -684,7 +706,7 @@ void maxIdleTimeActiveStreams() throws Exception { assertThat(id1).isEqualTo(id2); - acquired.get(1).invalidate().block(); + channel.executor().execute(() -> acquired.get(1).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); @@ -704,13 +726,10 @@ void maxIdleTimeActiveStreams() throws Exception { @Test void maxLifeTime() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); @@ -719,14 +738,15 @@ void maxLifeTime() throws Exception { Connection connection1 = null; Connection connection2 = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection1 = acquired1.poolable(); + connection1 = acquired1.get(0).poolable(); ChannelId id1 = connection1.channel().id(); Thread.sleep(10); @@ -735,25 +755,29 @@ void maxLifeTime() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - acquired1.invalidate().block(); + channel.executor().execute(() -> acquired1.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - PooledRef acquired2 = http2Pool.acquire().block(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); + + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); - assertThat(acquired2).isNotNull(); + assertThat(acquired2).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection2 = acquired2.poolable(); + connection2 = acquired2.get(0).poolable(); ChannelId id2 = connection2.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired2.invalidate().block(); + channel.executor().execute(() -> acquired2.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); @@ -773,13 +797,10 @@ void maxLifeTime() throws Exception { @Test void maxLifeTimeMaxConnectionsNotReached() throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); @@ -788,14 +809,15 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { Connection connection1 = null; Connection connection2 = null; try { - PooledRef acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection1 = acquired1.poolable(); + connection1 = acquired1.get(0).poolable(); ChannelId id1 = connection1.channel().id(); Thread.sleep(50); @@ -804,20 +826,24 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - PooledRef acquired2 = http2Pool.acquire().block(); + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); + + List> acquired2 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired2::add)); - assertThat(acquired2).isNotNull(); + assertThat(acquired2).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(2); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection2 = acquired2.poolable(); + connection2 = acquired2.get(0).poolable(); ChannelId id2 = connection2.channel().id(); assertThat(id1).isNotEqualTo(id2); - acquired1.invalidate().block(); - acquired2.invalidate().block(); + acquired1.get(0).poolable().channel().executor().execute(() -> acquired1.get(0).invalidate().block()); + acquired2.get(0).poolable().channel().executor().execute(() -> acquired2.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); @@ -854,14 +880,13 @@ void maxLifeTimeMaxConnectionsReachedWithCustomTimer() throws Exception { assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); } - private void doMaxLifeTimeMaxConnectionsReached(BiFunction pendingAcquireTimer) throws Exception { + private void doMaxLifeTimeMaxConnectionsReached(@Nullable BiFunction pendingAcquireTimer) + throws Exception { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) + .idleResourceReuseLruOrder() .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); @@ -872,14 +897,15 @@ private void doMaxLifeTimeMaxConnectionsReached(BiFunction acquired1 = http2Pool.acquire().block(); + List> acquired1 = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired1::add)); - assertThat(acquired1).isNotNull(); + assertThat(acquired1).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); - connection = acquired1.poolable(); + connection = acquired1.get(0).poolable(); Thread.sleep(10); @@ -896,7 +922,7 @@ private void doMaxLifeTimeMaxConnectionsReached(BiFunction acquired1.get(0).invalidate().block()); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); @@ -927,9 +953,10 @@ void minConnections() { List> acquired = new ArrayList<>(); try { - Flux.range(0, 3) - .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) - .subscribe(); + channel.executor().execute(() -> + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .subscribe()); channel.runPendingTasks(); @@ -941,7 +968,7 @@ void minConnections() { assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { - slot.release().block(Duration.ofSeconds(1)); + channel.executor().execute(() -> slot.release().block(Duration.ofSeconds(1))); } assertThat(http2Pool.activeStreams()).isEqualTo(0); @@ -958,13 +985,10 @@ void minConnections() { @Test void minConnectionsMaxStreamsReached() { + Channel channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + AtomicReference channelRef = new AtomicReference<>(channel); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.fromSupplier(() -> { - Channel channel = new EmbeddedChannel( - new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); - return Connection.from(channel); - })) + PoolBuilder.from(Mono.fromSupplier(() -> Connection.from(channelRef.get()))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(1, 3); @@ -976,9 +1000,17 @@ void minConnectionsMaxStreamsReached() { List> acquired = new ArrayList<>(); try { - Flux.range(0, 3) - .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) - .blockLast(Duration.ofSeconds(1)); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired::add)); + + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); + + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired::add)); + + channel = new EmbeddedChannel(new TestChannelId(), Http2FrameCodecBuilder.forClient().build()); + channelRef.set(channel); + + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired::add)); assertThat(acquired).hasSize(3); @@ -993,7 +1025,7 @@ void minConnectionsMaxStreamsReached() { assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); for (PooledRef slot : acquired) { - slot.release().block(Duration.ofSeconds(1)); + slot.poolable().channel().executor().execute(() -> slot.release().block(Duration.ofSeconds(1))); } assertThat(http2Pool.activeStreams()).isEqualTo(0); @@ -1019,9 +1051,10 @@ void nonHttp2ConnectionEmittedOnce() { Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { - PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); + List> acquired = new ArrayList<>(); + channel.executor().execute(() -> http2Pool.acquire().subscribe(acquired::add)); - assertThat(acquired).isNotNull(); + assertThat(acquired).hasSize(1); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); @@ -1032,7 +1065,7 @@ void nonHttp2ConnectionEmittedOnce() { assertThat(http2Pool.activeStreams()).isEqualTo(1); - acquired.invalidate().block(Duration.ofSeconds(1)); + channel.executor().execute(() -> acquired.get(0).invalidate().block(Duration.ofSeconds(1))); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0);