From 24ed874e81fa27a6505613051955127ba8ddfdf2 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 30 Oct 2020 11:26:15 -0700 Subject: [PATCH 1/4] chore: prepare tokio-util v0.5.0 release (#3078) --- examples/Cargo.toml | 2 +- tokio-util/CHANGELOG.md | 8 ++++++++ tokio-util/Cargo.toml | 4 ++-- tokio-util/src/lib.rs | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index d95a32d036d..5fa4f7438c5 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" tokio = { version = "0.3.0", path = "../tokio", features = ["full", "tracing"] } tracing = "0.1" tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] } -tokio-util = { version = "0.4.0", path = "../tokio-util", features = ["full"] } +tokio-util = { version = "0.5.0", path = "../tokio-util", features = ["full"] } bytes = "0.6" futures = "0.3.0" http = "0.2" diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index e9aa0bac747..64228895200 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.5.0 (October 30, 2020) + +### Changed +- io: update `bytes` to 0.6 (#3071). + +### Added +- io: `poll_read_buf` util fn (#2972). + # 0.4.0 (October 15, 2020) ### Added diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index c1b2ee66db4..ba2f3a4f1f6 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -7,13 +7,13 @@ name = "tokio-util" # - Cargo.toml # - Update CHANGELOG.md. # - Create "v0.2.x" git tag. -version = "0.4.0" +version = "0.5.0" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-util/0.4.0/tokio_util" +documentation = "https://docs.rs/tokio-util/0.5.0/tokio_util" description = """ Additional utilities for working with Tokio. """ diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index f4cf94704ad..253f4437aac 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tokio-util/0.4.0")] +#![doc(html_root_url = "https://docs.rs/tokio-util/0.5.0")] #![allow(clippy::needless_doctest_main)] #![warn( missing_debug_implementations, From 382ee6bf5ddc45f62fd5faf1428d7f193c9e8ad8 Mon Sep 17 00:00:00 2001 From: Finn Behrens Date: Sat, 31 Oct 2020 10:30:55 +0100 Subject: [PATCH 2/4] net: add pid to tokio::net::unix::UCred (#2633) --- tokio/src/net/unix/ucred.rs | 75 +++++++++++++++++++++++++++++++++---- 1 file changed, 68 insertions(+), 7 deletions(-) diff --git a/tokio/src/net/unix/ucred.rs b/tokio/src/net/unix/ucred.rs index ef214a702fa..7d73ee02ebb 100644 --- a/tokio/src/net/unix/ucred.rs +++ b/tokio/src/net/unix/ucred.rs @@ -1,8 +1,10 @@ -use libc::{gid_t, uid_t}; +use libc::{gid_t, pid_t, uid_t}; /// Credentials of a process #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] pub struct UCred { + /// PID (process ID) of the process + pid: Option, /// UID (user ID) of the process uid: uid_t, /// GID (group ID) of the process @@ -19,6 +21,13 @@ impl UCred { pub fn gid(&self) -> gid_t { self.gid } + + /// Gets PID (process ID) of the process. + /// + /// This is only implemented under linux, android, IOS and MacOS + pub fn pid(&self) -> Option { + self.pid + } } #[cfg(any(target_os = "linux", target_os = "android"))] @@ -26,12 +35,13 @@ pub(crate) use self::impl_linux::get_peer_cred; #[cfg(any( target_os = "dragonfly", - target_os = "macos", - target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] +pub(crate) use self::impl_bsd::get_peer_cred; + +#[cfg(any(target_os = "macos", target_os = "ios"))] pub(crate) use self::impl_macos::get_peer_cred; #[cfg(any(target_os = "solaris", target_os = "illumos"))] @@ -77,6 +87,7 @@ pub(crate) mod impl_linux { Ok(super::UCred { uid: ucred.uid, gid: ucred.gid, + pid: Some(ucred.pid), }) } else { Err(io::Error::last_os_error()) @@ -87,13 +98,11 @@ pub(crate) mod impl_linux { #[cfg(any( target_os = "dragonfly", - target_os = "macos", - target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd" ))] -pub(crate) mod impl_macos { +pub(crate) mod impl_bsd { use crate::net::unix::UnixStream; use libc::getpeereid; @@ -114,6 +123,54 @@ pub(crate) mod impl_macos { Ok(super::UCred { uid: uid.assume_init(), gid: gid.assume_init(), + pid: None, + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub(crate) mod impl_macos { + use crate::net::unix::UnixStream; + + use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL}; + use std::io; + use std::mem::size_of; + use std::mem::MaybeUninit; + use std::os::unix::io::AsRawFd; + + pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut uid = MaybeUninit::uninit(); + let mut gid = MaybeUninit::uninit(); + let mut pid: MaybeUninit = MaybeUninit::uninit(); + let mut pid_size: MaybeUninit = MaybeUninit::new(size_of::() as u32); + + if getsockopt( + raw_fd, + SOL_LOCAL, + LOCAL_PEEREPID, + pid.as_mut_ptr() as *mut c_void, + pid_size.as_mut_ptr(), + ) != 0 + { + return Err(io::Error::last_os_error()); + } + + assert!(pid_size.assume_init() == (size_of::() as u32)); + + let ret = getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr()); + + if ret == 0 { + Ok(super::UCred { + uid: uid.assume_init(), + gid: gid.assume_init(), + pid: Some(pid.assume_init()), }) } else { Err(io::Error::last_os_error()) @@ -154,7 +211,11 @@ pub(crate) mod impl_solaris { ucred_free(cred); - Ok(super::UCred { uid, gid }) + Ok(super::UCred { + uid, + gid, + pid: None, + }) } else { Err(io::Error::last_os_error()) } From 2b23aa7389686f341e564c8da565a3b4c8b1188f Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 1 Nov 2020 10:22:22 +0100 Subject: [PATCH 3/4] util: add back public poll_read_buf() function (#3079) This was accidentally removed in #3064. --- tokio-util/CHANGELOG.md | 6 ++-- tokio-util/src/codec/framed_impl.rs | 2 +- tokio-util/src/io/mod.rs | 1 + tokio-util/src/io/read_buf.rs | 2 +- tokio-util/src/io/reader_stream.rs | 2 +- tokio-util/src/lib.rs | 45 +++++++++++++++++++++++++++-- 6 files changed, 49 insertions(+), 9 deletions(-) diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 64228895200..48dfe778e26 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,11 +1,11 @@ +### Added +- io: `poll_read_buf` util fn (#2972). + # 0.5.0 (October 30, 2020) ### Changed - io: update `bytes` to 0.6 (#3071). -### Added -- io: `poll_read_buf` util fn (#2972). - # 0.4.0 (October 15, 2020) ### Added diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index c161808f66e..e8b29999e10 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -150,7 +150,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { + let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 6f181ab1703..eefd65a5e83 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -13,3 +13,4 @@ mod stream_reader; pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; +pub use crate::util::poll_read_buf; diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs index 5bc0d5866f4..cc3c505f934 100644 --- a/tokio-util/src/io/read_buf.rs +++ b/tokio-util/src/io/read_buf.rs @@ -59,7 +59,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - crate::util::poll_read_buf(cx, Pin::new(this.0), this.1) + crate::util::poll_read_buf(Pin::new(this.0), cx, this.1) } } } diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index ab0c22fba73..3e6a05eff4d 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -83,7 +83,7 @@ impl Stream for ReaderStream { this.buf.reserve(CAPACITY); } - match poll_read_buf(cx, reader, &mut this.buf) { + match poll_read_buf(reader, cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 253f4437aac..09dd5a1071c 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -69,10 +69,49 @@ mod util { use std::pin::Pin; use std::task::{Context, Poll}; - pub(crate) fn poll_read_buf( - cx: &mut Context<'_>, + /// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. + /// + /// [`Buf`]: bytes::Buf + /// + /// # Example + /// + /// ``` + /// use bytes::{Bytes, BytesMut}; + /// use tokio::stream; + /// use tokio::io::Result; + /// use tokio_util::io::{StreamReader, poll_read_buf}; + /// use futures::future::poll_fn; + /// use std::pin::Pin; + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// + /// // Create a reader from an iterator. This particular reader will always be + /// // ready. + /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); + /// + /// let mut buf = BytesMut::new(); + /// let mut reads = 0; + /// + /// loop { + /// reads += 1; + /// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; + /// + /// if n == 0 { + /// break; + /// } + /// } + /// + /// // one or more reads might be necessary. + /// assert!(reads >= 1); + /// assert_eq!(&buf[..], &[0, 1, 2, 3]); + /// # Ok(()) + /// # } + /// ``` + #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] + pub fn poll_read_buf( io: Pin<&mut T>, - buf: &mut impl BufMut, + cx: &mut Context<'_>, + buf: &mut B, ) -> Poll> { if !buf.has_remaining_mut() { return Poll::Ready(Ok(0)); From fede3db76aef95c010c3e0c3da3b732380285097 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 1 Nov 2020 10:48:44 -0800 Subject: [PATCH 4/4] tracing: replace future names with spawn locations in task spans (#3074) ## Motivation Currently, the per-task `tracing` spans generated by tokio's `tracing` feature flag include the `std::any::type_name` of the future that was spawned. When future combinators and/or libraries like Tower are in use, these future names can get _quite_ long. Furthermore, when formatting the `tracing` spans with their parent spans as context, any other task spans in the span context where the future was spawned from can _also_ include extremely long future names. In some cases, this can result in extremely high memory use just to store the future names. For example, in Linkerd, when we enable `tokio=trace` to enable the task spans, there's a spawned task whose future name is _232990 characters long_. A proxy with only 14 spawned tasks generates a task list that's over 690 KB. Enabling task spans under load results in the process getting OOM killed very quickly. ## Solution This branch removes future type names from the spans generated by `spawn`. As a replacement, to allow identifying which `spawn` call a span corresponds to, the task span now contains the source code location where `spawn` was called, when the compiler supports the `#[track_caller]` attribute. Since `track_caller` was stabilized in Rust 1.46.0, and our minimum supported Rust version is 1.45.0, we can't assume that `#[track_caller]` is always available. Instead, we have a RUSTFLAGS cfg, `tokio_track_caller`, that guards whether or not we use it. I've also added a `build.rs` that detects the compiler minor version, and sets the cfg flag automatically if the current compiler version is >= 1.46. This means users shouldn't have to enable `tokio_track_caller` manually. Here's the trace output from the `chat` example, before this change: ![Screenshot_20201030_110157](https://user-images.githubusercontent.com/2796466/97741071-6d408800-1a9f-11eb-9ed6-b25e72f58c7b.png) ...and after: ![Screenshot_20201030_110303](https://user-images.githubusercontent.com/2796466/97741112-7e899480-1a9f-11eb-9197-c5a3f9ea1c05.png) Closes #3073 Signed-off-by: Eliza Weisman --- tokio/Cargo.toml | 7 ++++-- tokio/build.rs | 22 +++++++++++++++++ tokio/src/runtime/handle.rs | 15 +++++++++++- tokio/src/runtime/mod.rs | 4 ++++ tokio/src/task/blocking.rs | 1 + tokio/src/task/local.rs | 2 ++ tokio/src/task/spawn.rs | 1 + tokio/src/util/trace.rs | 48 +++++++++++-------------------------- 8 files changed, 63 insertions(+), 37 deletions(-) create mode 100644 tokio/build.rs diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 6c5de90ddc2..c4825c20650 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -102,7 +102,7 @@ mio = { version = "0.7.3", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.11.0", optional = true } # Not in full slab = { version = "0.4.1", optional = true } -tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full +tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] libc = { version = "0.2.42", optional = true } @@ -126,9 +126,12 @@ tempfile = "3.1.0" [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.3.5", features = ["futures", "checkpoint"] } +[build-dependencies] +autocfg = "1" # Needed for conditionally enabling `track-caller` + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] -features = ["full"] +features = ["full"] \ No newline at end of file diff --git a/tokio/build.rs b/tokio/build.rs new file mode 100644 index 00000000000..fe5c8300560 --- /dev/null +++ b/tokio/build.rs @@ -0,0 +1,22 @@ +use autocfg::AutoCfg; + +fn main() { + match AutoCfg::new() { + Ok(ac) => { + // The #[track_caller] attribute was stabilized in rustc 1.46.0. + if ac.probe_rustc_version(1, 46) { + autocfg::emit("tokio_track_caller") + } + } + + Err(e) => { + // If we couldn't detect the compiler version and features, just + // print a warning. This isn't a fatal error: we can still build + // Tokio, we just can't enable cfgs automatically. + println!( + "cargo:warning=tokio: failed to detect compiler features: {}", + e + ); + } + } +} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index b1e8d8f187c..c9ffd5cd0e4 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -39,13 +39,26 @@ impl Handle { // context::enter(self.clone(), f) // } - /// Run the provided function on an executor dedicated to blocking operations. + /// Run the provided function on an executor dedicated to blocking + /// operations. + #[cfg_attr(tokio_track_caller, track_caller)] pub(crate) fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, { #[cfg(feature = "tracing")] let func = { + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + #[cfg(tokio_track_caller)] + let span = tracing::trace_span!( + target: "tokio::task", + "task", + kind = %"blocking", + function = %std::any::type_name::(), + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", "task", diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 01788b9cc84..c76f5cf6d1c 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -357,11 +357,14 @@ cfg_rt! { /// }); /// # } /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { + #[cfg(feature = "tracing")] + let future = crate::util::trace::task(future, "task"); match &self.kind { #[cfg(feature = "rt-multi-thread")] Kind::ThreadPool(exec) => exec.spawn(future), @@ -385,6 +388,7 @@ cfg_rt! { /// println!("now running on a worker thread"); /// }); /// # } + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs index fc6632be3b7..36bc4579bfe 100644 --- a/tokio/src/task/blocking.rs +++ b/tokio/src/task/blocking.rs @@ -104,6 +104,7 @@ cfg_rt_multi_thread! { /// # Ok(()) /// # } /// ``` +#[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking(f: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 5896126c7e5..566b2f2d7f8 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -190,6 +190,7 @@ cfg_rt! { /// }).await; /// } /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_local(future: F) -> JoinHandle where F: Future + 'static, @@ -273,6 +274,7 @@ impl LocalSet { /// } /// ``` /// [`spawn_local`]: fn@spawn_local + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_local(&self, future: F) -> JoinHandle where F: Future + 'static, diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 77acb579f73..a060852dc23 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -122,6 +122,7 @@ cfg_rt! { /// ```text /// error[E0391]: cycle detected when processing `main` /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn(task: T) -> JoinHandle where T: Future + Send + 'static, diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 18956a36306..96a9db91d1f 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,47 +1,27 @@ cfg_trace! { cfg_rt! { - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - use pin_project_lite::pin_project; - - use tracing::Span; - - pin_project! { - /// A future that has been instrumented with a `tracing` span. - #[derive(Debug, Clone)] - pub(crate) struct Instrumented { - #[pin] - inner: T, - span: Span, - } - } - - impl Future for Instrumented { - type Output = T::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let _enter = this.span.enter(); - this.inner.poll(cx) - } - } - - impl Instrumented { - pub(crate) fn new(inner: T, span: Span) -> Self { - Self { inner, span } - } - } + pub(crate) use tracing::instrument::Instrumented; #[inline] + #[cfg_attr(tokio_track_caller, track_caller)] pub(crate) fn task(task: F, kind: &'static str) -> Instrumented { + use tracing::instrument::Instrument; + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + #[cfg(tokio_track_caller)] + let span = tracing::trace_span!( + target: "tokio::task", + "task", + %kind, + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", "task", %kind, - future = %std::any::type_name::(), ); - Instrumented::new(task, span) + task.instrument(span) } } }