Skip to content

Commit

Permalink
Merge branch 'master' into handle
Browse files Browse the repository at this point in the history
* master:
  tracing: replace future names with spawn locations in task spans (tokio-rs#3074)
  util: add back public poll_read_buf() function (tokio-rs#3079)
  net: add pid to tokio::net::unix::UCred (tokio-rs#2633)
  chore: prepare tokio-util v0.5.0 release (tokio-rs#3078)
  • Loading branch information
Keruspe committed Nov 2, 2020
2 parents 549a285 + fede3db commit 2c6ed38
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 54 deletions.
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Expand Up @@ -10,7 +10,7 @@ edition = "2018"
tokio = { version = "0.3.0", path = "../tokio", features = ["full", "tracing"] }
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
tokio-util = { version = "0.4.0", path = "../tokio-util", features = ["full"] }
tokio-util = { version = "0.5.0", path = "../tokio-util", features = ["full"] }
bytes = "0.6"
futures = "0.3.0"
http = "0.2"
Expand Down
8 changes: 8 additions & 0 deletions tokio-util/CHANGELOG.md
@@ -1,3 +1,11 @@
### Added
- io: `poll_read_buf` util fn (#2972).

# 0.5.0 (October 30, 2020)

### Changed
- io: update `bytes` to 0.6 (#3071).

# 0.4.0 (October 15, 2020)

### Added
Expand Down
4 changes: 2 additions & 2 deletions tokio-util/Cargo.toml
Expand Up @@ -7,13 +7,13 @@ name = "tokio-util"
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "v0.2.x" git tag.
version = "0.4.0"
version = "0.5.0"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-util/0.4.0/tokio_util"
documentation = "https://docs.rs/tokio-util/0.5.0/tokio_util"
description = """
Additional utilities for working with Tokio.
"""
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/codec/framed_impl.rs
Expand Up @@ -150,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/io/mod.rs
Expand Up @@ -13,3 +13,4 @@ mod stream_reader;
pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;
pub use crate::util::poll_read_buf;
2 changes: 1 addition & 1 deletion tokio-util/src/io/read_buf.rs
Expand Up @@ -59,7 +59,7 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
crate::util::poll_read_buf(cx, Pin::new(this.0), this.1)
crate::util::poll_read_buf(Pin::new(this.0), cx, this.1)
}
}
}
2 changes: 1 addition & 1 deletion tokio-util/src/io/reader_stream.rs
Expand Up @@ -83,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}

match poll_read_buf(cx, reader, &mut this.buf) {
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Expand Down
47 changes: 43 additions & 4 deletions tokio-util/src/lib.rs
@@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tokio-util/0.4.0")]
#![doc(html_root_url = "https://docs.rs/tokio-util/0.5.0")]
#![allow(clippy::needless_doctest_main)]
#![warn(
missing_debug_implementations,
Expand Down Expand Up @@ -69,10 +69,49 @@ mod util {
use std::pin::Pin;
use std::task::{Context, Poll};

pub(crate) fn poll_read_buf<T: AsyncRead>(
cx: &mut Context<'_>,
/// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait.
///
/// [`Buf`]: bytes::Buf
///
/// # Example
///
/// ```
/// use bytes::{Bytes, BytesMut};
/// use tokio::stream;
/// use tokio::io::Result;
/// use tokio_util::io::{StreamReader, poll_read_buf};
/// use futures::future::poll_fn;
/// use std::pin::Pin;
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a reader from an iterator. This particular reader will always be
/// // ready.
/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))]));
///
/// let mut buf = BytesMut::new();
/// let mut reads = 0;
///
/// loop {
/// reads += 1;
/// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?;
///
/// if n == 0 {
/// break;
/// }
/// }
///
/// // one or more reads might be necessary.
/// assert!(reads >= 1);
/// assert_eq!(&buf[..], &[0, 1, 2, 3]);
/// # Ok(())
/// # }
/// ```
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
pub fn poll_read_buf<T: AsyncRead, B: BufMut>(
io: Pin<&mut T>,
buf: &mut impl BufMut,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
Expand Down
7 changes: 5 additions & 2 deletions tokio/Cargo.toml
Expand Up @@ -102,7 +102,7 @@ mio = { version = "0.7.3", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true }
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
Expand All @@ -126,9 +126,12 @@ tempfile = "3.1.0"
[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.3.5", features = ["futures", "checkpoint"] }

[build-dependencies]
autocfg = "1" # Needed for conditionally enabling `track-caller`

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[package.metadata.playground]
features = ["full"]
features = ["full"]
22 changes: 22 additions & 0 deletions tokio/build.rs
@@ -0,0 +1,22 @@
use autocfg::AutoCfg;

fn main() {
match AutoCfg::new() {
Ok(ac) => {
// The #[track_caller] attribute was stabilized in rustc 1.46.0.
if ac.probe_rustc_version(1, 46) {
autocfg::emit("tokio_track_caller")
}
}

Err(e) => {
// If we couldn't detect the compiler version and features, just
// print a warning. This isn't a fatal error: we can still build
// Tokio, we just can't enable cfgs automatically.
println!(
"cargo:warning=tokio: failed to detect compiler features: {}",
e
);
}
}
}
75 changes: 68 additions & 7 deletions tokio/src/net/unix/ucred.rs
@@ -1,8 +1,10 @@
use libc::{gid_t, uid_t};
use libc::{gid_t, pid_t, uid_t};

/// Credentials of a process
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct UCred {
/// PID (process ID) of the process
pid: Option<pid_t>,
/// UID (user ID) of the process
uid: uid_t,
/// GID (group ID) of the process
Expand All @@ -19,19 +21,27 @@ impl UCred {
pub fn gid(&self) -> gid_t {
self.gid
}

/// Gets PID (process ID) of the process.
///
/// This is only implemented under linux, android, IOS and MacOS
pub fn pid(&self) -> Option<pid_t> {
self.pid
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub(crate) use self::impl_linux::get_peer_cred;

#[cfg(any(
target_os = "dragonfly",
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
))]
pub(crate) use self::impl_bsd::get_peer_cred;

#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) use self::impl_macos::get_peer_cred;

#[cfg(any(target_os = "solaris", target_os = "illumos"))]
Expand Down Expand Up @@ -77,6 +87,7 @@ pub(crate) mod impl_linux {
Ok(super::UCred {
uid: ucred.uid,
gid: ucred.gid,
pid: Some(ucred.pid),
})
} else {
Err(io::Error::last_os_error())
Expand All @@ -87,13 +98,11 @@ pub(crate) mod impl_linux {

#[cfg(any(
target_os = "dragonfly",
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
))]
pub(crate) mod impl_macos {
pub(crate) mod impl_bsd {
use crate::net::unix::UnixStream;

use libc::getpeereid;
Expand All @@ -114,6 +123,54 @@ pub(crate) mod impl_macos {
Ok(super::UCred {
uid: uid.assume_init(),
gid: gid.assume_init(),
pid: None,
})
} else {
Err(io::Error::last_os_error())
}
}
}
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) mod impl_macos {
use crate::net::unix::UnixStream;

use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL};
use std::io;
use std::mem::size_of;
use std::mem::MaybeUninit;
use std::os::unix::io::AsRawFd;

pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
unsafe {
let raw_fd = sock.as_raw_fd();

let mut uid = MaybeUninit::uninit();
let mut gid = MaybeUninit::uninit();
let mut pid: MaybeUninit<pid_t> = MaybeUninit::uninit();
let mut pid_size: MaybeUninit<u32> = MaybeUninit::new(size_of::<pid_t>() as u32);

if getsockopt(
raw_fd,
SOL_LOCAL,
LOCAL_PEEREPID,
pid.as_mut_ptr() as *mut c_void,
pid_size.as_mut_ptr(),
) != 0
{
return Err(io::Error::last_os_error());
}

assert!(pid_size.assume_init() == (size_of::<pid_t>() as u32));

let ret = getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr());

if ret == 0 {
Ok(super::UCred {
uid: uid.assume_init(),
gid: gid.assume_init(),
pid: Some(pid.assume_init()),
})
} else {
Err(io::Error::last_os_error())
Expand Down Expand Up @@ -154,7 +211,11 @@ pub(crate) mod impl_solaris {

ucred_free(cred);

Ok(super::UCred { uid, gid })
Ok(super::UCred {
uid,
gid,
pid: None,
})
} else {
Err(io::Error::last_os_error())
}
Expand Down
18 changes: 17 additions & 1 deletion tokio/src/runtime/handle.rs
Expand Up @@ -125,15 +125,19 @@ impl Handle {
/// });
/// # }
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[cfg(feature = "tracing")]
let future = crate::util::trace::task(future, "task");
self.spawner.spawn(future)
}

/// Run the provided function on an executor dedicated to blocking operations.
/// Run the provided function on an executor dedicated to blocking
/// operations.
///
/// # Examples
///
Expand All @@ -151,12 +155,24 @@ impl Handle {
/// println!("now running on a worker thread");
/// });
/// # }
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
#[cfg(feature = "tracing")]
let func = {
#[cfg(tokio_track_caller)]
let location = std::panic::Location::caller();
#[cfg(tokio_track_caller)]
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"blocking",
function = %std::any::type_name::<F>(),
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
);
#[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
target: "tokio::task",
"task",
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -378,6 +378,7 @@ cfg_rt! {
/// });
/// # }
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
Expand All @@ -402,6 +403,7 @@ cfg_rt! {
/// println!("now running on a worker thread");
/// });
/// # }
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
Expand Down
1 change: 1 addition & 0 deletions tokio/src/task/blocking.rs
Expand Up @@ -104,6 +104,7 @@ cfg_rt_multi_thread! {
/// # Ok(())
/// # }
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/task/local.rs
Expand Up @@ -190,6 +190,7 @@ cfg_rt! {
/// }).await;
/// }
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
Expand Down Expand Up @@ -273,6 +274,7 @@ impl LocalSet {
/// }
/// ```
/// [`spawn_local`]: fn@spawn_local
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
Expand Down

0 comments on commit 2c6ed38

Please sign in to comment.