Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.3 backports #2432

Merged
merged 15 commits into from May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-macro/Cargo.toml
Expand Up @@ -23,4 +23,4 @@ autocfg = "1"
proc-macro2 = "1.0"
proc-macro-hack = "0.5.19"
quote = "1.0"
syn = { version = "1.0", features = ["full"] }
syn = { version = "1.0.56", features = ["full"] }
55 changes: 55 additions & 0 deletions futures-macro/src/executor.rs
@@ -0,0 +1,55 @@
use proc_macro::TokenStream;
use proc_macro2::Span;
use quote::{quote, quote_spanned, ToTokens};

pub(crate) fn test(args: TokenStream, item: TokenStream) -> TokenStream {
if !args.is_empty() {
return syn::Error::new_spanned(proc_macro2::TokenStream::from(args), "invalid argument")
.to_compile_error()
.into();
}

let mut input = syn::parse_macro_input!(item as syn::ItemFn);

if input.sig.asyncness.take().is_none() {
return syn::Error::new_spanned(input.sig.fn_token, "Only async functions are supported")
.to_compile_error()
.into();
}

// If type mismatch occurs, the current rustc points to the last statement.
let (last_stmt_start_span, last_stmt_end_span) = {
let mut last_stmt = input
.block
.stmts
.last()
.map(ToTokens::into_token_stream)
.unwrap_or_default()
.into_iter();
// `Span` on stable Rust has a limitation that only points to the first
// token, not the whole tokens. We can work around this limitation by
// using the first/last span of the tokens like
// `syn::Error::new_spanned` does.
let start = last_stmt.next().map_or_else(Span::call_site, |t| t.span());
let end = last_stmt.last().map_or(start, |t| t.span());
(start, end)
};

let path = quote_spanned! {last_stmt_start_span=>
::futures_test::__private
};
let body = &input.block;
input.block.stmts = vec![syn::Stmt::Expr(
syn::parse2(quote_spanned! {last_stmt_end_span=>
#path::block_on(async #body)
})
.unwrap(),
)];

let gen = quote! {
#[::core::prelude::v1::test]
#input
};

gen.into()
}
8 changes: 8 additions & 0 deletions futures-macro/src/lib.rs
Expand Up @@ -14,6 +14,7 @@ extern crate proc_macro;

use proc_macro::TokenStream;

mod executor;
mod join;
mod select;

Expand Down Expand Up @@ -44,3 +45,10 @@ pub fn select_internal(input: TokenStream) -> TokenStream {
pub fn select_biased_internal(input: TokenStream) -> TokenStream {
crate::select::select_biased(input)
}

// TODO: Change this to doc comment once rustdoc bug fixed.
// The `test` attribute.
#[proc_macro_attribute]
pub fn test_internal(input: TokenStream, item: TokenStream) -> TokenStream {
crate::executor::test(input, item)
}
1 change: 1 addition & 0 deletions futures-test/Cargo.toml
Expand Up @@ -18,6 +18,7 @@ futures-io = { version = "0.3.14", path = "../futures-io", default-features = fa
futures-util = { version = "0.3.14", path = "../futures-util", default-features = false }
futures-executor = { version = "0.3.14", path = "../futures-executor", default-features = false }
futures-sink = { version = "0.3.14", path = "../futures-sink", default-features = false }
futures-macro = { version = "=0.3.14", path = "../futures-macro", default-features = false }
pin-utils = { version = "0.1.0", default-features = false }
pin-project = "1.0.1"

Expand Down
26 changes: 26 additions & 0 deletions futures-test/src/lib.rs
Expand Up @@ -16,6 +16,7 @@ compile_error!(
#[cfg(feature = "std")]
pub mod __private {
pub use futures_core::{future, stream, task};
pub use futures_executor::block_on;
pub use std::{
option::Option::{None, Some},
pin::Pin,
Expand Down Expand Up @@ -49,3 +50,28 @@ pub mod io;
mod assert_unmoved;
mod interleave_pending;
mod track_closed;

/// Enables an `async` test function. The generated future will be run to completion with
/// [`futures_executor::block_on`](futures_executor::block_on).
///
/// ```
/// #[futures_test::test]
/// async fn my_test() {
/// let fut = async { true };
/// assert!(fut.await);
/// }
/// ```
///
/// This is equivalent to the following code:
///
/// ```
/// #[test]
/// fn my_test() {
/// futures::executor::block_on(async move {
/// let fut = async { true };
/// assert!(fut.await);
/// })
/// }
/// ```
#[cfg(feature = "std")]
pub use futures_macro::test_internal as test;
185 changes: 185 additions & 0 deletions futures-util/src/abortable.rs
@@ -0,0 +1,185 @@
use crate::task::AtomicWaker;
use alloc::sync::Arc;
use core::fmt;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_core::Stream;
use pin_project_lite::pin_project;

pin_project! {
/// A future/stream which can be remotely short-circuited using an `AbortHandle`.
#[derive(Debug, Clone)]
#[must_use = "futures/streams do nothing unless you poll them"]
pub struct Abortable<T> {
#[pin]
task: T,
inner: Arc<AbortInner>,
}
}

impl<T> Abortable<T> {
/// Creates a new `Abortable` future/stream using an existing `AbortRegistration`.
/// `AbortRegistration`s can be acquired through `AbortHandle::new`.
///
/// When `abort` is called on the handle tied to `reg` or if `abort` has
/// already been called, the future/stream will complete immediately without making
/// any further progress.
///
/// # Examples:
///
/// Usage with futures:
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::{Abortable, AbortHandle, Aborted};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let future = Abortable::new(async { 2 }, abort_registration);
/// abort_handle.abort();
/// assert_eq!(future.await, Err(Aborted));
/// # });
/// ```
///
/// Usage with streams:
///
/// ```
/// # futures::executor::block_on(async {
/// # use futures::future::{Abortable, AbortHandle};
/// # use futures::stream::{self, StreamExt};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let mut stream = Abortable::new(stream::iter(vec![1, 2, 3]), abort_registration);
/// abort_handle.abort();
/// assert_eq!(stream.next().await, None);
/// # });
/// ```
pub fn new(task: T, reg: AbortRegistration) -> Self {
Self { task, inner: reg.inner }
}

/// Checks whether the task has been aborted. Note that all this
/// method indicates is whether [`AbortHandle::abort`] was *called*.
/// This means that it will return `true` even if:
/// * `abort` was called after the task had completed.
/// * `abort` was called while the task was being polled - the task may still be running and
/// will not be stopped until `poll` returns.
pub fn is_aborted(&self) -> bool {
self.inner.aborted.load(Ordering::Relaxed)
}
}

/// A registration handle for an `Abortable` task.
/// Values of this type can be acquired from `AbortHandle::new` and are used
/// in calls to `Abortable::new`.
#[derive(Debug)]
pub struct AbortRegistration {
inner: Arc<AbortInner>,
}

/// A handle to an `Abortable` task.
#[derive(Debug, Clone)]
pub struct AbortHandle {
inner: Arc<AbortInner>,
}

impl AbortHandle {
/// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used
/// to abort a running future or stream.
///
/// This function is usually paired with a call to [`Abortable::new`].
pub fn new_pair() -> (Self, AbortRegistration) {
let inner =
Arc::new(AbortInner { waker: AtomicWaker::new(), aborted: AtomicBool::new(false) });

(Self { inner: inner.clone() }, AbortRegistration { inner })
}
}

// Inner type storing the waker to awaken and a bool indicating that it
// should be aborted.
#[derive(Debug)]
struct AbortInner {
waker: AtomicWaker,
aborted: AtomicBool,
}

/// Indicator that the `Abortable` task was aborted.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Aborted;

impl fmt::Display for Aborted {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "`Abortable` future has been aborted")
}
}

#[cfg(feature = "std")]
impl std::error::Error for Aborted {}

impl<T> Abortable<T> {
fn try_poll<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
poll: impl Fn(Pin<&mut T>, &mut Context<'_>) -> Poll<I>,
) -> Poll<Result<I, Aborted>> {
// Check if the task has been aborted
if self.is_aborted() {
return Poll::Ready(Err(Aborted));
}

// attempt to complete the task
if let Poll::Ready(x) = poll(self.as_mut().project().task, cx) {
return Poll::Ready(Ok(x));
}

// Register to receive a wakeup if the task is aborted in the future
self.inner.waker.register(cx.waker());

// Check to see if the task was aborted between the first check and
// registration.
// Checking with `is_aborted` which uses `Relaxed` is sufficient because
// `register` introduces an `AcqRel` barrier.
if self.is_aborted() {
return Poll::Ready(Err(Aborted));
}

Poll::Pending
}
}

impl<Fut> Future for Abortable<Fut>
where
Fut: Future,
{
type Output = Result<Fut::Output, Aborted>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.try_poll(cx, |fut, cx| fut.poll(cx))
}
}

impl<St> Stream for Abortable<St>
where
St: Stream,
{
type Item = St::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.try_poll(cx, |stream, cx| stream.poll_next(cx)).map(Result::ok).map(Option::flatten)
}
}

impl AbortHandle {
/// Abort the `Abortable` stream/future associated with this handle.
///
/// Notifies the Abortable task associated with this handle that it
/// should abort. Note that if the task is currently being polled on
/// another thread, it will not immediately stop running. Instead, it will
/// continue to run until its poll method returns.
pub fn abort(&self) {
self.inner.aborted.store(true, Ordering::Relaxed);
self.inner.waker.wake();
}
}