diff --git a/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/src/main/java/reactor/netty/http/client/HttpClientOperations.java index 1df9fb73f6..132df819ac 100644 --- a/src/main/java/reactor/netty/http/client/HttpClientOperations.java +++ b/src/main/java/reactor/netty/http/client/HttpClientOperations.java @@ -63,6 +63,7 @@ import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.Exceptions; @@ -765,6 +766,13 @@ static final class SendForm extends Mono { @Override public void subscribe(CoreSubscriber s) { + if (!parent.markSentHeaders()) { + Operators.error(s, + new IllegalStateException("headers have already been sent")); + return; + } + Subscription subscription = Operators.emptySubscription(); + s.onSubscribe(subscription); if (parent.channel() .eventLoop() .inEventLoop()) { @@ -779,12 +787,6 @@ public void subscribe(CoreSubscriber s) { @SuppressWarnings("FutureReturnValueIgnored") void _subscribe(CoreSubscriber s) { - if (!parent.markSentHeaders()) { - Operators.error(s, - new IllegalStateException("headers have already been sent")); - return; - } - HttpDataFactory df = DEFAULT_FACTORY; try { @@ -841,14 +843,14 @@ void _subscribe(CoreSubscriber s) { .flux()); } } - Operators.complete(s); + s.onComplete(); } catch (Throwable e) { Exceptions.throwIfJvmFatal(e); df.cleanRequestHttpData(parent.nettyRequest); - Operators.error(s, Exceptions.unwrap(e)); + s.onError(Exceptions.unwrap(e)); } } }