Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CONNECT over h2 #2508

Closed
nox opened this issue Apr 21, 2021 · 9 comments
Closed

Support CONNECT over h2 #2508

nox opened this issue Apr 21, 2021 · 9 comments
Labels
A-client Area: client. A-http2 Area: HTTP/2 specific. A-server Area: server. C-feature Category: feature. This is adding a new feature.

Comments

@nox
Copy link
Contributor

nox commented Apr 21, 2021

There is a h2 issue but even if support for CONNECT was perfect in h2, there are still details to sort out on hyper itself too. I'm filing this issue because I started working on that today and will probably need some help.

@nox
Copy link
Contributor Author

nox commented Apr 21, 2021

AFAICT for the client part we mostly want to patch this match arm:

Poll::Ready(Some((req, cb))) => {
// check that future hasn't been canceled already
if cb.is_canceled() {
trace!("request callback is canceled");
continue;
}
let (head, body) = req.into_parts();
let mut req = ::http::Request::from_parts(head, ());
super::strip_connection_headers(req.headers_mut(), true);
if let Some(len) = body.size_hint().exact() {
if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
headers::set_content_length_if_missing(req.headers_mut(), len);
}
}
let eos = body.is_end_stream();
let (fut, body_tx) = match self.h2_tx.send_request(req, eos) {
Ok(ok) => ok,
Err(err) => {
debug!("client send request error: {}", err);
cb.send(Err((crate::Error::new_h2(err), None)));
continue;
}
};
let ping = self.ping.clone();
if !eos {
let mut pipe = Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
if let Err(e) = res {
debug!("client request body error: {}", e);
}
});
// eagerly see if the body pipe is ready and
// can thus skip allocating in the executor
match Pin::new(&mut pipe).poll(cx) {
Poll::Ready(_) => (),
Poll::Pending => {
let conn_drop_ref = self.conn_drop_ref.clone();
// keep the ping recorder's knowledge of an
// "open stream" alive while this body is
// still sending...
let ping = ping.clone();
let pipe = pipe.map(move |x| {
drop(conn_drop_ref);
drop(ping);
x
});
self.executor.execute(pipe);
}
}
}
let fut = fut.map(move |result| match result {
Ok(res) => {
// record that we got the response headers
ping.record_non_data();
let content_length = decode_content_length(res.headers());
let res = res.map(|stream| {
let ping = ping.for_stream(&stream);
crate::Body::h2(stream, content_length, ping)
});
Ok(res)
}
Err(err) => {
ping.ensure_not_timed_out().map_err(|e| (e, None))?;
debug!("client response error: {}", err);
Err((crate::Error::new_h2(err), None))
}
});
self.executor.execute(cb.send_when(fut));
continue;
}

Specifically, ultimately we want to patch this snippet so that it inserts an OnUpgrade extension in the returned response, with its inner oneshot channel already sent to with a Upgraded built from the SendStream<_> that was returned by self.h2_tx.send_request and the RecvStream returned from the ResponseFuture:

Ok(res) => {
// record that we got the response headers
ping.record_non_data();
let content_length = decode_content_length(res.headers());
let res = res.map(|stream| {
let ping = ping.for_stream(&stream);
crate::Body::h2(stream, content_length, ping)
});
Ok(res)
}

The first issue I'm experiencing is that the SendStream<_> is a SendStream<SendBuf<B as Body>::Data>> and we need this to be the AsyncWrite half of the Upgraded IO object and I have no damn clue how we can possibly do that.

@nox
Copy link
Contributor Author

nox commented Apr 22, 2021

I guess I can change the struct SendBuf to an enum like this:

enum SendBuf<B> {
    Buf(B),
    Cursor(Cursor<Box<[u8]>>),
    None,
}

where SendBuf::Cursor is only ever used in the impl of AsyncWrite for the h2 upgraded stream.

@nox
Copy link
Contributor Author

nox commented Apr 22, 2021

To implement AsyncRead over RecvStream, I need hyperium/h2#532.

@nox
Copy link
Contributor Author

nox commented Apr 22, 2021

I'm also not exactly sure what poll_flush and poll_shutdown should do.

@nox
Copy link
Contributor Author

nox commented Apr 22, 2021

AFAIK poll_shutdown can just send an empty data frame with the EOS flag set, but we definitely need something in h2 to properly implement poll_flush.

@nox
Copy link
Contributor Author

nox commented Apr 26, 2021

So, I implemented poll_flush as a noop and tried adding upgrades to the server-side of Hyper, and I encountered a quite pesky blocker.

To implement Io over an H2 stream, I wrote this:

struct H2Upgraded<B>
where
    B: Buf,
{
    ping: Recorder,
    send_stream: SendStream<SendBuf<B>>,
    recv_stream: RecvStream,
    buf: Bytes,
}

impl<B> AsyncRead for H2Upgraded<B>
where
    B: Buf,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        read_buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), io::Error>> {
        let mut polled_stream = false;
        Poll::Ready(loop {
            if !self.buf.is_empty() {
                let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
                read_buf.put_slice(&self.buf[..cnt]);
                self.buf.advance(cnt);
                let _ = self.recv_stream.flow_control().release_capacity(cnt);
            }
            if polled_stream || read_buf.remaining() == 0 {
                break Ok(());
            }
            debug_assert!(!self.buf.is_empty());
            self.buf = match ready!(self.recv_stream.poll_data(cx)) {
                None => break Ok(()),
                Some(Ok(buf)) => {
                    self.ping.record_data(buf.len());
                    buf
                }
                Some(Err(e)) => {
                    return Poll::Ready(Err(h2_to_io_error(e)));
                }
            };
            polled_stream = true;
        })
    }
}

impl<B> AsyncWrite for H2Upgraded<B>
where
    B: Buf,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }
        self.send_stream.reserve_capacity(buf.len());
        Poll::Ready(match ready!(self.send_stream.poll_capacity(cx)) {
            None => Ok(0),
            Some(Ok(cnt)) => self.write(&buf[..cnt], false).map(|()| cnt),
            Some(Err(e)) => Err(h2_to_io_error(e)),
        })
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        Poll::Ready(self.write(&[], true))
    }
}

So far so good, right?

Then in the server code, I pass an H2Upgraded to Upgraded::new and this is where I am stuck.

Upgraded wants a Send I/O object, but all we get from the h2 crate is a SendStream<SendBuf<<B as Body>::Data>, and the system has no Send bound on B::Data. I'll try to propagate this bound everywhere but AFAIK that's a breaking change.

@nox
Copy link
Contributor Author

nox commented Apr 26, 2021

Ah right, it also wants B::Data to be 'static.

nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 28, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 29, 2021
nox added a commit to nox/hyper that referenced this issue Apr 30, 2021
nox added a commit to nox/hyper that referenced this issue Apr 30, 2021
@jvimal
Copy link

jvimal commented May 4, 2021

@nox Thanks for implementing this -- is it complete so I can give it a try?

I kind of worked around this by issuing a lower-case "connect" request and passing around a streaming body for the "connect" request. The streaming body is basically one end of the pipe (I used tokio's DuplexStream, but I just need a HalfDuplexStream). The other end of this pipe, and the response's streaming body become a custom H2Upgraded struct that satisfies the trait bound AsyncRead+AsyncWrite+Unpin+'static, so I can use it anywhere to spawn servers or whatever.

@nox
Copy link
Contributor Author

nox commented May 5, 2021

This PR only includes support for CONNECT over h2 on the server side, not the client, but yeah it works!

nox added a commit to nox/hyper that referenced this issue May 6, 2021
nox added a commit to nox/hyper that referenced this issue May 6, 2021
@davidpdrsn davidpdrsn added A-client Area: client. A-http2 Area: HTTP/2 specific. A-server Area: server. C-feature Category: feature. This is adding a new feature. labels May 7, 2021
nox added a commit to nox/hyper that referenced this issue May 7, 2021
nox added a commit to nox/hyper that referenced this issue May 17, 2021
nox added a commit to nox/hyper that referenced this issue May 19, 2021
nox added a commit to nox/hyper that referenced this issue May 19, 2021
nox added a commit to nox/hyper that referenced this issue May 20, 2021
nox added a commit to nox/hyper that referenced this issue May 20, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-client Area: client. A-http2 Area: HTTP/2 specific. A-server Area: server. C-feature Category: feature. This is adding a new feature.
Projects
None yet
Development

No branches or pull requests

3 participants