From 9f8f53238164e638406e80c07921ee8fb10507ab Mon Sep 17 00:00:00 2001 From: Jacques Fourie Date: Wed, 2 Nov 2022 14:13:35 -0700 Subject: [PATCH] fix(server): Fix race condition in client dispatcher (#2419) There exists a race condition in ClientTask::poll() when the request that is sent via h2::client::send_request() is pending open. A task will be spawned to wait for send capacity on the sendstream. Because this same stream is also stored in the pending member of h2::client::SendRequest the next iteration of the poll() loop can call poll_ready() and call wait_send() on the same stream passed into the spawned task. Fix this by always calling poll_ready() after send_request(). If this call to poll_ready() returns Pending save the necessary context in ClientTask and only spawn the task that will eventually resolve to the response after poll_ready() returns Ok. --- src/proto/h2/client.rs | 224 ++++++++++++++++++++++++++--------------- 1 file changed, 143 insertions(+), 81 deletions(-) diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 013f6fb5a8..bac8eceb3a 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -7,12 +7,14 @@ use futures_channel::{mpsc, oneshot}; use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _}; use futures_util::stream::StreamExt as _; use h2::client::{Builder, SendRequest}; +use h2::SendStream; use http::{Method, StatusCode}; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, trace, warn}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::HttpBody; +use crate::client::dispatch::Callback; use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; use crate::ext::Protocol; use crate::headers; @@ -20,6 +22,7 @@ use crate::proto::h2::UpgradedSendStream; use crate::proto::Dispatched; use crate::upgrade::Upgraded; use crate::{Body, Request, Response}; +use h2::client::ResponseFuture; type ClientRx = crate::client::dispatch::Receiver, Response>; @@ -170,6 +173,7 @@ where executor: exec, h2_tx, req_rx, + fut_ctx: None, }) } @@ -193,6 +197,20 @@ where } } +struct FutCtx +where + B: HttpBody, +{ + is_connect: bool, + eos: bool, + fut: ResponseFuture, + body_tx: SendStream>, + body: B, + cb: Callback, Response>, +} + +impl Unpin for FutCtx {} + pub(crate) struct ClientTask where B: HttpBody, @@ -203,6 +221,7 @@ where executor: Exec, h2_tx: SendRequest>, req_rx: ClientRx, + fut_ctx: Option>, } impl ClientTask @@ -214,6 +233,99 @@ where } } +impl ClientTask +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into>, +{ + fn poll_pipe(&mut self, f: FutCtx, cx: &mut task::Context<'_>) { + let ping = self.ping.clone(); + let send_stream = if !f.is_connect { + if !f.eos { + let mut pipe = Box::pin(PipeToSendStream::new(f.body, f.body_tx)).map(|res| { + if let Err(e) = res { + debug!("client request body error: {}", e); + } + }); + + // eagerly see if the body pipe is ready and + // can thus skip allocating in the executor + match Pin::new(&mut pipe).poll(cx) { + Poll::Ready(_) => (), + Poll::Pending => { + let conn_drop_ref = self.conn_drop_ref.clone(); + // keep the ping recorder's knowledge of an + // "open stream" alive while this body is + // still sending... + let ping = ping.clone(); + let pipe = pipe.map(move |x| { + drop(conn_drop_ref); + drop(ping); + x + }); + // Clear send task + self.executor.execute(pipe); + } + } + } + + None + } else { + Some(f.body_tx) + }; + + let fut = f.fut.map(move |result| match result { + Ok(res) => { + // record that we got the response headers + ping.record_non_data(); + + let content_length = headers::content_length_parse_all(res.headers()); + if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) { + if content_length.map_or(false, |len| len != 0) { + warn!("h2 connect response with non-zero body not supported"); + + send_stream.send_reset(h2::Reason::INTERNAL_ERROR); + return Err(( + crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + None, + )); + } + let (parts, recv_stream) = res.into_parts(); + let mut res = Response::from_parts(parts, Body::empty()); + + let (pending, on_upgrade) = crate::upgrade::pending(); + let io = H2Upgraded { + ping, + send_stream: unsafe { UpgradedSendStream::new(send_stream) }, + recv_stream, + buf: Bytes::new(), + }; + let upgraded = Upgraded::new(io, Bytes::new()); + + pending.fulfill(upgraded); + res.extensions_mut().insert(on_upgrade); + + Ok(res) + } else { + let res = res.map(|stream| { + let ping = ping.for_stream(&stream); + crate::Body::h2(stream, content_length.into(), ping) + }); + Ok(res) + } + } + Err(err) => { + ping.ensure_not_timed_out().map_err(|e| (e, None))?; + + debug!("client response error: {}", err); + Err((crate::Error::new_h2(err), None)) + } + }); + self.executor.execute(f.cb.send_when(fut)); + } +} + impl Future for ClientTask where B: HttpBody + Send + 'static, @@ -237,6 +349,16 @@ where } }; + match self.fut_ctx.take() { + // If we were waiting on pending open + // continue where we left off. + Some(f) => { + self.poll_pipe(f, cx); + continue; + } + None => (), + } + match self.req_rx.poll_recv(cx) { Poll::Ready(Some((req, cb))) => { // check that future hasn't been canceled already @@ -255,7 +377,6 @@ where let is_connect = req.method() == Method::CONNECT; let eos = body.is_end_stream(); - let ping = self.ping.clone(); if is_connect { if headers::content_length_parse_all(req.headers()) @@ -283,90 +404,31 @@ where } }; - let send_stream = if !is_connect { - if !eos { - let mut pipe = - Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| { - if let Err(e) = res { - debug!("client request body error: {}", e); - } - }); - - // eagerly see if the body pipe is ready and - // can thus skip allocating in the executor - match Pin::new(&mut pipe).poll(cx) { - Poll::Ready(_) => (), - Poll::Pending => { - let conn_drop_ref = self.conn_drop_ref.clone(); - // keep the ping recorder's knowledge of an - // "open stream" alive while this body is - // still sending... - let ping = ping.clone(); - let pipe = pipe.map(move |x| { - drop(conn_drop_ref); - drop(ping); - x - }); - self.executor.execute(pipe); - } - } - } - - None - } else { - Some(body_tx) + let f = FutCtx { + is_connect, + eos, + fut, + body_tx, + body, + cb, }; - let fut = fut.map(move |result| match result { - Ok(res) => { - // record that we got the response headers - ping.record_non_data(); - - let content_length = headers::content_length_parse_all(res.headers()); - if let (Some(mut send_stream), StatusCode::OK) = - (send_stream, res.status()) - { - if content_length.map_or(false, |len| len != 0) { - warn!("h2 connect response with non-zero body not supported"); - - send_stream.send_reset(h2::Reason::INTERNAL_ERROR); - return Err(( - crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), - None, - )); - } - let (parts, recv_stream) = res.into_parts(); - let mut res = Response::from_parts(parts, Body::empty()); - - let (pending, on_upgrade) = crate::upgrade::pending(); - let io = H2Upgraded { - ping, - send_stream: unsafe { UpgradedSendStream::new(send_stream) }, - recv_stream, - buf: Bytes::new(), - }; - let upgraded = Upgraded::new(io, Bytes::new()); - - pending.fulfill(upgraded); - res.extensions_mut().insert(on_upgrade); - - Ok(res) - } else { - let res = res.map(|stream| { - let ping = ping.for_stream(&stream); - crate::Body::h2(stream, content_length.into(), ping) - }); - Ok(res) - } + // Check poll_ready() again. + // If the call to send_request() resulted in the new stream being pending open + // we have to wait for the open to complete before accepting new requests. + match self.h2_tx.poll_ready(cx) { + Poll::Pending => { + // Save Context + self.fut_ctx = Some(f); + return Poll::Pending; } - Err(err) => { - ping.ensure_not_timed_out().map_err(|e| (e, None))?; - - debug!("client response error: {}", err); - Err((crate::Error::new_h2(err), None)) + Poll::Ready(Ok(())) => (), + Poll::Ready(Err(err)) => { + f.cb.send(Err((crate::Error::new_h2(err), None))); + continue; } - }); - self.executor.execute(cb.send_when(fut)); + } + self.poll_pipe(f, cx); continue; }