Skip to content

Commit

Permalink
Handle ctrl+c to gracefully shutdown the server(s) (#1613)
Browse files Browse the repository at this point in the history
* Handle ctrl+c to gracefully shutdown the server(s)

* formatting

* move setup_ctrl_c_handler to re_web_viewer_server

* comment fix, use channel for server shutdown in `RemoteViewerServer`

* use tokio in re_web_viewer_server/server in order to use same graceful shutdown mechanism
Had to move setup_ctrl_c_handler back to `rerun` because of this

* don't create tokio runtime if there is already one

* move responsibility for alive tokio runtime outside of web_viewer. Don't create a new runtime if there is one already. Gracefully wait on ctrl+c instead of sleeping for `RerunArgs` entrypoint

* Fix python `serve` lacking a tokio runtime

* fix doc test

* Fix warnings

* improve docstring

* don't unwrap, especially in drop

---------

Co-authored-by: Emil Ernerfeldt <emil.ernerfeldt@gmail.com>
  • Loading branch information
Wumpf and emilk committed Mar 21, 2023
1 parent b5a96ca commit d666c94
Show file tree
Hide file tree
Showing 15 changed files with 219 additions and 126 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ arrow2 = "0.16"
arrow2_convert = "0.4.2"
clap = "4.0"
comfy-table = { version = "6.1", default-features = false }
ctrlc = { version = "3.0", features = ["termination"] }
ecolor = "0.21.0"
eframe = { version = "0.21.3", default-features = false }
egui = "0.21.0"
Expand Down
1 change: 1 addition & 0 deletions crates/re_sdk_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ bincode = "1.3"
crossbeam = "0.8"
document-features = "0.2"
rand = { version = "0.8.5", features = ["small_rng"] }
tokio.workspace = true
123 changes: 67 additions & 56 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rand::{Rng as _, SeedableRng};

use re_log_types::{LogMsg, TimePoint, TimeType, TimelineName};
use re_smart_channel::{Receiver, Sender};
use tokio::net::{TcpListener, TcpStream};

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ServerOptions {
Expand All @@ -27,37 +28,18 @@ impl Default for ServerOptions {
}
}

/// Listen to multiple SDK:s connecting to us over TCP.
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// let log_msg_rx = serve(80, ServerOptions::default())?;
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result<Receiver<LogMsg>> {
async fn listen_for_new_clients(
port: u16,
options: ServerOptions,
tx: Sender<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let bind_addr = format!("0.0.0.0:{port}");

let listener = std::net::TcpListener::bind(&bind_addr)
.with_context(|| format!("Failed to bind TCP address {bind_addr:?} for our WS server."))?;

let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

std::thread::Builder::new()
.name("sdk-server".into())
.spawn(move || {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
})
.expect("Failed to spawn thread");
let listener = TcpListener::bind(&bind_addr)
.await
.with_context(|| format!("Failed to bind TCP address {bind_addr:?}"))
.unwrap();

if options.quiet {
re_log::debug!(
Expand All @@ -69,43 +51,72 @@ pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result<Receiver<LogMs
);
}

Ok(rx)
loop {
let incoming = tokio::select! {
res = listener.accept() => res,
_ = shutdown_rx.recv() => {
return;
}
};
match incoming {
Ok((stream, _)) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
}

fn spawn_client(stream: std::net::TcpStream, tx: Sender<LogMsg>, options: ServerOptions) {
std::thread::Builder::new()
.name(format!(
"sdk-server-client-handler-{:?}",
stream.peer_addr()
))
.spawn(move || {
let addr_string = stream
.peer_addr()
.map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string());
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}
/// Listen to multiple SDK:s connecting to us over TCP.
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// let (sender, receiver) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve(80, ServerOptions::default(), receiver)?;
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn serve(
port: u16,
options: ServerOptions,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<Receiver<LogMsg>> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

if let Err(err) = run_client(stream, &tx, options) {
re_log::warn!("Closing connection to client: {err}");
}
})
.expect("Failed to spawn thread");
tokio::spawn(listen_for_new_clients(port, options, tx, shutdown_rx));

Ok(rx)
}

fn spawn_client(stream: TcpStream, tx: Sender<LogMsg>, options: ServerOptions) {
tokio::spawn(async move {
let addr_string = stream
.peer_addr()
.map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string());
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}
if let Err(err) = run_client(stream, &tx, options).await {
re_log::warn!("Closing connection to client: {err}");
}
});
}

fn run_client(
mut stream: std::net::TcpStream,
async fn run_client(
mut stream: TcpStream,
tx: &Sender<LogMsg>,
options: ServerOptions,
) -> anyhow::Result<()> {
#![allow(clippy::read_zero_byte_vec)] // false positive: https://github.com/rust-lang/rust-clippy/issues/9274

use std::io::Read as _;
use tokio::io::AsyncReadExt as _;

let mut client_version = [0_u8; 2];
stream.read_exact(&mut client_version)?;
stream.read_exact(&mut client_version).await?;
let client_version = u16::from_le_bytes(client_version);

match client_version.cmp(&crate::PROTOCOL_VERSION) {
Expand All @@ -132,11 +143,11 @@ fn run_client(

loop {
let mut packet_size = [0_u8; 4];
stream.read_exact(&mut packet_size)?;
stream.read_exact(&mut packet_size).await?;
let packet_size = u32::from_le_bytes(packet_size);

packet.resize(packet_size as usize, 0_u8);
stream.read_exact(&mut packet)?;
stream.read_exact(&mut packet).await?;

re_log::trace!("Received log message of size {packet_size}.");

Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ wgpu.workspace = true
arboard = { version = "3.2", default-features = false, features = [
"image-data",
] }
ctrlc = { version = "3.0", features = ["termination"] }
ctrlc.workspace = true
puffin_http = "0.11"
puffin.workspace = true

Expand Down
1 change: 1 addition & 0 deletions crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ analytics = ["dep:re_analytics"]
re_log.workspace = true

anyhow.workspace = true
ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
11 changes: 9 additions & 2 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,15 @@ impl WebViewerServer {
Self { server }
}

pub async fn serve(self) -> anyhow::Result<()> {
self.server.await?;
pub async fn serve(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
Ok(())
}
}
11 changes: 10 additions & 1 deletion crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,17 @@ async fn main() {
re_log::setup_native_logging();
let port = 9090;
eprintln!("Hosting web-viewer on http://127.0.0.1:{port}");

// Shutdown server via Ctrl+C
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing web server.");
shutdown_tx.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");

re_web_viewer_server::WebViewerServer::new(port)
.serve()
.serve(shutdown_rx)
.await
.unwrap();
}
19 changes: 14 additions & 5 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,26 @@ impl Server {
Ok(Self { listener })
}

/// Accept new connections forever
pub async fn listen(self, rx: Receiver<LogMsg>) -> anyhow::Result<()> {
/// Accept new connections until we get a message on `shutdown_rx`
pub async fn listen(
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
use anyhow::Context as _;

let history = Arc::new(Mutex::new(Vec::new()));

let log_stream = to_broadcast_stream(rx, history.clone());

while let Ok((tcp_stream, _)) = self.listener.accept().await {
loop {
let (tcp_stream, _) = tokio::select! {
res = self.listener.accept() => res?,
_ = shutdown_rx.recv() => {
return Ok(());
}
};

let peer = tcp_stream
.peer_addr()
.context("connected streams should have a peer address")?;
Expand All @@ -59,8 +70,6 @@ impl Server {
history.clone(),
));
}

Ok(())
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
mimalloc.workspace = true
puffin_http = "0.11"
ctrlc.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

# Native unix dependencies:
Expand Down
21 changes: 19 additions & 2 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ impl RerunArgs {
default_enabled: bool,
run: impl FnOnce(Session) + Send + 'static,
) -> anyhow::Result<()> {
// Ensure we have a running tokio runtime.
#[allow(unused_assignments)]
let mut tokio_runtime = None;
let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle
} else {
tokio_runtime =
Some(tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"));
tokio_runtime.as_ref().unwrap().handle().clone()
};
let _tokio_runtime_guard = tokio_runtime_handle.enter();

let (rerun_enabled, recording_info) = crate::SessionBuilder::new(application_id)
.default_enabled(default_enabled)
.finalize();
Expand Down Expand Up @@ -102,12 +114,17 @@ impl RerunArgs {
};

let session = Session::new(recording_info, sink);
let _sink = session.sink().clone(); // Keep sink (and potential associated servers) alive until the end of this function scope.
run(session);

#[cfg(feature = "web_viewer")]
if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) {
eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C");
std::thread::sleep(std::time::Duration::from_secs(1_000_000_000));
use anyhow::Context as _;

let mut shutdown_rx = crate::run::setup_ctrl_c_handler();
return tokio_runtime_handle
.block_on(async { shutdown_rx.recv().await })
.context("Failed to wait for shutdown signal.");
}

Ok(())
Expand Down

0 comments on commit d666c94

Please sign in to comment.