
Waiting for HUP on TCP socket #3467

Open
benesch opened this issue Jan 24, 2021 · 13 comments
Labels
A-tokio (Area: The main tokio crate) · C-feature-request (Category: A feature request) · M-net (Module: tokio/net)

Comments

@benesch
Contributor

benesch commented Jan 24, 2021

This is a resubmission of #483 for the Tokio v1.0 era. The question is thus: "how do I wait for a TCP socket to close without reading from or writing to that socket?"

First, some quick history! #483 was resolved with this suggestion:

You can ask to get notified on HUP by calling poll_read_ready and passing Ready::hup() as the interest.

But poll_read_ready was later removed, so 6d8cc4e was filed to ask how to wait for HUP without it. The proposed solution was to restore poll_read_ready. That method has since been restored (#2968), but it's no longer clear to me how one uses it to wait for just HUP: in Tokio v1.0, there is no Ready::hup().

Here's the comment I posted on #2968, which contains my current hacky attempt to wait for HUP, and my ideal API:

I managed to get something working here, but it's suboptimal: MaterializeInc/materialize#5235

The tl;dr is that what you currently have to write is something like the following, which polls with Interest::READABLE and then checks whether the resulting Ready reports is_read_closed:

use std::time::Duration;
use futures::{Stream, StreamExt};
use tokio::io::{AsyncWriteExt, Interest};
use tokio::net::TcpStream;
use tokio::time;

async fn write_stream(
    mut conn: TcpStream,
    mut stream: impl Stream<Item = Vec<u8>> + Unpin,
) -> Result<(), &'static str> {
    loop {
        match time::timeout(Duration::from_secs(1), stream.next()).await {
            // The stream is exhausted; we're done.
            Ok(None) => return Ok(()),
            // Forward the next chunk of data to the client.
            Ok(Some(data)) => conn.write_all(&data).await.map_err(|_| "write failed")?,
            // Timed out waiting for data: check whether the client went away.
            Err(_) => {
                let ready = conn.ready(Interest::READABLE).await.map_err(|_| "conn broke")?;
                if ready.is_read_closed() {
                    return Err("conn broke");
                }
            }
        }
    }
}

But ideally you'd be able to block on the READ_CLOSED event directly, or something:

use futures::{Stream, StreamExt};
use tokio::io::{AsyncWriteExt, Interest};
use tokio::net::TcpStream;
use tokio::select;

async fn write_stream(
    mut conn: TcpStream,
    mut stream: impl Stream<Item = Vec<u8>> + Unpin,
) -> Result<(), &'static str> {
    loop {
        select! {
            msg = stream.next() => match msg {
                None => return Ok(()),
                Some(data) => conn.write_all(&data).await.map_err(|_| "write failed")?,
            },
            // Hypothetical API: Interest::READ_CLOSED does not exist today.
            _ = conn.ready(Interest::READ_CLOSED) => return Err("conn broke"),
        }
    }
}
@benesch benesch added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Jan 24, 2021
@Darksonn Darksonn added the M-net Module: tokio/net label Jan 24, 2021
@Darksonn
Contributor

The Tokio type internally uses mio::Interest, and it doesn't seem to have a READ_CLOSED option either, so this would have to be added to mio first.

@Thomasdezeeuw Do the various OSes even provide such an interest?

@vi
Contributor

vi commented Mar 14, 2021

This related discussion shows that it seems to be impossible to build a HUP-propagating TCP forwarder with just Tokio (though a lower-level, less portable approach using nix is possible instead).
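
For reference, a minimal sketch of that lower-level route, calling poll(2) with POLLRDHUP through the libc crate (Linux-specific and not portable; the nix crate wraps the same call):

use std::os::unix::io::AsRawFd;

// Block until the peer shuts down its write half (or the connection
// errors out), without reading from or writing to the socket.
fn wait_for_rdhup(sock: &impl AsRawFd) -> std::io::Result<bool> {
    let mut pfd = libc::pollfd {
        fd: sock.as_raw_fd(),
        events: libc::POLLRDHUP,
        revents: 0,
    };
    // timeout = -1 means block indefinitely.
    if unsafe { libc::poll(&mut pfd, 1, -1) } < 0 {
        return Err(std::io::Error::last_os_error());
    }
    Ok((pfd.revents & libc::POLLRDHUP) != 0)
}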

@Thomasdezeeuw
Contributor

The Tokio type internally uses mio::Interest, and it doesn't seem to have a READ_CLOSED option either, so this would have to be added to mio first.

@Thomasdezeeuw Do the various OSes even provide such an interest?

Sorry, I never received this notification, or I missed the email.

As far as Interest goes: error, read-closed, and write-closed are always tracked (when the OS supports it, of course); see Event::is_read_closed and related methods.

I've also answered in tokio-rs/mio#1476: Tokio should be getting Event::is_read_closed/Event::is_write_closed events from mio, but I don't know how or whether Tokio exposes them.
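
To illustrate at the mio level (a sketch against the mio 0.7+ API; the address and token are arbitrary): you register with Interest::READABLE, and the read-closed state arrives on the event itself rather than as a separate interest:

use std::net::SocketAddr;
use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};

// Poll a connection until the OS reports that the read side is closed.
fn wait_for_read_closed(addr: SocketAddr) -> std::io::Result<()> {
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(8);
    let mut conn = TcpStream::connect(addr)?;
    poll.registry()
        .register(&mut conn, Token(0), Interest::READABLE)?;
    loop {
        poll.poll(&mut events, None)?;
        for event in events.iter() {
            // Set when the peer closed or shut down its write half,
            // provided the OS can report it.
            if event.is_read_closed() {
                return Ok(());
            }
        }
    }
}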

@Thomasdezeeuw
Contributor

@benesch Looking at your example, I have a question: do you read from the connection? If so, the easiest way to detect a broken/closed connection is by reading and getting zero bytes. That indicates the connection is closed, or that the other side called shutdown (which has the same intent).

@benesch
Contributor Author

benesch commented Mar 14, 2021

@benesch Looking at your example, I have a question: do you read from the connection? If so, the easiest way to detect a broken/closed connection is by reading and getting zero bytes. That indicates the connection is closed, or that the other side called shutdown (which has the same intent).

Indeed, it'd be swell if we could read from the connection to determine if the client had gone away. But the protocol we're implementing (the PostgreSQL COPY TO protocol) doesn't really permit that. The server streams data to the client until it runs out of data; only at that point does the server read from the client to figure out what to do next. But the client is welcome to tell the server what it wants the server to do next before the COPY command completes. So if we read from the connection to see if the client has gone away while streaming COPY results to the client, we might consume bytes that we didn't mean to. In theory we could stash those bytes in a buffer somewhere, but it gets really hairy with Tokio codecs.

@benesch
Contributor Author

benesch commented Mar 14, 2021

I'm not sure how much sense that last comment made. Let me try again with code. You might hope to be able to write:

use futures::{Stream, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::select;

async fn write_stream(
    mut conn: TcpStream,
    mut stream: impl Stream<Item = Vec<u8>> + Unpin,
) -> Result<(), &'static str> {
    let (mut rd, mut wr) = conn.split();
    let mut byte = [0; 1];
    loop {
        select! {
            msg = stream.next() => match msg {
                None => return Ok(()),
                Some(data) => wr.write_all(&data).await.map_err(|_| "write failed")?,
            },
            res = rd.read(&mut byte) => match res {
                Ok(1) => {
                    // Gotta stick this byte somewhere... plus if we saw one
                    // byte we're probably going to see a lot more, but that
                    // doesn't mean we can stop sending the data from `stream`
                    // to the client.
                }
                Ok(_) => {
                    // Zero bytes read: the client went away. Give up.
                    return Ok(())
                }
                Err(_) => return Err("conn broke"),
            }
        }
    }
}

The problem, as described in the comment within, is what happens when you read and get actual bytes back. If you get a short read you're golden because you know the client has gone away, and you can give up. But if you see bytes... well, you gotta buffer those bytes somewhere. And ideally you'd leave them in the kernel buffer for later, because that way eventually you'll apply some backpressure on the client that's writing a lot of bytes that you don't want to see yet.
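
(As an aside, one conceivable workaround along those lines: tokio::net::TcpStream::peek reads with MSG_PEEK semantics, so the bytes stay in the kernel buffer and backpressure is preserved, while a zero-byte result still means the peer closed the connection. A sketch, with the helper name made up for illustration:

use tokio::net::TcpStream;

// Resolves once the peer closes (returns true) or once ordinary data
// becomes readable (returns false); either way, nothing is consumed.
async fn peer_went_away(conn: &mut TcpStream) -> std::io::Result<bool> {
    let mut buf = [0; 1];
    Ok(conn.peek(&mut buf).await? == 0)
}

It still wakes whenever ordinary data arrives, though, so it's a polling hack rather than the READ_CLOSED event this issue asks for.)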

@vi
Contributor

vi commented Mar 14, 2021

the easiest way to detect a broken/closed connection is by reading and getting zero bytes

  1. Clients may shutdown(2) the request-sending half of the socket and still wait for the response to come, so receiving proof that there will be no more requests from that socket is not proof that it is useless to prepare a response for that client;
  2. We may want to avoid recv(2)ing anything from that client for now, to provide backpressure.

@vi
Contributor

vi commented Mar 14, 2021

If you get a short read you're golden because you know the client has gone away, and you can give up.

@benesch, The socket may be half-closed: the client has finished writing to it and called shutdown(SHUT_WR), but is still waiting for your reply. You can still write to that socket, but you can't read from it anymore.
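
For concreteness, a sketch of such a half-closing client in Tokio (the function name and arguments are illustrative): AsyncWriteExt::shutdown performs the SHUT_WR, after which the client keeps reading the reply:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

async fn half_close_client(addr: &str, request: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut conn = TcpStream::connect(addr).await?;
    conn.write_all(request).await?;
    // Close only our write direction (SHUT_WR). The server sees EOF on
    // read, but can still write its reply back to us.
    conn.shutdown().await?;
    let mut reply = Vec::new();
    conn.read_to_end(&mut reply).await?;
    Ok(reply)
}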

@benesch
Contributor Author

benesch commented Mar 14, 2021

If you get a short read you're golden because you know the client has gone away, and you can give up.

@benesch, The socket may be half-closed: the client has finished writing to it and called shutdown(SHUT_WR), but is still waiting for your reply. You can still write to that socket, but you can't read from it anymore.

That's not a thing our clients do! But I agree it would be useful to support for folks who do use half-closed sockets.

@Darksonn
Contributor

Aha. Well, if it is possible to detect it, it would be good to provide a way to do so.

@Millione
Sponsor

Any progress here?

@Darksonn
Contributor

We added something called the error interest, which you can wait for. I'm not sure whether it's helpful in this case.
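
A sketch of waiting on that interest (assuming a recent Tokio on a Linux target, where Interest::ERROR is available; the helper name is made up):

use tokio::io::Interest;
use tokio::net::TcpStream;

// Wait until the socket reports an error condition (e.g. a TCP RST),
// without reading from or writing to it.
async fn wait_for_socket_error(conn: &TcpStream) -> std::io::Result<()> {
    loop {
        let ready = conn.ready(Interest::ERROR).await?;
        if ready.is_error() {
            return Ok(());
        }
    }
}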

@vi
Contributor

vi commented May 13, 2024

Can that error interest be used to implement something like tokio::net::TcpStream::into_split, but that additionally returns a special error-monitor handle which does not participate in byte I/O, yet does receive resets?
