Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ctrl+c to gracefully shutdown the server(s) #1613

Merged
merged 13 commits into from
Mar 21, 2023
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ arrow2 = "0.16"
arrow2_convert = "0.4.2"
clap = "4.0"
comfy-table = { version = "6.1", default-features = false }
ctrlc = { version = "3.0", features = ["termination"] }
ecolor = "0.21.0"
eframe = { version = "0.21.3", default-features = false }
egui = "0.21.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ wgpu.workspace = true
arboard = { version = "3.2", default-features = false, features = [
"image-data",
] }
ctrlc = { version = "3.0", features = ["termination"] }
ctrlc.workspace = true
puffin_http = "0.11"
puffin.workspace = true

Expand Down
1 change: 1 addition & 0 deletions crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ analytics = ["dep:re_analytics"]
re_log.workspace = true

anyhow.workspace = true
ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
11 changes: 9 additions & 2 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,15 @@ impl WebViewerServer {
Self { server }
}

pub async fn serve(self) -> anyhow::Result<()> {
self.server.await?;
pub async fn serve(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
Ok(())
}
}
11 changes: 10 additions & 1 deletion crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,17 @@ async fn main() {
re_log::setup_native_logging();
let port = 9090;
eprintln!("Hosting web-viewer on http://127.0.0.1:{port}");

// Shutdown server via Ctrl+C
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing server.");
Wumpf marked this conversation as resolved.
Show resolved Hide resolved
shutdown_tx.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");

re_web_viewer_server::WebViewerServer::new(port)
.serve()
.serve(shutdown_rx)
.await
.unwrap();
}
17 changes: 13 additions & 4 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,25 @@ impl Server {
}

/// Accept new connections forever
emilk marked this conversation as resolved.
Show resolved Hide resolved
pub async fn listen(self, rx: Receiver<LogMsg>) -> anyhow::Result<()> {
pub async fn listen(
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
use anyhow::Context as _;

let history = Arc::new(Mutex::new(Vec::new()));

let log_stream = to_broadcast_stream(rx, history.clone());

while let Ok((tcp_stream, _)) = self.listener.accept().await {
loop {
let (tcp_stream, _) = tokio::select! {
res = self.listener.accept() => res?,
_ = shutdown_rx.recv() => {
return Ok(());
}
};

let peer = tcp_stream
.peer_addr()
.context("connected streams should have a peer address")?;
Expand All @@ -59,8 +70,6 @@ impl Server {
history.clone(),
));
}

Ok(())
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
mimalloc.workspace = true
puffin_http = "0.11"
ctrlc.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

# Native unix dependencies:
Expand Down
51 changes: 30 additions & 21 deletions crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ async fn run_impl(
// We are connecting to a server at a websocket address:

if args.web_viewer {
return host_web_viewer(rerun_server_ws_url).await;
let shutdown_rx = setup_ctrl_c_handler();
let web_viewer = host_web_viewer(true, rerun_server_ws_url, shutdown_rx);
return web_viewer.await;
} else {
#[cfg(feature = "native_viewer")]
return native_viewer_connect_to_ws_url(
Expand Down Expand Up @@ -311,13 +313,21 @@ async fn run_impl(
);
}

// Make it possible to gracefully shutdown the servers on ctrl-c.
let shutdown_ws_server = setup_ctrl_c_handler();
let shutdown_web_viewer = shutdown_ws_server.resubscribe();

// This is the server which the web viewer will talk to:
let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?;
let server_handle = tokio::spawn(ws_server.listen(rx));
let server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server));

let rerun_ws_server_url = re_ws_comms::default_server_url();
host_web_viewer(rerun_ws_server_url).await?;
// This is the server that serves the Wasm+HTML:
let ws_server_url = re_ws_comms::default_server_url();
let ws_server_handle =
tokio::spawn(host_web_viewer(true, ws_server_url, shutdown_web_viewer));

// Wait for both servers to shutdown.
ws_server_handle.await?.ok();
return server_handle.await?;
}

Expand Down Expand Up @@ -355,6 +365,16 @@ async fn run_impl(
}
}

fn setup_ctrl_c_handler() -> tokio::sync::broadcast::Receiver<()> {
Wumpf marked this conversation as resolved.
Show resolved Hide resolved
let (sender, receiver) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing server.");
Wumpf marked this conversation as resolved.
Show resolved Hide resolved
sender.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");
receiver
}

enum ArgumentCategory {
/// A remote RRD file, served over http.
RrdHttpUrl(String),
Expand Down Expand Up @@ -440,25 +460,14 @@ fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result<Receiver<LogMs
}

#[cfg(feature = "web_viewer")]
async fn host_web_viewer(rerun_ws_server_url: String) -> anyhow::Result<()> {
let web_port = 9090;
let viewer_url = format!("http://127.0.0.1:{web_port}?url={rerun_ws_server_url}");

let web_server = re_web_viewer_server::WebViewerServer::new(web_port);
let web_server_handle = tokio::spawn(web_server.serve());

let open = true;
if open {
webbrowser::open(&viewer_url).ok();
} else {
println!("Hosting Rerun Web Viewer at {viewer_url}.");
}

web_server_handle.await?
}
use crate::web_viewer::host_web_viewer;

#[cfg(not(feature = "web_viewer"))]
async fn host_web_viewer(_rerun_ws_server_url: String) -> anyhow::Result<()> {
pub async fn host_web_viewer(
_open_browser: bool,
_ws_server_url: String,
_shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature");
}

Expand Down
48 changes: 34 additions & 14 deletions crates/rerun/src/web_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use re_log_types::LogMsg;
struct RemoteViewerServer {
web_server_join_handle: tokio::task::JoinHandle<()>,
sender: re_smart_channel::Sender<LogMsg>,
#[allow(dead_code)] // Unused currently, but can be used later to cancel a serve.
Wumpf marked this conversation as resolved.
Show resolved Hide resolved
shutdown_tx: tokio::sync::broadcast::Sender<()>,
Wumpf marked this conversation as resolved.
Show resolved Hide resolved
}

impl Drop for RemoteViewerServer {
Expand All @@ -18,40 +20,58 @@ impl Drop for RemoteViewerServer {
impl RemoteViewerServer {
pub fn new(tokio_rt: &tokio::runtime::Runtime, open_browser: bool) -> Self {
let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk);
let (shutdown_tx, shutdown_rx_ws_server) = tokio::sync::broadcast::channel(1);
let shutdown_rx_web_server = shutdown_tx.subscribe();

let web_server_join_handle = tokio_rt.spawn(async move {
// This is the server which the web viewer will talk to:
let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT)
.await
.unwrap();
let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx)); // TODO(emilk): use tokio_rt ?
let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx, shutdown_rx_ws_server)); // TODO(emilk): use tokio_rt ?
Copy link
Member

@emilk emilk Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this channel for the ws server? Doesn't calling self.web_server_join_handle.abort() (as we do in drop) already work?

That's one of the advantages of async tasks in general: they are easy to cancel (just drop their handles).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I could try ripping out the channel again from both servers and go with abort. Wonder if that is the same really as the shutdown I'm doing now.
I'd suggest not going there and unify as suggested above by using shutdown_txon Drop

Copy link
Member Author

@Wumpf Wumpf Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropping a handle does not cancel a task though, one needs to call abort on it explicitly? (only found this old thread where that decision was made, maybe has changed since then tokio-rs/tokio#1830 (comment))
Also do all tasks implement proper abort? The channel setup here follows tokio's recommendation on how to deal with ctrl+c

Copy link
Member

@emilk emilk Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought abort would work on any async task, since it simply stops the future from being polled. I don't understand why tokio would recommend something else.

Where is this recommendation?

Copy link
Member Author

@Wumpf Wumpf Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://tokio.rs/tokio/topics/shutdown

let next_frame = tokio::select! {
    res = self.connection.read_frame() => res?,
    _ = self.shutdown.recv() => {
        // If a shutdown signal is received, return from `run`.
        // This will result in the task terminating.
        return Ok(());
    }
};

Maybe this is a bit too much out of context we're dealing with more specific tasks after all. Furthermore, most of this does boil down to an abort I reckon. The reason I ended up here was not exhaustive research and consideration of what we have, but rather seeing that example and going with it, discovering stuff in the process.
That said, I'm actually quite happy with this solution here since it gives us a single super easy entry point for signaling shut down which also makes it easy to work with hyper's with_graceful_shutdown


// This is the server that serves the Wasm+HTML:
let web_port = 9090;
let web_server = re_web_viewer_server::WebViewerServer::new(web_port);
let web_server_handle = tokio::spawn(async move {
web_server.serve().await.unwrap();
});

let ws_server_url = re_ws_comms::default_server_url();
let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}");
if open_browser {
webbrowser::open(&viewer_url).ok();
} else {
re_log::info!("Web server is running - view it at {viewer_url}");
}
let web_server_handle = tokio::spawn(host_web_viewer(
open_browser,
ws_server_url,
shutdown_rx_web_server,
));

ws_server_handle.await.unwrap().unwrap();
web_server_handle.await.unwrap();
web_server_handle.await.unwrap().unwrap();
});

Self {
web_server_join_handle,
sender: rerun_tx,
shutdown_tx,
}
}
}

/// Start a web server for localhost and optionally spawns a browser to view it.
Wumpf marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "web_viewer")]
pub async fn host_web_viewer(
open_browser: bool,
ws_server_url: String,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
let web_port = 9090;
let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}");

let web_server = re_web_viewer_server::WebViewerServer::new(web_port);
let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx));

if open_browser {
webbrowser::open(&viewer_url).ok();
} else {
re_log::info!("Web server is running - view it at {viewer_url}");
}

web_server_handle.await?
}

impl crate::sink::LogSink for RemoteViewerServer {
fn send(&self, msg: LogMsg) {
if let Err(err) = self.sender.send(msg) {
Expand Down
10 changes: 6 additions & 4 deletions rerun_py/rerun_sdk/rerun/script_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
```

"""
import contextlib
from argparse import ArgumentParser, Namespace
from time import sleep

import rerun as rr

Expand Down Expand Up @@ -93,6 +91,10 @@ def script_teardown(args: Namespace) -> None:

"""
if args.serve:
import signal
from threading import Event

exit = Event()
signal.signal(signal.SIGINT, lambda sig, frame: exit.set())
print("Sleeping while serving the web viewer. Abort with Ctrl-C")
with contextlib.suppress(Exception):
sleep(1_000_000_000)
exit.wait()
Wumpf marked this conversation as resolved.
Show resolved Hide resolved