
Add the ability to send/recv to/from the OS using polling, WouldBlock sys-calls #966

Open
Ben-PH opened this issue Mar 1, 2023 · 2 comments

Ben-PH commented Mar 1, 2023

Instead of this pattern:

std::thread::spawn(move || {
    // wait until some stop condition is met
    let _ = stopper_tx.send(());
});

std::thread::spawn(move || loop {
    // dedicated thread that blocks on `accept` and forwards connections
    if let Ok(conn) = listener.accept() {
        let _ = conn_tx.send(conn);
    }
});

loop {
    select! {
        recv(stopper_rx) -> _ => { dbg!("system stopped, exiting"); return; },
        recv(conn_rx) -> conn => handle_connection(conn),
    }
}

With this approach, you must spin up a dedicated thread that blocks on the listener. How about this instead:

std::thread::spawn(move || {
    // wait until some stop condition is met
    let _ = stopper_tx.send(());
});

let listener = TcpListener::bind("localhost:0").unwrap();
listener.set_nonblocking(true).unwrap();
loop {
    select! {
        recv(stopper_rx) -> _ => { dbg!("system stopped, exiting"); return; },
        // hypothetical: select directly on a non-blocking OS call
        os_recv(listener.accept()) -> conn => handle_connection(conn),
    }
}

Because of `select!`'s limitation of working only on channels, we instead had to write a helper function. Here is an MVP:

use crossbeam::{channel::Receiver, utils::Backoff};
use std::{
    net::{SocketAddr, TcpListener, TcpStream},
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("localhost:8080").unwrap();
    listener.set_nonblocking(true).unwrap();
    let (stop_tx, stop_rx) = crossbeam::channel::bounded::<()>(2);
    let sleep_stopper = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(20));
        dbg!("awake");
        stop_tx.send(()).unwrap();
    });
    loop {
        if dbg!(os_friendly_select(&stop_rx, &listener)).is_err() {
            break;
        }
    }
    sleep_stopper.join().unwrap();
}

fn os_friendly_select(
    stop_rx: &Receiver<()>,
    listener: &TcpListener,
) -> Result<(TcpStream, SocketAddr), ()> {
    let backoff = Backoff::new();
    loop {
        // Check for the stop signal first so shutdown takes priority
        // over accepting new connections.
        if stop_rx.try_recv().is_ok() {
            dbg!("returning because stopped");
            return Err(());
        }

        // Poll the non-blocking accept; WouldBlock means "nothing yet".
        match listener.accept() {
            Ok(good_conn) => {
                return Ok(good_conn);
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::WouldBlock {
                    dbg!("zzz");
                    backoff.snooze();
                } else {
                    panic!("try_accept got a bad error");
                }
            }
        }
    }
}

Part of the contract would be that anything calling into the OS this way must report `std::io::ErrorKind::WouldBlock` in its error contract (this assumes Linux-style non-blocking semantics).
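
For illustration, the helper above generalizes to any such OS call. A minimal sketch (`select_os` and its closure-based signature are hypothetical, not an existing crossbeam API):

use crossbeam::{channel::Receiver, utils::Backoff};
use std::io;

// Hypothetical generalization of `os_friendly_select`: works for any
// OS call that honors the WouldBlock contract.
fn select_os<T>(
    stop_rx: &Receiver<()>,
    mut os_call: impl FnMut() -> io::Result<T>,
) -> Result<T, ()> {
    let backoff = Backoff::new();
    loop {
        if stop_rx.try_recv().is_ok() {
            return Err(());
        }
        match os_call() {
            Ok(val) => return Ok(val),
            // WouldBlock is the retry signal; anything else is a real error.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => backoff.snooze(),
            Err(e) => panic!("unexpected OS error: {e}"),
        }
    }
}

The call site would then be, e.g., select_os(&stop_rx, || listener.accept()).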

I would be happy to add this functionality, as it would be great to have in our production code-base.

taiki-e (Member) commented Apr 28, 2023

Why not use async? (It should already have the proper mechanisms to handle such cases.)
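
A minimal sketch of that async shape, assuming tokio:

use tokio::net::TcpListener;
use tokio::sync::oneshot;

async fn serve(listener: TcpListener, mut stop_rx: oneshot::Receiver<()>) {
    loop {
        tokio::select! {
            // The runtime parks this task until a branch is ready:
            // no spinning and no manual WouldBlock handling.
            _ = &mut stop_rx => return,
            res = listener.accept() => match res {
                Ok((_stream, _addr)) => { /* handle_connection(...) */ }
                Err(e) => eprintln!("accept failed: {e}"),
            },
        }
    }
}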

if e.kind() == std::io::ErrorKind::WouldBlock {
    dbg!("zzz");
    backoff.snooze();

It does not seem desirable to do an unbounded spin loop when the OS returns WouldBlock.
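
One way to bound the spin (a sketch; the 1 ms sleep fallback is an arbitrary choice, not an endorsed pattern) is to check `Backoff::is_completed` and fall back to a sleep or an OS-level wait:

if e.kind() == std::io::ErrorKind::WouldBlock {
    if backoff.is_completed() {
        // Spinning is no longer useful; yield the CPU instead.
        std::thread::sleep(std::time::Duration::from_millis(1));
    } else {
        backoff.snooze();
    }
}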

Ben-PH (Author) commented Apr 28, 2023

That's what we have been using, but we have realised that async in our code-base was a mistake, and we are migrating away from it.

What we ended up doing, for now, is this:

pub struct BootstrapTcpListener {
    poll: Poll,
    events: Events,
    server: TcpListener,
    // HACK: create a field to move ownership of mio_server to the thread;
    // if mio_server is not moved, poll does not receive any events from the listener
    _mio_server: MioTcpListener,
}

pub enum PollEvent {
    NewConnection((TcpStream, SocketAddr)),
    Stop,
}

impl BSEventPoller for BootstrapTcpListener {
    fn poll(&mut self) -> Result<PollEvent, BootstrapError> {
        self.poll.poll(&mut self.events, None).unwrap();

        // Confirm that we are not being signalled to shut down
        if self.events.iter().any(|ev| ev.token() == STOP_LISTENER) {
            return Ok(PollEvent::Stop);
        }

        // There could be more than one connection ready, but we want to re-check for the stop
        // signal after processing each connection.
        return Ok(PollEvent::NewConnection(
            self.server.accept().map_err(BootstrapError::from)?,
        ));
    }
}

impl BootstrapListenerStopHandle {
    /// Stop the bootstrap listener.
    pub fn stop(self) -> Result<(), BootstrapError> {
        self.0.wake().map_err(BootstrapError::from)
    }
}
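
For context, the wiring for such a poller might look like this (a sketch assuming mio 0.8; the constructor, token values, and try_clone/from_std bridging are assumptions, not our actual code):

use mio::net::TcpListener as MioTcpListener;
use mio::{Events, Interest, Poll, Token, Waker};
use std::net::{SocketAddr, TcpListener};

const NEW_CONNECTION: Token = Token(0);
const STOP_LISTENER: Token = Token(1);

fn new_bootstrap_listener(
    addr: SocketAddr,
) -> std::io::Result<(BootstrapTcpListener, Waker)> {
    let server = TcpListener::bind(addr)?;
    server.set_nonblocking(true)?;
    // Register a mio view of the same socket so `poll` sees readiness events.
    let mut mio_server = MioTcpListener::from_std(server.try_clone()?);
    let poll = Poll::new()?;
    poll.registry()
        .register(&mut mio_server, NEW_CONNECTION, Interest::READABLE)?;
    // The Waker delivers the STOP_LISTENER event; it would back the
    // BootstrapListenerStopHandle's `stop`.
    let waker = Waker::new(poll.registry(), STOP_LISTENER)?;
    Ok((
        BootstrapTcpListener {
            poll,
            events: Events::with_capacity(32),
            server,
            _mio_server: mio_server,
        },
        waker,
    ))
}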

...and in the main event loop of the server, instead of calling std::net::TcpListener::accept(...)?, we call our poll and match on the returned enum.
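
In sketch form:

loop {
    match listener.poll()? {
        PollEvent::Stop => return Ok(()),
        // ...validate the connection before dispatching to a system-thread
        PollEvent::NewConnection(conn) => handle_connection(conn),
    }
}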

Previously we would have a dedicated listener thread, and two crossbeam channels, one for a connection, one for a stop-signal broadcast:

loop {
    let conn = select! {
        recv(stopper_rx) -> _ => return Ok(()),
        recv(conn_rx) -> conn => conn,
    };
    // ...validate connection before dispatching to a system-thread to handle
}

This change means we no longer need a dedicated listener thread or a buffer of connections.

I see now that "MVP" was probably a bad label for my example, and probably a distraction.
