Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(console): add support for Unix domain sockets #388

Merged
merged 4 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ env-filter = ["tracing-subscriber/env-filter"]

crossbeam-utils = "0.8.7"
tokio = { version = "^1.21", features = ["sync", "time", "macros", "tracing"] }
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["net"] }
thread_local = "1.1.3"
console-api = { version = "0.4.0", path = "../console-api", features = ["transport"] }
tonic = { version = "0.8", features = ["transport"] }
Expand Down
40 changes: 40 additions & 0 deletions console-subscriber/examples/uds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! Demonstrates serving the console API over a [Unix domain socket] (UDS)
//! connection, rather than over TCP.
//!
//! Note that this example only works on Unix operating systems that
//! support UDS, such as Linux, BSDs, and macOS.
//!
//! [Unix domain socket]: https://en.wikipedia.org/wiki/Unix_domain_socket

#[cfg(unix)]
use {
std::time::Duration,
tokio::{fs, task, time},
tracing::info,
};

#[cfg(unix)]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cwd = fs::canonicalize(".").await?;
let addr = cwd.join("console-server");
console_subscriber::ConsoleLayer::builder()
.server_addr(&*addr)
.init();
info!(
"listening for console connections at file://localhost{}",
addr.display()
);
task::Builder::default()
.name("sleepy")
.spawn(async move { time::sleep(Duration::from_secs(90)).await })
.unwrap()
.await?;

Ok(())
}

#[cfg(not(unix))]
fn main() {
panic!("only supported on Unix platforms")
}
113 changes: 104 additions & 9 deletions console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::{ConsoleLayer, Server};
#[cfg(unix)]
use std::path::Path;
use std::{
net::{SocketAddr, ToSocketAddrs},
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
path::PathBuf,
thread,
time::Duration,
Expand Down Expand Up @@ -32,7 +34,7 @@ pub struct Builder {
pub(crate) retention: Duration,

/// The address on which to serve the RPC server.
pub(super) server_addr: SocketAddr,
pub(super) server_addr: ServerAddr,

/// If and where to save a recording of the events.
pub(super) recording_path: Option<PathBuf>,
Expand All @@ -58,7 +60,7 @@ impl Default for Builder {
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
Expand Down Expand Up @@ -137,8 +139,38 @@ impl Builder {
/// before falling back on constructing a socket address from those
/// defaults.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this documentation should be updated to state that the provided address can be a TCP address, or (on Unix systems) a UDS address. It would be nice to show examples of both usages in this method's documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

///
/// The socket address can be either a TCP socket address or a
/// [Unix domain socket] (UDS) address. Unix domain sockets are only
/// supported on Unix-compatible operating systems, such as Linux, BSDs,
/// and macOS.
///
/// Each call to this method will overwrite the previously set value.
///
/// # Examples
///
/// Connect to the TCP address `localhost:1234`:
///
/// ```
/// # use console_subscriber::Builder;
/// use std::net::Ipv4Addr;
/// let builder = Builder::default().server_addr((Ipv4Addr::LOCALHOST, 1234));
/// ```
///
/// Connect to the UDS address `/tmp/tokio-console`:
///
/// ```
/// # use console_subscriber::Builder;
/// # #[cfg(unix)]
/// use std::path::Path;
///
/// // Unix domain sockets are only available on Unix-compatible operating systems.
/// #[cfg(unix)]
/// let builder = Builder::default().server_addr(Path::new("/tmp/tokio-console"));
/// ```
///
/// [environment variable]: `Builder::with_default_env`
pub fn server_addr(self, server_addr: impl Into<SocketAddr>) -> Self {
/// [Unix domain socket]: https://en.wikipedia.org/wiki/Unix_domain_socket
pub fn server_addr(self, server_addr: impl Into<ServerAddr>) -> Self {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
Self {
server_addr: server_addr.into(),
..self
Expand Down Expand Up @@ -231,11 +263,14 @@ impl Builder {
}

if let Ok(bind) = std::env::var("TOKIO_CONSOLE_BIND") {
self.server_addr = bind
.to_socket_addrs()
.expect("TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321")
.next()
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND");
self.server_addr = ServerAddr::Tcp(
bind.to_socket_addrs()
.expect(
"TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321",
)
.next()
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND"),
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkw do you think this should support UDS addresses too? E.g., by detecting the presence of a leading /, or the absence of a :port?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, my preference is to have this env var only configure a TCP socket, because it seems unfortunate to have a potential typo in the environment variable silently switch from TCP to UDS when that might not actually be desired.

Instead, I think we might want to consider adding a separate environment variable to configure serving over Unix sockets. It would be fine to add that env var in this branch, or in a follow-up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, I think we might want to consider adding a separate environment variable to configure serving over Unix sockets. It would be fine to add that env var in this branch, or in a follow-up PR.

I'll leave that to a different PR, then!

}

if let Some(interval) = duration_from_env("TOKIO_CONSOLE_PUBLISH_INTERVAL") {
Expand Down Expand Up @@ -456,6 +491,66 @@ impl Builder {
}
}

/// Specifies the address on which a [`Server`] should listen.
///
hawkw marked this conversation as resolved.
Show resolved Hide resolved
/// This type is passed as an argument to the [`Builder::server_addr`]
/// method, and may be either a TCP socket address, or a [Unix domain socket]
/// (UDS) address. Unix domain sockets are only supported on Unix-compatible
/// operating systems, such as Linux, BSDs, and macOS.
///
/// [`Server`]: crate::Server
/// [Unix domain socket]: https://en.wikipedia.org/wiki/Unix_domain_socket
#[derive(Clone, Debug)]
hawkw marked this conversation as resolved.
Show resolved Hide resolved
#[non_exhaustive]
pub enum ServerAddr {
/// A TCP address.
Tcp(SocketAddr),
/// A Unix socket address.
#[cfg(unix)]
Unix(PathBuf),
}

impl From<SocketAddr> for ServerAddr {
fn from(addr: SocketAddr) -> ServerAddr {
ServerAddr::Tcp(addr)
}
}

impl From<SocketAddrV4> for ServerAddr {
fn from(addr: SocketAddrV4) -> ServerAddr {
ServerAddr::Tcp(addr.into())
}
}

impl From<SocketAddrV6> for ServerAddr {
fn from(addr: SocketAddrV6) -> ServerAddr {
ServerAddr::Tcp(addr.into())
}
}

impl<I> From<(I, u16)> for ServerAddr
where
I: Into<IpAddr>,
{
fn from(pieces: (I, u16)) -> ServerAddr {
ServerAddr::Tcp(pieces.into())
}
}

#[cfg(unix)]
impl From<PathBuf> for ServerAddr {
fn from(path: PathBuf) -> ServerAddr {
ServerAddr::Unix(path)
}
}

#[cfg(unix)]
impl<'a> From<&'a Path> for ServerAddr {
fn from(path: &'a Path) -> ServerAddr {
ServerAddr::Unix(path.to_path_buf())
}
}

/// Initializes the console [tracing `Subscriber`][sub] and starts the console
/// subscriber [`Server`] on its own background thread.
///
Expand Down
33 changes: 23 additions & 10 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ use serde::Serialize;
use std::{
cell::RefCell,
fmt,
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
span::{self, Id},
subscriber::{self, Subscriber},
Expand All @@ -36,7 +40,7 @@ pub(crate) mod sync;
mod visitors;

use aggregator::Aggregator;
pub use builder::Builder;
pub use builder::{Builder, ServerAddr};
use callsites::Callsites;
use record::Recorder;
use stack::SpanStack;
Expand Down Expand Up @@ -134,7 +138,7 @@ pub struct ConsoleLayer {
/// [cli]: https://crates.io/crates/tokio-console
pub struct Server {
subscribe: mpsc::Sender<Command>,
addr: SocketAddr,
addr: ServerAddr,
aggregator: Option<Aggregator>,
client_buffer: usize,
}
Expand Down Expand Up @@ -945,13 +949,22 @@ impl Server {
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
let addr = self.addr;
let serve = builder
.add_service(proto::instrument::instrument_server::InstrumentServer::new(
self,
))
.serve(addr);
let res = spawn_named(serve, "console::serve").await;
let addr = self.addr.clone();
let router = builder.add_service(
proto::instrument::instrument_server::InstrumentServer::new(self),
);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
spawn_named(serve, "console::serve").await
}
#[cfg(unix)]
ServerAddr::Unix(path) => {
let incoming = UnixListener::bind(path)?;
let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
spawn_named(serve, "console::serve").await
}
hawkw marked this conversation as resolved.
Show resolved Hide resolved
};
aggregate.abort();
res?.map_err(Into::into)
}
Expand Down
1 change: 1 addition & 0 deletions tokio-console/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tokio = { version = "1", features = ["full", "rt-multi-thread"] }
tonic = { version = "0.8", features = ["transport"] }
futures = "0.3"
tui = { version = "0.16.0", default-features = false, features = ["crossterm"] }
tower = "0.4.12"
tracing = "0.1"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tracing-journald = { version = "0.2", optional = true }
Expand Down
3 changes: 3 additions & 0 deletions tokio-console/args.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ ARGS:

This may be an IP address and port, or a DNS name.

On Unix platforms, this may also be a URI with the `file` scheme that specifies the path
to a Unix domain socket, as in `file://localhost/path/to/socket`.

[default: http://127.0.0.1:6669]

OPTIONS:
Expand Down
4 changes: 4 additions & 0 deletions tokio-console/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub struct Config {
///
/// This may be an IP address and port, or a DNS name.
///
/// On Unix platforms, this may also be a URI with the `file` scheme that
/// specifies the path to a Unix domain socket, as in
/// `file://localhost/path/to/socket`.
///
/// [default: http://127.0.0.1:6669]
#[clap(value_hint = ValueHint::Url)]
pub(crate) target_addr: Option<Uri>,
Expand Down
33 changes: 31 additions & 2 deletions tokio-console/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use console_api::instrument::{
use console_api::tasks::TaskDetails;
use futures::stream::StreamExt;
use std::{error::Error, pin::Pin, time::Duration};
use tonic::{transport::Channel, transport::Uri, Streaming};
#[cfg(unix)]
use tokio::net::UnixStream;
hawkw marked this conversation as resolved.
Show resolved Hide resolved
use tonic::{
transport::{Channel, Endpoint, Uri},
Streaming,
};

#[derive(Debug)]
pub struct Connection {
Expand Down Expand Up @@ -78,7 +83,31 @@ impl Connection {
tokio::time::sleep(backoff).await;
}
let try_connect = async {
let mut client = InstrumentClient::connect(self.target.clone()).await?;
let channel = match self.target.scheme_str() {
#[cfg(unix)]
Some("file") => {
// Dummy endpoint is ignored by the connector.
let endpoint = Endpoint::from_static("http://localhost");
if !matches!(self.target.host(), None | Some("localhost")) {
return Err("cannot connect to non-localhost unix domain socket".into());
}
let path = self.target.path().to_owned();
endpoint
.connect_with_connector(tower::service_fn(move |_| {
UnixStream::connect(path.clone())
}))
.await?
}
#[cfg(not(unix))]
Some("file") => {
return Err("unix domain sockets are not supported on this platform".into());
}
_ => {
let endpoint = Endpoint::try_from(self.target.clone())?;
endpoint.connect().await?
}
};
let mut client = InstrumentClient::new(channel);
let request = tonic::Request::new(InstrumentRequest {});
let stream = Box::new(client.watch_updates(request).await?.into_inner());
Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected { client, stream })
Expand Down