diff --git a/tests-integration/tests/process_stdio.rs b/tests-integration/tests/process_stdio.rs index 8bf2c149853..86f5477b4ef 100644 --- a/tests-integration/tests/process_stdio.rs +++ b/tests-integration/tests/process_stdio.rs @@ -72,7 +72,7 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result { }; // Compose reading and writing concurrently. - future::join3(write, read, cat) + future::join3(write, read, cat.wait()) .map(|(_, _, status)| status) .await } @@ -125,3 +125,26 @@ async fn status_closes_any_pipes() { assert_ok!(child.await); } + +#[tokio::test] +async fn try_wait() { + let mut child = cat().spawn().unwrap(); + + let id = child.id().expect("missing id"); + assert!(id > 0); + + assert_eq!(None, assert_ok!(child.try_wait())); + + // Drop the child's stdio handles so it can terminate + drop(child.stdin.take()); + drop(child.stderr.take()); + drop(child.stdout.take()); + + assert_ok!(child.wait().await); + + // test that the `.try_wait()` method is fused just like the stdlib + assert!(assert_ok!(child.try_wait()).unwrap().success()); + + // Can't get id after process has exited + assert_eq!(child.id(), None); +} diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 79d06a709f8..09733ba5d4c 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -26,7 +26,12 @@ use proc_macro::TokenStream; /// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime` /// without requiring the user to use [Runtime](../tokio/runtime/struct.Runtime.html) or -/// [Builder](../tokio/runtime/struct.builder.html) directly. +/// [Builder](../tokio/runtime/struct.Builder.html) directly. +/// +/// Note: This macro is designed to be simplistic and targets applications that do not require +/// a complex setup. If provided functionality is not sufficient, user may be interested in +/// using [Builder](../tokio/runtime/struct.Builder.html), which provides a more powerful +/// interface. /// /// ## Options: /// @@ -133,7 +138,12 @@ pub fn main_threaded(args: TokenStream, item: TokenStream) -> TokenStream { /// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime` /// without requiring the user to use [Runtime](../tokio/runtime/struct.Runtime.html) or -/// [Builder](../tokio/runtime/struct.builder.html) directly. +/// [Builder](../tokio/runtime/struct.Builder.html) directly. +/// +/// Note: This macro is designed to be simplistic and targets applications that do not require +/// a complex setup. If provided functionality is not sufficient, user may be interested in +/// using [Builder](../tokio/runtime/struct.Builder.html), which provides a more powerful +/// interface. /// /// ## Options: /// diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index b47c9dfc21c..85b4e5929c4 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -25,11 +25,12 @@ publish = false default = [] # Shorthand for enabling everything -full = ["codec", "udp", "compat"] +full = ["codec", "udp", "compat", "io"] compat = ["futures-io",] codec = ["tokio/stream"] udp = ["tokio/udp"] +io = [] [dependencies] tokio = { version = "0.3.0", path = "../tokio" } diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 27e8c66a433..2efa5f09aff 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -27,3 +27,13 @@ macro_rules! cfg_udp { )* } } + +macro_rules! cfg_io { + ($($item:item)*) => { + $( + #[cfg(feature = "io")] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + $item + )* + } +} diff --git a/tokio-util/src/either.rs b/tokio-util/src/either.rs new file mode 100644 index 00000000000..3c749c1b49d --- /dev/null +++ b/tokio-util/src/either.rs @@ -0,0 +1,194 @@ +//! Module defining an Either type. +use std::{ + future::Future, + io::SeekFrom, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf, Result}; + +/// Combines two different futures, streams, or sinks having the same associated types into a single type. +/// +/// This type implements common asynchronous traits such as [`Future`] and those in Tokio. +/// +/// [`Future`]: std::future::Future +/// +/// # Example +/// +/// The following code will not work: +/// +/// ```compile_fail +/// # fn some_condition() -> bool { true } +/// # async fn some_async_function() -> u32 { 10 } +/// # async fn other_async_function() -> u32 { 20 } +/// #[tokio::main] +/// async fn main() { +/// let result = if some_condition() { +/// some_async_function() +/// } else { +/// other_async_function() // <- Will print: "`if` and `else` have incompatible types" +/// }; +/// +/// println!("Result is {}", result.await); +/// } +/// ``` +/// +// This is because although the output types for both futures is the same, the exact future +// types are different, but the compiler must be able to choose a single type for the +// `result` variable. +/// +/// When the output type is the same, we can wrap each future in `Either` to avoid the +/// issue: +/// +/// ``` +/// use tokio_util::either::Either; +/// # fn some_condition() -> bool { true } +/// # async fn some_async_function() -> u32 { 10 } +/// # async fn other_async_function() -> u32 { 20 } +/// +/// #[tokio::main] +/// async fn main() { +/// let result = if some_condition() { +/// Either::Left(some_async_function()) +/// } else { +/// Either::Right(other_async_function()) +/// }; +/// +/// let value = result.await; +/// println!("Result is {}", value); +/// # assert_eq!(value, 10); +/// } +/// ``` +#[allow(missing_docs)] // Doc-comments for variants in this particular case don't make much sense. +#[derive(Debug, Clone)] +pub enum Either { + Left(L), + Right(R), +} + +/// A small helper macro which reduces amount of boilerplate in the actual trait method implementation. +/// It takes an invokation of method as an argument (e.g. `self.poll(cx)`), and redirects it to either +/// enum variant held in `self`. +macro_rules! delegate_call { + ($self:ident.$method:ident($($args:ident),+)) => { + unsafe { + match $self.get_unchecked_mut() { + Self::Left(l) => Pin::new_unchecked(l).$method($($args),+), + Self::Right(r) => Pin::new_unchecked(r).$method($($args),+), + } + } + } +} + +impl Future for Either +where + L: Future, + R: Future, +{ + type Output = O; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + delegate_call!(self.poll(cx)) + } +} + +impl AsyncRead for Either +where + L: AsyncRead, + R: AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + delegate_call!(self.poll_read(cx, buf)) + } +} + +impl AsyncBufRead for Either +where + L: AsyncBufRead, + R: AsyncBufRead, +{ + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + delegate_call!(self.poll_fill_buf(cx)) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + delegate_call!(self.consume(amt)) + } +} + +impl AsyncSeek for Either +where + L: AsyncSeek, + R: AsyncSeek, +{ + fn start_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + position: SeekFrom, + ) -> Poll> { + delegate_call!(self.start_seek(cx, position)) + } + + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + delegate_call!(self.poll_complete(cx)) + } +} + +impl AsyncWrite for Either +where + L: AsyncWrite, + R: AsyncWrite, +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + delegate_call!(self.poll_write(cx, buf)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + delegate_call!(self.poll_flush(cx)) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + delegate_call!(self.poll_shutdown(cx)) + } +} + +impl futures_core::stream::Stream for Either +where + L: futures_core::stream::Stream, + R: futures_core::stream::Stream, +{ + type Item = L::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + delegate_call!(self.poll_next(cx)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::{ + io::{repeat, AsyncReadExt, Repeat}, + stream::{once, Once, StreamExt}, + }; + + #[tokio::test] + async fn either_is_stream() { + let mut either: Either, Once> = Either::Left(once(1)); + + assert_eq!(Some(1u32), either.next().await); + } + + #[tokio::test] + async fn either_is_async_read() { + let mut buffer = [0; 3]; + let mut either: Either = Either::Right(repeat(0b101)); + + either.read_exact(&mut buffer).await.unwrap(); + assert_eq!(buffer, [0b101, 0b101, 0b101]); + } +} diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs new file mode 100644 index 00000000000..53066c4e444 --- /dev/null +++ b/tokio-util/src/io/mod.rs @@ -0,0 +1,13 @@ +//! Helpers for IO related tasks. +//! +//! These types are often used in combination with hyper or reqwest, as they +//! allow converting between a hyper [`Body`] and [`AsyncRead`]. +//! +//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html +//! [`AsyncRead`]: tokio::io::AsyncRead + +mod reader_stream; +mod stream_reader; + +pub use self::reader_stream::ReaderStream; +pub use self::stream_reader::StreamReader; diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs new file mode 100644 index 00000000000..bde7cceec5c --- /dev/null +++ b/tokio-util/src/io/reader_stream.rs @@ -0,0 +1,100 @@ +use bytes::{Bytes, BytesMut}; +use futures_core::stream::Stream; +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead; + +const CAPACITY: usize = 4096; + +pin_project! { + /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks. + /// + /// This stream is fused. It performs the inverse operation of + /// [`StreamReader`]. + /// + /// # Example + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// use tokio::stream::StreamExt; + /// use tokio_util::io::ReaderStream; + /// + /// // Create a stream of data. + /// let data = b"hello, world!"; + /// let mut stream = ReaderStream::new(&data[..]); + /// + /// // Read all of the chunks into a vector. + /// let mut stream_contents = Vec::new(); + /// while let Some(chunk) = stream.next().await { + /// stream_contents.extend_from_slice(&chunk?); + /// } + /// + /// // Once the chunks are concatenated, we should have the + /// // original data. + /// assert_eq!(stream_contents, data); + /// # Ok(()) + /// # } + /// ``` + /// + /// [`AsyncRead`]: tokio::io::AsyncRead + /// [`StreamReader`]: crate::io::StreamReader + /// [`Stream`]: tokio::stream::Stream + #[derive(Debug)] + pub struct ReaderStream { + // Reader itself. + // + // This value is `None` if the stream has terminated. + #[pin] + reader: Option, + // Working buffer, used to optimize allocations. + buf: BytesMut, + } +} + +impl ReaderStream { + /// Convert an [`AsyncRead`] into a [`Stream`] with item type + /// `Result`. + /// + /// [`AsyncRead`]: tokio::io::AsyncRead + /// [`Stream`]: tokio::stream::Stream + pub fn new(reader: R) -> Self { + ReaderStream { + reader: Some(reader), + buf: BytesMut::new(), + } + } +} + +impl Stream for ReaderStream { + type Item = std::io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; + + if this.buf.capacity() == 0 { + this.buf.reserve(CAPACITY); + } + + match reader.poll_read_buf(cx, &mut this.buf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + self.project().reader.set(None); + Poll::Ready(Some(Err(err))) + } + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + } + Poll::Ready(Ok(_)) => { + let chunk = this.buf.split(); + Poll::Ready(Some(Ok(chunk.freeze()))) + } + } + } +} diff --git a/tokio/src/io/util/stream_reader.rs b/tokio-util/src/io/stream_reader.rs similarity index 56% rename from tokio/src/io/util/stream_reader.rs rename to tokio-util/src/io/stream_reader.rs index 2471197a46e..5c3ab019eba 100644 --- a/tokio/src/io/util/stream_reader.rs +++ b/tokio-util/src/io/stream_reader.rs @@ -1,21 +1,59 @@ -use crate::io::{AsyncBufRead, AsyncRead, ReadBuf}; -use crate::stream::Stream; use bytes::{Buf, BufMut}; +use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; pin_project! { - /// Convert a stream of byte chunks into an [`AsyncRead`]. + /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`]. /// - /// This type is usually created using the [`stream_reader`] function. + /// This type performs the inverse operation of [`ReaderStream`]. /// - /// [`AsyncRead`]: crate::io::AsyncRead - /// [`stream_reader`]: crate::io::stream_reader + /// # Example + /// + /// ``` + /// use bytes::Bytes; + /// use tokio::io::{AsyncReadExt, Result}; + /// use tokio_util::io::StreamReader; + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// + /// // Create a stream from an iterator. + /// let stream = tokio::stream::iter(vec![ + /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), + /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), + /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), + /// ]); + /// + /// // Convert it to an AsyncRead. + /// let mut read = StreamReader::new(stream); + /// + /// // Read five bytes from the stream. + /// let mut buf = [0; 5]; + /// read.read_exact(&mut buf).await?; + /// assert_eq!(buf, [0, 1, 2, 3, 4]); + /// + /// // Read the rest of the current chunk. + /// assert_eq!(read.read(&mut buf).await?, 3); + /// assert_eq!(&buf[..3], [5, 6, 7]); + /// + /// // Read the next chunk. + /// assert_eq!(read.read(&mut buf).await?, 4); + /// assert_eq!(&buf[..4], [8, 9, 10, 11]); + /// + /// // We have now reached the end. + /// assert_eq!(read.read(&mut buf).await?, 0); + /// + /// # Ok(()) + /// # } + /// ``` + /// + /// [`AsyncRead`]: tokio::io::AsyncRead + /// [`Stream`]: tokio::stream::Stream + /// [`ReaderStream`]: crate::io::ReaderStream #[derive(Debug)] - #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] - #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct StreamReader { #[pin] inner: S, @@ -23,67 +61,28 @@ pin_project! { } } -/// Convert a stream of byte chunks into an [`AsyncRead`](crate::io::AsyncRead). -/// -/// # Example -/// -/// ``` -/// use bytes::Bytes; -/// use tokio::io::{stream_reader, AsyncReadExt}; -/// # #[tokio::main] -/// # async fn main() -> std::io::Result<()> { -/// -/// // Create a stream from an iterator. -/// let stream = tokio::stream::iter(vec![ -/// Ok(Bytes::from_static(&[0, 1, 2, 3])), -/// Ok(Bytes::from_static(&[4, 5, 6, 7])), -/// Ok(Bytes::from_static(&[8, 9, 10, 11])), -/// ]); -/// -/// // Convert it to an AsyncRead. -/// let mut read = stream_reader(stream); -/// -/// // Read five bytes from the stream. -/// let mut buf = [0; 5]; -/// read.read_exact(&mut buf).await?; -/// assert_eq!(buf, [0, 1, 2, 3, 4]); -/// -/// // Read the rest of the current chunk. -/// assert_eq!(read.read(&mut buf).await?, 3); -/// assert_eq!(&buf[..3], [5, 6, 7]); -/// -/// // Read the next chunk. -/// assert_eq!(read.read(&mut buf).await?, 4); -/// assert_eq!(&buf[..4], [8, 9, 10, 11]); -/// -/// // We have now reached the end. -/// assert_eq!(read.read(&mut buf).await?, 0); -/// -/// # Ok(()) -/// # } -/// ``` -#[cfg_attr(docsrs, doc(cfg(feature = "stream")))] -#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] -pub fn stream_reader(stream: S) -> StreamReader +impl StreamReader where - S: Stream>, + S: Stream>, B: Buf, + E: Into, { - StreamReader::new(stream) -} - -impl StreamReader -where - S: Stream>, - B: Buf, -{ - /// Convert the provided stream into an `AsyncRead`. - fn new(stream: S) -> Self { + /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead). + /// + /// The item should be a [`Result`] with the ok variant being something that + /// implements the [`Buf`] trait (e.g. `Vec` or `Bytes`). The error + /// should be convertible into an [io error]. + /// + /// [`Result`]: std::result::Result + /// [`Buf`]: bytes::Buf + /// [io error]: std::io::Error + pub fn new(stream: S) -> Self { Self { inner: stream, chunk: None, } } + /// Do we have a chunk and is it non-empty? fn has_chunk(self: Pin<&mut Self>) -> bool { if let Some(chunk) = self.project().chunk { @@ -94,10 +93,11 @@ where } } -impl AsyncRead for StreamReader +impl AsyncRead for StreamReader where - S: Stream>, + S: Stream>, B: Buf, + E: Into, { fn poll_read( mut self: Pin<&mut Self>, @@ -144,10 +144,11 @@ where } } -impl AsyncBufRead for StreamReader +impl AsyncBufRead for StreamReader where - S: Stream>, + S: Stream>, B: Buf, + E: Into, { fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -161,7 +162,7 @@ where // Go around the loop in case the chunk is empty. *self.as_mut().project().chunk = Some(chunk); } - Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())), Poll::Ready(None) => return Poll::Ready(Ok(&[])), Poll::Pending => return Poll::Pending, } diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 3e9a3b7e6db..137be7a3630 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -38,6 +38,12 @@ cfg_compat! { pub mod compat; } +cfg_io! { + pub mod io; +} + pub mod context; pub mod sync; + +pub mod either; diff --git a/tokio-util/tests/io_reader_stream.rs b/tokio-util/tests/io_reader_stream.rs new file mode 100644 index 00000000000..b906de097e2 --- /dev/null +++ b/tokio-util/tests/io_reader_stream.rs @@ -0,0 +1,65 @@ +#![warn(rust_2018_idioms)] + +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, ReadBuf}; +use tokio::stream::StreamExt; + +/// produces at most `remaining` zeros, that returns error. +/// each time it reads at most 31 byte. +struct Reader { + remaining: usize, +} + +impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = Pin::into_inner(self); + assert_ne!(buf.remaining(), 0); + if this.remaining > 0 { + let n = std::cmp::min(this.remaining, buf.remaining()); + let n = std::cmp::min(n, 31); + for x in &mut buf.initialize_unfilled_to(n)[..n] { + *x = 0; + } + buf.add_filled(n); + this.remaining -= n; + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(std::io::Error::from_raw_os_error(22))) + } + } +} + +#[tokio::test] +async fn correct_behavior_on_errors() { + let reader = Reader { remaining: 8000 }; + let mut stream = tokio_util::io::ReaderStream::new(reader); + let mut zeros_received = 0; + let mut had_error = false; + loop { + let item = stream.next().await.unwrap(); + println!("{:?}", item); + match item { + Ok(bytes) => { + let bytes = &*bytes; + for byte in bytes { + assert_eq!(*byte, 0); + zeros_received += 1; + } + } + Err(_) => { + assert!(!had_error); + had_error = true; + break; + } + } + } + + assert!(had_error); + assert_eq!(zeros_received, 8000); + assert!(stream.next().await.is_none()); +} diff --git a/tokio/tests/stream_reader.rs b/tokio-util/tests/io_stream_reader.rs similarity index 82% rename from tokio/tests/stream_reader.rs rename to tokio-util/tests/io_stream_reader.rs index 8370df4dac7..b0ed1d2d046 100644 --- a/tokio/tests/stream_reader.rs +++ b/tokio-util/tests/io_stream_reader.rs @@ -1,14 +1,14 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "full")] use bytes::Bytes; -use tokio::io::{stream_reader, AsyncReadExt}; +use tokio::io::AsyncReadExt; use tokio::stream::iter; +use tokio_util::io::StreamReader; #[tokio::test] async fn test_stream_reader() -> std::io::Result<()> { let stream = iter(vec![ - Ok(Bytes::from_static(&[])), + std::io::Result::Ok(Bytes::from_static(&[])), Ok(Bytes::from_static(&[0, 1, 2, 3])), Ok(Bytes::from_static(&[])), Ok(Bytes::from_static(&[4, 5, 6, 7])), @@ -17,7 +17,7 @@ async fn test_stream_reader() -> std::io::Result<()> { Ok(Bytes::from_static(&[])), ]); - let mut read = stream_reader(stream); + let mut read = StreamReader::new(stream); let mut buf = [0; 5]; read.read_exact(&mut buf).await?; diff --git a/tokio-util/tests/sync_cancellation_token.rs b/tokio-util/tests/sync_cancellation_token.rs index c65a6425fd9..438e5d5ef13 100644 --- a/tokio-util/tests/sync_cancellation_token.rs +++ b/tokio-util/tests/sync_cancellation_token.rs @@ -1,3 +1,5 @@ +#![warn(rust_2018_idioms)] + use tokio::pin; use tokio_util::sync::CancellationToken; diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index d0320beb185..4820ac72d00 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -1,3 +1,5 @@ +#![warn(rust_2018_idioms)] + use tokio::{net::UdpSocket, stream::StreamExt}; use tokio_util::codec::{Decoder, Encoder, LinesCodec}; use tokio_util::udp::UdpFramed; diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 38d2d6f2d5b..b1d943e3119 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -63,9 +63,7 @@ process = [ "mio-named-pipes", "signal", "winapi/consoleapi", - "winapi/minwindef", "winapi/threadpoollegacyapiset", - "winapi/winerror", ] # Includes basic task execution capabilities rt-core = ["slab"] @@ -81,7 +79,6 @@ signal = [ "mio-uds", "signal-hook-registry", "winapi/consoleapi", - "winapi/minwindef", ] stream = ["futures-core"] sync = ["fnv"] diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index b801ba999db..319c2c7fc9f 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -202,68 +202,6 @@ impl File { } } - /// Seeks to an offset, in bytes, in a stream. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::fs::File; - /// use tokio::prelude::*; - /// - /// use std::io::SeekFrom; - /// - /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; - /// file.seek(SeekFrom::Start(6)).await?; - /// - /// let mut contents = vec![0u8; 10]; - /// file.read_exact(&mut contents).await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// The [`read_exact`] method is defined on the [`AsyncReadExt`] trait. - /// - /// [`read_exact`]: fn@crate::io::AsyncReadExt::read_exact - /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt - pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result { - self.complete_inflight().await; - - let mut buf = match self.state { - Idle(ref mut buf_cell) => buf_cell.take().unwrap(), - _ => unreachable!(), - }; - - // Factor in any unread data from the buf - if !buf.is_empty() { - let n = buf.discard_read(); - - if let SeekFrom::Current(ref mut offset) = pos { - *offset += n; - } - } - - let std = self.std.clone(); - - // Start the operation - self.state = Busy(sys::run(move || { - let res = (&*std).seek(pos); - (Operation::Seek(res), buf) - })); - - let (op, buf) = match self.state { - Idle(_) => unreachable!(), - Busy(ref mut rx) => rx.await.unwrap(), - }; - - self.state = Idle(Some(buf)); - - match op { - Operation::Seek(res) => res, - _ => unreachable!(), - } - } - /// Attempts to sync all OS-internal metadata to disk. /// /// This function will attempt to ensure that all in-core data reaches the diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index c43f0e83140..c4b4d7d3401 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -236,10 +236,6 @@ cfg_io_util! { copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufStream, BufWriter, DuplexStream, Copy, Empty, Lines, Repeat, Sink, Split, Take, }; - - cfg_stream! { - pub use util::{stream_reader, StreamReader}; - } } cfg_not_io_util! { diff --git a/tokio/src/io/read_buf.rs b/tokio/src/io/read_buf.rs index 03b5d05ca03..8b430ee1093 100644 --- a/tokio/src/io/read_buf.rs +++ b/tokio/src/io/read_buf.rs @@ -77,6 +77,14 @@ impl<'a> ReadBuf<'a> { unsafe { mem::transmute::<&mut [MaybeUninit], &mut [u8]>(slice) } } + /// Returns a new `ReadBuf` comprised of the unfilled section up to `n`. + #[inline] + pub fn take(&mut self, n: usize) -> ReadBuf<'_> { + let max = std::cmp::min(self.remaining(), n); + // Saftey: We don't set any of the `unfilled_mut` with `MaybeUninit::uninit`. + unsafe { ReadBuf::uninit(&mut self.unfilled_mut()[..max]) } + } + /// Returns a shared reference to the initialized portion of the buffer. /// /// This includes the filled portion. diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 609ff2386a6..1bd0a3f87b4 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -63,11 +63,6 @@ cfg_io_util! { mod split; pub use split::Split; - cfg_stream! { - mod stream_reader; - pub use stream_reader::{stream_reader, StreamReader}; - } - mod take; pub use take::Take; diff --git a/tokio/src/io/util/take.rs b/tokio/src/io/util/take.rs index 2abc7693172..4e424f6c8a5 100644 --- a/tokio/src/io/util/take.rs +++ b/tokio/src/io/util/take.rs @@ -85,10 +85,7 @@ impl AsyncRead for Take { } let me = self.project(); - let max = std::cmp::min(buf.remaining() as u64, *me.limit_) as usize; - // Make a ReadBuf of the unfulled section up to max - // Saftey: We don't set any of the `unfilled_mut` with `MaybeUninit::uninit`. - let mut b = unsafe { ReadBuf::uninit(&mut buf.unfilled_mut()[..max]) }; + let mut b = buf.take(*me.limit_ as usize); ready!(me.inner.poll_read(cx, &mut b))?; let n = b.filled().len(); diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 7bf9f7435c1..b31b478dcf1 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -196,9 +196,10 @@ //! //! Finally, Tokio provides a _runtime_ for executing asynchronous tasks. Most //! applications can use the [`#[tokio::main]`][main] macro to run their code on the -//! Tokio runtime. In use-cases where manual control over the runtime is -//! required, the [`tokio::runtime`] module provides APIs for configuring and -//! managing runtimes. +//! Tokio runtime. However, this macro provides only basic configuration options. As +//! an alternative, the [`tokio::runtime`] module provides more powerful APIs for configuring +//! and managing runtimes. You should use that module if the `#[tokio::main]` macro doesn't +//! provide the functionality you need. //! //! Using the runtime requires the "rt-core" or "rt-threaded" feature flags, to //! enable the basic [single-threaded scheduler][rt-core] and the [thread-pool diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index a3b7c384101..9c19c72ea97 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -18,16 +18,15 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { -//! // The usage is the same as with the standard library's `Command` type, however the value -//! // returned from `spawn` is a `Result` containing a `Future`. -//! let child = Command::new("echo").arg("hello").arg("world") -//! .spawn(); +//! // The usage is similar as with the standard library's `Command` type +//! let mut child = Command::new("echo") +//! .arg("hello") +//! .arg("world") +//! .spawn() +//! .expect("failed to spawn"); //! -//! // Make sure our child succeeded in spawning and process the result -//! let future = child.expect("failed to spawn"); -//! -//! // Await until the future (and the command) completes -//! let status = future.await?; +//! // Await until the command completes +//! let status = child.wait().await?; //! println!("the command exited with: {}", status); //! Ok(()) //! } @@ -83,8 +82,8 @@ //! //! // Ensure the child process is spawned in the runtime so it can //! // make progress on its own while we await for any output. -//! tokio::spawn(async { -//! let status = child.await +//! tokio::spawn(async move { +//! let status = child.wait().await //! .expect("child process encountered an error"); //! //! println!("child status was: {}", status); @@ -555,16 +554,17 @@ impl Command { /// Command::new("ls") /// .spawn() /// .expect("ls command failed to start") + /// .wait() /// .await /// .expect("ls command failed to run") /// } /// ``` pub fn spawn(&mut self) -> io::Result { imp::spawn_child(&mut self.std).map(|spawned_child| Child { - child: ChildDropGuard { + child: FusedChild::Child(ChildDropGuard { inner: spawned_child.child, kill_on_drop: self.kill_on_drop, - }, + }), stdin: spawned_child.stdin.map(|inner| ChildStdin { inner }), stdout: spawned_child.stdout.map(|inner| ChildStdout { inner }), stderr: spawned_child.stderr.map(|inner| ChildStderr { inner }), @@ -615,7 +615,7 @@ impl Command { child.stdout.take(); child.stderr.take(); - child.await + child.wait().await } } @@ -725,12 +725,16 @@ where } } +/// Keeps track of the exit status of a child process without worrying about +/// polling the underlying futures even after they have completed. +#[derive(Debug)] +enum FusedChild { + Child(ChildDropGuard), + Done(ExitStatus), +} + /// Representation of a child process spawned onto an event loop. /// -/// This type is also a future which will yield the `ExitStatus` of the -/// underlying child process. A `Child` here also provides access to information -/// like the OS-assigned identifier and the stdio streams. -/// /// # Caveats /// Similar to the behavior to the standard library, and unlike the futures /// paradigm of dropping-implies-cancellation, a spawned process will, by @@ -739,10 +743,9 @@ where /// The `Command::kill_on_drop` method can be used to modify this behavior /// and kill the child process if the `Child` wrapper is dropped before it /// has exited. -#[must_use = "futures do nothing unless polled"] #[derive(Debug)] pub struct Child { - child: ChildDropGuard, + child: FusedChild, /// The handle for writing to the child's standard input (stdin), if it has /// been captured. @@ -758,9 +761,34 @@ pub struct Child { } impl Child { - /// Returns the OS-assigned process identifier associated with this child. - pub fn id(&self) -> u32 { - self.child.inner.id() + /// Returns the OS-assigned process identifier associated with this child + /// while it is still running. + /// + /// Once the child has been polled to completion this will return `None`. + /// This is done to avoid confusion on platforms like Unix where the OS + /// identifier could be reused once the process has completed. + pub fn id(&self) -> Option { + match &self.child { + FusedChild::Child(child) => Some(child.inner.id()), + FusedChild::Done(_) => None, + } + } + + /// Attempts to force the child to exit, but does not wait for the request + /// to take effect. + /// + /// On Unix platforms, this is the equivalent to sending a SIGKILL. Note + /// that on Unix platforms it is possible for a zombie process to remain + /// after a kill is sent; to avoid this, the caller should ensure that either + /// `child.wait().await` or `child.try_wait()` is invoked successfully. + pub fn start_kill(&mut self) -> io::Result<()> { + match &mut self.child { + FusedChild::Child(child) => child.kill(), + FusedChild::Done(_) => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid argument: can't kill an exited process", + )), + } } /// Forces the child to exit. @@ -783,35 +811,70 @@ impl Child { /// let mut child = Command::new("sleep").arg("1").spawn().unwrap(); /// tokio::spawn(async move { send.send(()) }); /// tokio::select! { - /// _ = &mut child => {} - /// _ = recv => { - /// &mut child.kill(); - /// // NB: await the child here to avoid a zombie process on Unix platforms - /// child.await.unwrap(); - /// } + /// _ = child.wait() => {} + /// _ = recv => child.kill().await.expect("kill failed"), /// } /// } - - pub fn kill(&mut self) -> io::Result<()> { - self.child.kill() - } - - #[doc(hidden)] - #[deprecated(note = "please use `child.stdin` instead")] - pub fn stdin(&mut self) -> &mut Option { - &mut self.stdin + /// ``` + pub async fn kill(&mut self) -> io::Result<()> { + self.start_kill()?; + self.wait().await?; + Ok(()) } - #[doc(hidden)] - #[deprecated(note = "please use `child.stdout` instead")] - pub fn stdout(&mut self) -> &mut Option { - &mut self.stdout + /// Waits for the child to exit completely, returning the status that it + /// exited with. This function will continue to have the same return value + /// after it has been called at least once. + /// + /// The stdin handle to the child process, if any, will be closed + /// before waiting. This helps avoid deadlock: it ensures that the + /// child does not block waiting for input from the parent, while + /// the parent waits for the child to exit. + pub async fn wait(&mut self) -> io::Result { + match &mut self.child { + FusedChild::Done(exit) => Ok(*exit), + FusedChild::Child(child) => { + let ret = child.await; + + if let Ok(exit) = ret { + self.child = FusedChild::Done(exit); + } + + ret + } + } } - #[doc(hidden)] - #[deprecated(note = "please use `child.stderr` instead")] - pub fn stderr(&mut self) -> &mut Option { - &mut self.stderr + /// Attempts to collect the exit status of the child if it has already + /// exited. + /// + /// This function will not block the calling thread and will only + /// check to see if the child process has exited or not. If the child has + /// exited then on Unix the process ID is reaped. This function is + /// guaranteed to repeatedly return a successful exit status so long as the + /// child has already exited. + /// + /// If the child has exited, then `Ok(Some(status))` is returned. If the + /// exit status is not available at this time then `Ok(None)` is returned. + /// If an error occurs, then that error is returned. + /// + /// Note that unlike `wait`, this function will not attempt to drop stdin, + /// nor will it wake the current task if the child exits. + pub fn try_wait(&mut self) -> io::Result> { + match &mut self.child { + FusedChild::Done(exit) => Ok(Some(*exit)), + FusedChild::Child(guard) => { + let ret = guard.inner.try_wait(); + + if let Ok(Some(exit)) = ret { + // Avoid the overhead of trying to kill a reaped process + guard.kill_on_drop = false; + self.child = FusedChild::Done(exit); + } + + ret + } + } } /// Returns a future that will resolve to an `Output`, containing the exit @@ -845,7 +908,7 @@ impl Child { let stdout_fut = read_to_end(self.stdout.take()); let stderr_fut = read_to_end(self.stderr.take()); - let (status, stdout, stderr) = try_join3(self, stdout_fut, stderr_fut).await?; + let (status, stdout, stderr) = try_join3(self.wait(), stdout_fut, stderr_fut).await?; Ok(Output { status, @@ -855,14 +918,6 @@ impl Child { } } -impl Future for Child { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.child).poll(cx) - } -} - /// The standard input stream for spawned children. /// /// This type implements the `AsyncWrite` trait to pass data to the stdin handle of diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index c25d98974ae..a8df74a3345 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -117,6 +117,10 @@ impl Child { pub(crate) fn id(&self) -> u32 { self.inner.id() } + + pub(crate) fn try_wait(&mut self) -> io::Result> { + self.inner.inner_mut().try_wait() + } } impl Kill for Child { diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs index 8963805afe3..c51a20b9c01 100644 --- a/tokio/src/process/unix/reap.rs +++ b/tokio/src/process/unix/reap.rs @@ -63,7 +63,7 @@ where self.inner.as_ref().expect("inner has gone away") } - fn inner_mut(&mut self) -> &mut W { + pub(crate) fn inner_mut(&mut self) -> &mut W { self.inner.as_mut().expect("inner has gone away") } } diff --git a/tokio/src/process/windows.rs b/tokio/src/process/windows.rs index cbe2fa7596f..1fbdee21d6d 100644 --- a/tokio/src/process/windows.rs +++ b/tokio/src/process/windows.rs @@ -24,20 +24,15 @@ use mio_named_pipes::NamedPipe; use std::fmt; use std::future::Future; use std::io; -use std::os::windows::prelude::*; -use std::os::windows::process::ExitStatusExt; +use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle}; use std::pin::Pin; use std::process::{Child as StdChild, Command as StdCommand, ExitStatus}; use std::ptr; use std::task::Context; use std::task::Poll; -use winapi::shared::minwindef::FALSE; -use winapi::shared::winerror::WAIT_TIMEOUT; use winapi::um::handleapi::INVALID_HANDLE_VALUE; -use winapi::um::processthreadsapi::GetExitCodeProcess; -use winapi::um::synchapi::WaitForSingleObject; use winapi::um::threadpoollegacyapiset::UnregisterWaitEx; -use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE, WAIT_OBJECT_0}; +use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE}; use winapi::um::winnt::{BOOLEAN, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE}; #[must_use = "futures do nothing unless polled"] @@ -86,6 +81,10 @@ impl Child { pub(crate) fn id(&self) -> u32 { self.child.id() } + + pub(crate) fn try_wait(&mut self) -> io::Result> { + self.child.try_wait() + } } impl Kill for Child { @@ -106,11 +105,11 @@ impl Future for Child { Poll::Ready(Err(_)) => panic!("should not be canceled"), Poll::Pending => return Poll::Pending, } - let status = try_wait(&inner.child)?.expect("not ready yet"); + let status = inner.try_wait()?.expect("not ready yet"); return Poll::Ready(Ok(status)); } - if let Some(e) = try_wait(&inner.child)? { + if let Some(e) = inner.try_wait()? { return Poll::Ready(Ok(e)); } let (tx, rx) = oneshot::channel(); @@ -157,23 +156,6 @@ unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) { let _ = complete.take().unwrap().send(()); } -pub(crate) fn try_wait(child: &StdChild) -> io::Result> { - unsafe { - match WaitForSingleObject(child.as_raw_handle(), 0) { - WAIT_OBJECT_0 => {} - WAIT_TIMEOUT => return Ok(None), - _ => return Err(io::Error::last_os_error()), - } - let mut status = 0; - let rc = GetExitCodeProcess(child.as_raw_handle(), &mut status); - if rc == FALSE { - Err(io::Error::last_os_error()) - } else { - Ok(Some(ExitStatus::from_raw(status))) - } - } -} - pub(crate) type ChildStdin = PollEvented; pub(crate) type ChildStdout = PollEvented; pub(crate) type ChildStderr = PollEvented; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 47895fcf477..633021ededf 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -47,6 +47,9 @@ struct Inner { // Maximum number of threads thread_cap: usize, + + // Customizable wait timeout + keep_alive: Duration, } struct Shared { @@ -91,6 +94,10 @@ where impl BlockingPool { pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); + #[cfg(feature = "blocking")] + let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE); + #[cfg(not(feature = "blocking"))] + let keep_alive = KEEP_ALIVE; BlockingPool { spawner: Spawner { @@ -110,6 +117,7 @@ impl BlockingPool { after_start: builder.after_start.clone(), before_stop: builder.before_stop.clone(), thread_cap, + keep_alive, }), }, shutdown_rx, @@ -258,7 +266,7 @@ impl Inner { shared.num_idle += 1; while !shared.shutdown { - let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); + let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); shared = lock_result.0; let timeout_result = lock_result.1; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index db01cf5871e..ed2cd251c35 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -4,6 +4,8 @@ use crate::runtime::shell::Shell; use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; use std::fmt; +#[cfg(feature = "blocking")] +use std::time::Duration; /// Builds Tokio Runtime with custom configuration values. /// @@ -65,6 +67,11 @@ pub struct Builder { /// To run before each worker thread stops pub(super) before_stop: Option, + + #[cfg(feature = "blocking")] + #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] + /// Customizable keep alive timeout for BlockingPool + pub(super) keep_alive: Option, } pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; @@ -108,6 +115,9 @@ impl Builder { // No worker thread callbacks after_start: None, before_stop: None, + + #[cfg(feature = "blocking")] + keep_alive: None, } } @@ -375,6 +385,30 @@ impl Builder { blocking_pool, }) } + + #[cfg(feature = "blocking")] + #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] + /// Sets a custom timeout for a thread in the blocking pool. + /// + /// By default, the timeout for a thread is set to 10 seconds. This can + /// be overriden using .thread_keep_alive(). + /// + /// # Example + /// + /// ``` + /// # use tokio::runtime; + /// # use std::time::Duration; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new() + /// .thread_keep_alive(Duration::from_millis(100)) + /// .build(); + /// # } + /// ``` + pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { + self.keep_alive = Some(duration); + self + } } cfg_io_driver! { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 9d26446bf7e..bec0ecd5949 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -10,14 +10,14 @@ //! * A **timer** for scheduling work to run after a set period of time. //! //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing -//! them to be started, shut down, and configured together. However, most -//! applications won't need to use [`Runtime`] directly. Instead, they can +//! them to be started, shut down, and configured together. However, often +//! it is not required to configure a [`Runtime`] manually, and user may just //! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under //! the hood. //! //! # Usage //! -//! Most applications will use the [`tokio::main`] attribute macro. +//! When no fine tuning is required, the [`tokio::main`] attribute macro can be used. //! //! ```no_run //! use tokio::net::TcpListener; diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 3c4aabb2e84..ae776509723 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -45,6 +45,71 @@ doc_rt_core! { /// # } /// ``` /// + /// The generic parameter `T` in `JoinHandle` is the return type of the spawned task. + /// If the return value is an i32, the join handle has type `JoinHandle`: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle = task::spawn(async { + /// 5 + 3 + /// }); + /// # } + /// + /// ``` + /// + /// If the task does not have a return value, the join handle has type `JoinHandle<()>`: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<()> = task::spawn(async { + /// println!("I return nothing."); + /// }); + /// # } + /// ``` + /// + /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a + /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has + /// to be double chained to extract the returned value: + /// + /// ``` + /// use tokio::task; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let join_handle: task::JoinHandle> = tokio::spawn(async { + /// Ok(5 + 3) + /// }); + /// + /// let result = join_handle.await??; + /// assert_eq!(result, 8); + /// Ok(()) + /// } + /// ``` + /// + /// If the task panics, the error is a [`JoinError`] that contains the panic: + /// + /// ``` + /// use tokio::task; + /// use std::io; + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let join_handle: task::JoinHandle> = tokio::spawn(async { + /// panic!("boom"); + /// }); + /// + /// let err = join_handle.await.unwrap_err(); + /// assert!(err.is_panic()); + /// Ok(()) + /// } + /// + /// ``` /// Child being detached and outliving its parent: /// /// ```no_run @@ -75,6 +140,7 @@ doc_rt_core! { /// [`task::spawn`]: crate::task::spawn() /// [`task::spawn_blocking`]: crate::task::spawn_blocking /// [`std::thread::JoinHandle`]: std::thread::JoinHandle + /// [`JoinError`]: crate::task::JoinError pub struct JoinHandle { raw: Option, _p: PhantomData, @@ -91,6 +157,44 @@ impl JoinHandle { _p: PhantomData, } } + + /// Abort the task associated with the handle. + /// + /// Awaiting a cancelled task might complete as usual if the task was + /// already completed at the time it was cancelled, but most likely it + /// will complete with a `Err(JoinError::Cancelled)`. + /// + /// ```rust + /// use tokio::time; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut handles = Vec::new(); + /// + /// handles.push(tokio::spawn(async { + /// time::delay_for(time::Duration::from_secs(10)).await; + /// true + /// })); + /// + /// handles.push(tokio::spawn(async { + /// time::delay_for(time::Duration::from_secs(10)).await; + /// false + /// })); + /// + /// for handle in &handles { + /// handle.abort(); + /// } + /// + /// for handle in handles { + /// assert!(handle.await.unwrap_err().is_cancelled()); + /// } + /// } + /// ``` + pub fn abort(&self) { + if let Some(raw) = self.raw { + raw.shutdown(); + } + } } impl Unpin for JoinHandle {} diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index bc6eea23969..a2eef57a963 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -1,8 +1,57 @@ //! Stream utilities for Tokio. //! -//! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait. +//! A `Stream` is an asynchronous sequence of values. It can be thought of as +//! an asynchronous version of the standard library's `Iterator` trait. //! -//! This module provides helpers to work with them. +//! This module provides helpers to work with them. For examples of usage and a more in-depth +//! description of streams you can also refer to the [streams +//! tutorial](https://tokio.rs/tokio/tutorial/streams) on the tokio website. +//! +//! # Iterating over a Stream +//! +//! Due to similarities with the standard library's `Iterator` trait, some new +//! users may assume that they can use `for in` syntax to iterate over a +//! `Stream`, but this is unfortunately not possible. Instead, you can use a +//! `while let` loop as follows: +//! +//! ```rust +//! use tokio::stream::{self, StreamExt}; +//! +//! #[tokio::main] +//! async fn main() { +//! let mut stream = stream::iter(vec![0, 1, 2]); +//! +//! while let Some(value) = stream.next().await { +//! println!("Got {}", value); +//! } +//! } +//! ``` +//! +//! # Returning a Stream from a function +//! +//! A common way to stream values from a function is to pass in the sender +//! half of a channel and use the receiver as the stream. This requires awaiting +//! both futures to ensure progress is made. Another alternative is the +//! [async-stream] crate, which contains macros that provide a `yield` keyword +//! and allow you to return an `impl Stream`. +//! +//! [async-stream]: https://docs.rs/async-stream +//! +//! # Conversion to and from AsyncRead/AsyncWrite +//! +//! It is often desirable to convert a `Stream` into an [`AsyncRead`], +//! especially when dealing with plaintext formats streamed over the network. +//! The opposite conversion from an [`AsyncRead`] into a `Stream` is also +//! another commonly required feature. To enable these conversions, +//! [`tokio-util`] provides the [`StreamReader`] and [`ReaderStream`] +//! types when the io feature is enabled. +//! +//! [tokio-util]: https://docs.rs/tokio-util/0.3/tokio_util/codec/index.html +//! [`tokio::io`]: crate::io +//! [`AsyncRead`]: crate::io::AsyncRead +//! [`AsyncWrite`]: crate::io::AsyncWrite +//! [`ReaderStream`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.ReaderStream.html +//! [`StreamReader`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.StreamReader.html mod all; use all::AllFuture; diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 2810dad0d7b..c7ecd9fc6a3 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -184,8 +184,6 @@ impl Receiver { /// sync_code.join().unwrap() /// } /// ``` - #[cfg(feature = "rt-core")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))] pub fn blocking_recv(&mut self) -> Option { let mut enter_handle = crate::runtime::enter::enter(false); enter_handle.block_on(self.recv()).unwrap() @@ -456,8 +454,6 @@ impl Sender { /// sync_code.join().unwrap() /// } /// ``` - #[cfg(feature = "rt-core")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))] pub fn blocking_send(&mut self, value: T) -> Result<(), SendError> { let mut enter_handle = crate::runtime::enter::enter(false); enter_handle.block_on(self.send(value)).unwrap() diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index def704f2e13..4321c974608 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -19,20 +19,20 @@ type WaitList = LinkedList::Target>; /// another task to perform an operation. /// /// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. -/// [`notified().await`] waits for a permit to become available, and [`notify()`] +/// [`notified().await`] waits for a permit to become available, and [`notify_one()`] /// sets a permit **if there currently are no available permits**. /// /// The synchronization details of `Notify` are similar to /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] /// value contains a single permit. [`notified().await`] waits for the permit to -/// be made available, consumes the permit, and resumes. [`notify()`] sets the +/// be made available, consumes the permit, and resumes. [`notify_one()`] sets the /// permit, waking a pending task if there is one. /// -/// If `notify()` is called **before** `notified().await`, then the next call to +/// If `notify_one()` is called **before** `notified().await`, then the next call to /// `notified().await` will complete immediately, consuming the permit. Any /// subsequent calls to `notified().await` will wait for a new permit. /// -/// If `notify()` is called **multiple** times before `notified().await`, only a +/// If `notify_one()` is called **multiple** times before `notified().await`, only a /// **single** permit is stored. The next call to `notified().await` will /// complete immediately, but the one after will wait for a new permit. /// @@ -55,7 +55,7 @@ type WaitList = LinkedList::Target>; /// }); /// /// println!("sending notification"); -/// notify.notify(); +/// notify.notify_one(); /// } /// ``` /// @@ -78,7 +78,7 @@ type WaitList = LinkedList::Target>; /// .push_back(value); /// /// // Notify the consumer a value is available -/// self.notify.notify(); +/// self.notify.notify_one(); /// } /// /// pub async fn recv(&self) -> T { @@ -98,7 +98,7 @@ type WaitList = LinkedList::Target>; /// [park]: std::thread::park /// [unpark]: std::thread::Thread::unpark /// [`notified().await`]: Notify::notified() -/// [`notify()`]: Notify::notify() +/// [`notify_one()`]: Notify::notify_one() /// [`Semaphore`]: crate::sync::Semaphore #[derive(Debug)] pub struct Notify { @@ -173,11 +173,11 @@ impl Notify { /// Wait for a notification. /// /// Each `Notify` value holds a single permit. If a permit is available from - /// an earlier call to [`notify()`], then `notified().await` will complete + /// an earlier call to [`notify_one()`], then `notified().await` will complete /// immediately, consuming that permit. Otherwise, `notified().await` waits - /// for a permit to be made available by the next call to `notify()`. + /// for a permit to be made available by the next call to `notify_one()`. /// - /// [`notify()`]: Notify::notify + /// [`notify_one()`]: Notify::notify_one /// /// # Examples /// @@ -196,7 +196,7 @@ impl Notify { /// }); /// /// println!("sending notification"); - /// notify.notify(); + /// notify.notify_one(); /// } /// ``` pub async fn notified(&self) { @@ -218,10 +218,10 @@ impl Notify { /// If a task is currently waiting, that task is notified. Otherwise, a /// permit is stored in this `Notify` value and the **next** call to /// [`notified().await`] will complete immediately consuming the permit made - /// available by this call to `notify()`. + /// available by this call to `notify_one()`. /// /// At most one permit may be stored by `Notify`. Many sequential calls to - /// `notify` will result in a single permit being stored. The next call to + /// `notify_one` will result in a single permit being stored. The next call to /// `notified().await` will complete immediately, but the one after that /// will wait. /// @@ -244,10 +244,10 @@ impl Notify { /// }); /// /// println!("sending notification"); - /// notify.notify(); + /// notify.notify_one(); /// } /// ``` - pub fn notify(&self) { + pub fn notify_one(&self) { // Load the current state let mut curr = self.state.load(SeqCst); @@ -490,7 +490,7 @@ impl Drop for Notified<'_> { // `Notify.state` may be in any of the three states (Empty, Waiting, // Notified). It doesn't actually matter what the atomic is set to // at this point. We hold the lock and will ensure the atomic is in - // the correct state once th elock is dropped. + // the correct state once the lock is dropped. // // Because the atomic state is not checked, at first glance, it may // seem like this routine does not handle the case where the diff --git a/tokio/src/sync/tests/loom_notify.rs b/tokio/src/sync/tests/loom_notify.rs index 60981d4669a..79a5bf89b74 100644 --- a/tokio/src/sync/tests/loom_notify.rs +++ b/tokio/src/sync/tests/loom_notify.rs @@ -16,7 +16,7 @@ fn notify_one() { }); }); - tx.notify(); + tx.notify_one(); th.join().unwrap(); }); } @@ -34,12 +34,12 @@ fn notify_multi() { ths.push(thread::spawn(move || { block_on(async { notify.notified().await; - notify.notify(); + notify.notify_one(); }) })); } - notify.notify(); + notify.notify_one(); for th in ths.drain(..) { th.join().unwrap(); @@ -67,7 +67,7 @@ fn notify_drop() { block_on(poll_fn(|cx| { if recv.as_mut().poll(cx).is_ready() { - rx1.notify(); + rx1.notify_one(); } Poll::Ready(()) })); @@ -77,12 +77,12 @@ fn notify_drop() { block_on(async { rx2.notified().await; // Trigger second notification - rx2.notify(); + rx2.notify_one(); rx2.notified().await; }); }); - notify.notify(); + notify.notify_one(); th1.join().unwrap(); th2.join().unwrap(); diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs index 38b1761c8e4..f79f62b46f7 100644 --- a/tokio/src/time/driver/handle.rs +++ b/tokio/src/time/driver/handle.rs @@ -20,6 +20,17 @@ impl Handle { /// # Panics /// /// This function panics if there is no current timer set. + /// + /// It can be triggered when `Builder::enable_time()` or + /// `Builder::enable_all()` are not included in the builder. + /// + /// It can also panic whenever a timer is created outside of a Tokio + /// runtime. That is why `rt.block_on(delay_for(...))` will panic, + /// since the function is executed outside of the runtime. + /// Whereas `rt.block_on(async {delay_for(...).await})` doesn't + /// panic. And this is because wrapping the function on an async makes it + /// lazy, and so gets executed inside the runtime successfuly without + /// panicking. pub(crate) fn current() -> Self { context::time_handle() .expect("there is no timer running, must be called from the context of Tokio runtime") diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index 45d11bd441a..f1eed0e410d 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -152,7 +152,6 @@ async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync); async_assert_fn!(tokio::fs::File::open(&str): Send & Sync); async_assert_fn!(tokio::fs::File::create(&str): Send & Sync); -async_assert_fn!(tokio::fs::File::seek(_, std::io::SeekFrom): Send & Sync); async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync); async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync); async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync); diff --git a/tokio/tests/process_issue_2174.rs b/tokio/tests/process_issue_2174.rs index b5a63ceee83..4493d54ab1a 100644 --- a/tokio/tests/process_issue_2174.rs +++ b/tokio/tests/process_issue_2174.rs @@ -39,8 +39,7 @@ async fn issue_2174() { time::delay_for(Duration::from_secs(1)).await; // Kill the child process. - child.kill().unwrap(); - let _ = child.await; + child.kill().await.unwrap(); assert_err!(handle.await); } diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs index aa70af3b56e..569c122e36a 100644 --- a/tokio/tests/process_issue_42.rs +++ b/tokio/tests/process_issue_42.rs @@ -18,14 +18,16 @@ async fn issue_42() { let join_handles = (0..10usize).map(|_| { task::spawn(async { let processes = (0..10usize).map(|i| { - Command::new("echo") + let mut child = Command::new("echo") .arg(format!("I am spawned process #{}", i)) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) .kill_on_drop(true) .spawn() - .unwrap() + .unwrap(); + + async move { child.wait().await } }); join_all(processes).await; diff --git a/tokio/tests/process_smoke.rs b/tokio/tests/process_smoke.rs index d16d1d72c1b..fae5793fab7 100644 --- a/tokio/tests/process_smoke.rs +++ b/tokio/tests/process_smoke.rs @@ -18,12 +18,17 @@ async fn simple() { let mut child = cmd.arg("exit 2").spawn().unwrap(); - let id = child.id(); + let id = child.id().expect("missing id"); assert!(id > 0); - let status = assert_ok!((&mut child).await); + let status = assert_ok!(child.wait().await); assert_eq!(status.code(), Some(2)); - assert_eq!(child.id(), id); + // test that the `.wait()` method is fused just like the stdlib + let status = assert_ok!(child.wait().await); + assert_eq!(status.code(), Some(2)); + + // Can't get id after process has exited + assert_eq!(child.id(), None); drop(child.kill()); } diff --git a/tokio/tests/sync_notify.rs b/tokio/tests/sync_notify.rs index be39ce32dfd..8c70fe39894 100644 --- a/tokio/tests/sync_notify.rs +++ b/tokio/tests/sync_notify.rs @@ -13,7 +13,7 @@ fn notify_notified_one() { let notify = Notify::new(); let mut notified = spawn(async { notify.notified().await }); - notify.notify(); + notify.notify_one(); assert_ready!(notified.poll()); } @@ -24,7 +24,7 @@ fn notified_one_notify() { assert_pending!(notified.poll()); - notify.notify(); + notify.notify_one(); assert!(notified.is_woken()); assert_ready!(notified.poll()); } @@ -38,7 +38,7 @@ fn notified_multi_notify() { assert_pending!(notified1.poll()); assert_pending!(notified2.poll()); - notify.notify(); + notify.notify_one(); assert!(notified1.is_woken()); assert!(!notified2.is_woken()); @@ -50,7 +50,7 @@ fn notified_multi_notify() { fn notify_notified_multi() { let notify = Notify::new(); - notify.notify(); + notify.notify_one(); let mut notified1 = spawn(async { notify.notified().await }); let mut notified2 = spawn(async { notify.notified().await }); @@ -58,7 +58,7 @@ fn notify_notified_multi() { assert_ready!(notified1.poll()); assert_pending!(notified2.poll()); - notify.notify(); + notify.notify_one(); assert!(notified2.is_woken()); assert_ready!(notified2.poll()); @@ -76,7 +76,7 @@ fn notified_drop_notified_notify() { assert_pending!(notified2.poll()); - notify.notify(); + notify.notify_one(); assert!(notified2.is_woken()); assert_ready!(notified2.poll()); } @@ -90,7 +90,7 @@ fn notified_multi_notify_drop_one() { assert_pending!(notified1.poll()); assert_pending!(notified2.poll()); - notify.notify(); + notify.notify_one(); assert!(notified1.is_woken()); assert!(!notified2.is_woken());