Skip to content

Commit

Permalink
fix #1165 Change the state after notifying that the connection is acq…
Browse files Browse the repository at this point in the history
…uired successfully.

Cancel the subscription when disposing ChannelOperations
  • Loading branch information
violetagg committed Jul 9, 2020
1 parent 8041a8e commit c61e556
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/main/java/reactor/netty/channel/ChannelOperations.java
Expand Up @@ -157,6 +157,11 @@ public ChannelOperations<INBOUND, OUTBOUND> withConnection(Consumer<? super Conn

@Override
public void dispose() {
if (log.isTraceEnabled()) {
log.trace(format(channel(), "Disposing ChannelOperation from a channel"),
new Exception("ChannelOperation dispose stack"));
}
OUTBOUND_CLOSE.set(this, Operators.cancelledSubscription());
if (!inbound.isDisposed()) {
inbound.cancel();
}
Expand Down
Expand Up @@ -531,6 +531,10 @@ public void onUncaughtException(Connection connection, Throwable error) {
@Override
public void onStateChange(Connection connection, State newState) {
if (newState == State.CONFIGURED) {
// First send a notification that the connection is ready and then change the state
// In case a cancellation was received, ChannelOperations will be disposed
// and there will be no subscription to the I/O handler at all.
// https://github.com/reactor/reactor-netty/issues/1165
sink.success(connection);
}
obs.onStateChange(connection, newState);
Expand Down Expand Up @@ -600,8 +604,12 @@ else if (current == null) {
ChannelOperations<?, ?> ops = opsFactory.create(pooledConnection, pooledConnection, null);
if (ops != null) {
ops.bind();
obs.onStateChange(ops, State.CONFIGURED);
// First send a notification that the connection is ready and then change the state
// In case a cancellation was received, ChannelOperations will be disposed
// and there will be no subscription to the I/O handler at all.
// https://github.com/reactor/reactor-netty/issues/1165
sink.success(ops);
obs.onStateChange(ops, State.CONFIGURED);
}
else {
//already configured, just forward the connection
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/reactor/netty/http/server/HttpServerTests.java
Expand Up @@ -1131,6 +1131,7 @@ public void testIssue630() {
}

@Test
@SuppressWarnings("FutureReturnValueIgnored")
public void testExpectErrorWhenConnectionClosed() throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext serverCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
Expand All @@ -1142,7 +1143,8 @@ public void testExpectErrorWhenConnectionClosed() throws Exception {
.port(0)
.secure(spec -> spec.sslContext(serverCtx))
.handle((req, res) -> {
res.withConnection(DisposableChannel::dispose);
// "FutureReturnValueIgnored" is suppressed deliberately
res.withConnection(conn -> conn.channel().close());
return res.sendString(Flux.just("OK").hide())
.then()
.doOnError(t -> {
Expand Down

0 comments on commit c61e556

Please sign in to comment.