diff --git a/tokio/src/future/poll_fn.rs b/tokio/src/future/poll_fn.rs index 041e3d70a8d..074d9438eb4 100644 --- a/tokio/src/future/poll_fn.rs +++ b/tokio/src/future/poll_fn.rs @@ -7,13 +7,23 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +// This struct is intentionally `!Unpin` when `F` is `!Unpin`. This is to +// mitigate the issue where rust puts noalias on mutable references to the +// `PollFn` type if it is `Unpin`. If the closure has ownership of a future, +// then this "leaks" and the future is affected by noalias too, which we don't +// want. +// +// See this thread for more information: +// +// +// The fact that `PollFn` is not `Unpin` when it shouldn't be is tested in +// `tests/async_send_sync.rs`. + /// Future for the [`poll_fn`] function. pub struct PollFn { f: F, } -impl Unpin for PollFn {} - /// Creates a new future wrapping around a function returning [`Poll`]. pub fn poll_fn(f: F) -> PollFn where @@ -34,7 +44,17 @@ where { type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - (self.f)(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safety: We never construct a `Pin<&mut F>` anywhere, so accessing `f` + // mutably in an unpinned way is sound. + // + // This use of unsafe cannot be replaced with the pin-project macro + // because: + // * If we put `#[pin]` on the field, then it gives us a `Pin<&mut F>`, + // which we can't use to call the closure. + // * If we don't put `#[pin]` on the field, then it makes `PollFn` be + // unconditionally `Unpin`, which we also don't want. + let me = unsafe { Pin::into_inner_unchecked(self) }; + (me.f)(cx) } } diff --git a/tokio/src/macros/join.rs b/tokio/src/macros/join.rs index 9697936c895..7e85203c0c4 100644 --- a/tokio/src/macros/join.rs +++ b/tokio/src/macros/join.rs @@ -72,8 +72,18 @@ macro_rules! join { // Safety: nothing must be moved out of `futures`. This is to satisfy // the requirement of `Pin::new_unchecked` called below. + // + // We can't use the `pin!` macro for this because `futures` is a tuple + // and the standard library provides no way to pin-project to the fields + // of a tuple. let mut futures = ( $( maybe_done($e), )* ); + // This assignment makes sure that the `poll_fn` closure only has a + // reference to the futures, instead of taking ownership of them. This + // mitigates the issue described in + // + let mut futures = &mut futures; + // Each time the future created by poll_fn is polled, a different future will be polled first // to ensure every future passed to join! gets a chance to make progress even if // one of the futures consumes the whole budget. @@ -106,7 +116,7 @@ macro_rules! join { to_run -= 1; // Extract the future for this branch from the tuple. - let ( $($skip,)* fut, .. ) = &mut futures; + let ( $($skip,)* fut, .. ) = &mut *futures; // Safety: future is stored on the stack above // and never moved. diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index f38aee0f20b..7ba04a00768 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -460,8 +460,18 @@ macro_rules! select { let mut output = { // Safety: Nothing must be moved out of `futures`. This is to // satisfy the requirement of `Pin::new_unchecked` called below. + // + // We can't use the `pin!` macro for this because `futures` is a + // tuple and the standard library provides no way to pin-project to + // the fields of a tuple. let mut futures = ( $( $fut , )+ ); + // This assignment makes sure that the `poll_fn` closure only has a + // reference to the futures, instead of taking ownership of them. + // This mitigates the issue described in + // + let mut futures = &mut futures; + $crate::macros::support::poll_fn(|cx| { // Track if any branch returns pending. If no branch completes // **or** returns pending, this implies that all branches are @@ -497,7 +507,7 @@ macro_rules! select { // Extract the future for this branch from the // tuple - let ( $($skip,)* fut, .. ) = &mut futures; + let ( $($skip,)* fut, .. ) = &mut *futures; // Safety: future is stored on the stack above // and never moved. diff --git a/tokio/src/macros/try_join.rs b/tokio/src/macros/try_join.rs index c80395c9097..597cd5df021 100644 --- a/tokio/src/macros/try_join.rs +++ b/tokio/src/macros/try_join.rs @@ -118,8 +118,18 @@ macro_rules! try_join { // Safety: nothing must be moved out of `futures`. This is to satisfy // the requirement of `Pin::new_unchecked` called below. + // + // We can't use the `pin!` macro for this because `futures` is a tuple + // and the standard library provides no way to pin-project to the fields + // of a tuple. let mut futures = ( $( maybe_done($e), )* ); + // This assignment makes sure that the `poll_fn` closure only has a + // reference to the futures, instead of taking ownership of them. This + // mitigates the issue described in + // + let mut futures = &mut futures; + // Each time the future created by poll_fn is polled, a different future will be polled first // to ensure every future passed to join! gets a chance to make progress even if // one of the futures consumes the whole budget. @@ -152,7 +162,7 @@ macro_rules! try_join { to_run -= 1; // Extract the future for this branch from the tuple. - let ( $($skip,)* fut, .. ) = &mut futures; + let ( $($skip,)* fut, .. ) = &mut *futures; // Safety: future is stored on the stack above // and never moved. diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index ecfa9e27b85..2ef02c7e8d8 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -136,6 +136,21 @@ macro_rules! cfg_not_wasi { } } +// Manualy re-implementation of `async_assert_fn` for `poll_fn`. The macro +// doesn't work for this particular case because constructing the closure +// is too complicated. +const _: fn() = || { + let pinned = std::marker::PhantomPinned; + let f = tokio::macros::support::poll_fn(move |_| { + // Use `pinned` to take ownership of it. + let _ = &pinned; + std::task::Poll::Pending::<()> + }); + require_send(&f); + require_sync(&f); + AmbiguousIfUnpin::some_item(&f); +}; + cfg_not_wasi! { mod fs { use super::*; diff --git a/tokio/tests/macros_join.rs b/tokio/tests/macros_join.rs index 0efbd83c3ea..c289c00679a 100644 --- a/tokio/tests/macros_join.rs +++ b/tokio/tests/macros_join.rs @@ -3,6 +3,7 @@ use std::sync::Arc; #[cfg(tokio_wasm_not_wasi)] +#[cfg(target_pointer_width = "64")] use wasm_bindgen_test::wasm_bindgen_test as test; #[cfg(tokio_wasm_not_wasi)] use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; @@ -64,6 +65,7 @@ async fn two_await() { } #[test] +#[cfg(target_pointer_width = "64")] fn join_size() { use futures::future; use std::mem; @@ -72,14 +74,14 @@ fn join_size() { let ready = future::ready(0i32); tokio::join!(ready) }; - assert_eq!(mem::size_of_val(&fut), 20); + assert_eq!(mem::size_of_val(&fut), 32); let fut = async { let ready1 = future::ready(0i32); let ready2 = future::ready(0i32); tokio::join!(ready1, ready2) }; - assert_eq!(mem::size_of_val(&fut), 32); + assert_eq!(mem::size_of_val(&fut), 48); } async fn non_cooperative_task(permits: Arc) -> usize { diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index ede8390b70d..60f3738c991 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -207,6 +207,7 @@ async fn nested() { } #[maybe_tokio_test] +#[cfg(target_pointer_width = "64")] async fn struct_size() { use futures::future; use std::mem; @@ -219,7 +220,7 @@ async fn struct_size() { } }; - assert!(mem::size_of_val(&fut) <= 32); + assert_eq!(mem::size_of_val(&fut), 40); let fut = async { let ready1 = future::ready(0i32); @@ -231,7 +232,7 @@ async fn struct_size() { } }; - assert!(mem::size_of_val(&fut) <= 40); + assert_eq!(mem::size_of_val(&fut), 48); let fut = async { let ready1 = future::ready(0i32); @@ -245,7 +246,7 @@ async fn struct_size() { } }; - assert!(mem::size_of_val(&fut) <= 48); + assert_eq!(mem::size_of_val(&fut), 56); } #[maybe_tokio_test] diff --git a/tokio/tests/macros_try_join.rs b/tokio/tests/macros_try_join.rs index 681b1b7bc68..209516bb998 100644 --- a/tokio/tests/macros_try_join.rs +++ b/tokio/tests/macros_try_join.rs @@ -88,6 +88,7 @@ async fn err_abort_early() { } #[test] +#[cfg(target_pointer_width = "64")] fn join_size() { use futures::future; use std::mem; @@ -96,14 +97,14 @@ fn join_size() { let ready = future::ready(ok(0i32)); tokio::try_join!(ready) }; - assert_eq!(mem::size_of_val(&fut), 20); + assert_eq!(mem::size_of_val(&fut), 32); let fut = async { let ready1 = future::ready(ok(0i32)); let ready2 = future::ready(ok(0i32)); tokio::try_join!(ready1, ready2) }; - assert_eq!(mem::size_of_val(&fut), 32); + assert_eq!(mem::size_of_val(&fut), 48); } fn ok(val: T) -> Result {