Skip to content

Commit

Permalink
Merge branch 'master' into test-cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Sep 30, 2020
2 parents b2cdebb + 1d93f5d commit a52ac49
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 81 deletions.
15 changes: 9 additions & 6 deletions futures-channel/src/oneshot.rs
@@ -1,4 +1,6 @@
//! A channel for sending a single message between asynchronous tasks.
//!
//! This is a single-producer, single-consumer channel.

use alloc::sync::Arc;
use core::fmt;
Expand All @@ -12,7 +14,7 @@ use crate::lock::Lock;

/// A future for a value that will be provided by another asynchronous task.
///
/// This is created by the [`channel`] function.
/// This is created by the [`channel`](channel) function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Receiver<T> {
Expand All @@ -21,7 +23,7 @@ pub struct Receiver<T> {

/// A means of transmitting a single value to another task.
///
/// This is created by the [`channel`] function.
/// This is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Inner<T>>,
Expand Down Expand Up @@ -68,7 +70,9 @@ struct Inner<T> {
tx_task: Lock<Option<Waker>>,
}

/// Creates a new one-shot channel for sending values across asynchronous tasks.
/// Creates a new one-shot channel for sending a single value across asynchronous tasks.
///
/// The channel works for a spsc (single-producer, single-consumer) scheme.
///
/// This function is similar to Rust's channel constructor found in the standard
/// library. Two halves are returned, the first of which is a `Sender` handle,
Expand Down Expand Up @@ -337,14 +341,13 @@ impl<T> Sender<T> {
///
/// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value
/// provided.
/// this function was called, however, then `Err(t)` is returned.
pub fn send(self, t: T) -> Result<(), T> {
self.inner.send(t)
}

/// Polls this `Sender` half to detect whether its associated
/// [`Receiver`](Receiver) with has been dropped.
/// [`Receiver`](Receiver) has been dropped.
///
/// # Return values
///
Expand Down
29 changes: 16 additions & 13 deletions futures-util/src/future/future/mod.rs
Expand Up @@ -3,20 +3,21 @@
//! This module contains a number of functions for working with `Future`s,
//! including the `FutureExt` trait which adds methods to `Future` types.

use super::{assert_future, Either};
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
use core::pin::Pin;

use crate::future::{assert_future, Either};
use crate::stream::assert_stream;
use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn};
use crate::never::Never;
#[cfg(feature = "alloc")]
use futures_core::future::{BoxFuture, LocalBoxFuture};
use futures_core::{
future::Future,
stream::Stream,
task::{Context, Poll},
};
use crate::never::Never;
use crate::fns::{OkFn, ok_fn, IntoFn, into_fn, InspectFn, inspect_fn};
use pin_utils::pin_mut;

// Combinators
Expand Down Expand Up @@ -223,7 +224,7 @@ pub trait FutureExt: Future {
B: Future<Output = Self::Output>,
Self: Sized,
{
Either::Left(self)
assert_future::<Self::Output, _>(Either::Left(self))
}

/// Wrap this future in an `Either` future, making it the right-hand variant
Expand Down Expand Up @@ -253,7 +254,7 @@ pub trait FutureExt: Future {
A: Future<Output = Self::Output>,
Self: Sized,
{
Either::Right(self)
assert_future::<Self::Output, _>(Either::Right(self))
}

/// Convert this future into a single element stream.
Expand All @@ -278,7 +279,7 @@ pub trait FutureExt: Future {
where
Self: Sized,
{
IntoStream::new(self)
assert_stream::<Self::Output, _>(IntoStream::new(self))
}

/// Flatten the execution of this future when the output of this
Expand Down Expand Up @@ -342,7 +343,7 @@ pub trait FutureExt: Future {
Self::Output: Stream,
Self: Sized,
{
FlattenStream::new(self)
assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self))
}

/// Fuse a future such that `poll` will never again be called once it has
Expand Down Expand Up @@ -431,7 +432,9 @@ pub trait FutureExt: Future {
where
Self: Sized + ::std::panic::UnwindSafe,
{
CatchUnwind::new(self)
assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new(
self,
))
}

/// Create a cloneable handle to this future where all handles will resolve
Expand Down Expand Up @@ -485,7 +488,7 @@ pub trait FutureExt: Future {
Self: Sized,
Self::Output: Clone,
{
Shared::new(self)
assert_future::<Self::Output, _>(Shared::new(self))
}

/// Turn this future into a future that yields `()` on completion and sends
Expand Down Expand Up @@ -515,7 +518,7 @@ pub trait FutureExt: Future {
where
Self: Sized + Send + 'a,
{
Box::pin(self)
assert_future::<Self::Output, _>(Box::pin(self))
}

/// Wrap the future in a Box, pinning it.
Expand All @@ -529,7 +532,7 @@ pub trait FutureExt: Future {
where
Self: Sized + 'a,
{
Box::pin(self)
assert_future::<Self::Output, _>(Box::pin(self))
}

/// Turns a [`Future<Output = T>`](Future) into a
Expand All @@ -538,7 +541,7 @@ pub trait FutureExt: Future {
where
Self: Sized,
{
UnitError::new(self)
assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self))
}

/// Turns a [`Future<Output = T>`](Future) into a
Expand All @@ -547,7 +550,7 @@ pub trait FutureExt: Future {
where
Self: Sized,
{
NeverError::new(self)
assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self))
}

/// A convenience for calling `Future::poll` on `Unpin` future types.
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/future/mod.rs
Expand Up @@ -109,7 +109,7 @@ cfg_target_has_atomic! {

// Just a helper function to ensure the futures we're returning all have the
// right implementations.
fn assert_future<T, F>(future: F) -> F
pub(crate) fn assert_future<T, F>(future: F) -> F
where
F: Future<Output = T>,
{
Expand Down
22 changes: 12 additions & 10 deletions futures-util/src/future/try_future/mod.rs
Expand Up @@ -14,13 +14,13 @@ use futures_core::{
#[cfg(feature = "sink")]
use futures_sink::Sink;

use super::assert_future;
use crate::future::{Map, Inspect};
use crate::fns::{
MapOkFn, map_ok_fn, MapErrFn, map_err_fn, MapOkOrElseFn,
map_ok_or_else_fn, IntoFn, UnwrapOrElseFn, unwrap_or_else_fn, InspectOkFn, inspect_ok_fn, InspectErrFn,
inspect_err_fn, into_fn
inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn,
unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn,
UnwrapOrElseFn,
};
use crate::future::{assert_future, Inspect, Map};
use crate::stream::assert_stream;

// Combinators
mod into_future;
Expand Down Expand Up @@ -230,7 +230,7 @@ pub trait TryFutureExt: TryFuture {
/// The provided closure `f` will only be called if this future is resolved
/// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then
/// the provided closure will never be invoked.
///
///
/// The provided closure `e` will only be called if this future is resolved
/// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then
/// the provided closure will never be invoked.
Expand All @@ -247,13 +247,13 @@ pub trait TryFutureExt: TryFuture {
/// let future = async { Ok::<i32, i32>(5) };
/// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
/// assert_eq!(future.await, 8);
///
///
/// let future = async { Err::<i32, i32>(5) };
/// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
/// assert_eq!(future.await, 10);
/// # });
/// ```
///
///
fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E>
where
F: FnOnce(Self::Ok) -> T,
Expand Down Expand Up @@ -534,7 +534,9 @@ pub trait TryFutureExt: TryFuture {
Self::Ok: TryStream<Error = Self::Error>,
Self: Sized,
{
TryFlattenStream::new(self)
assert_stream::<Result<<Self::Ok as TryStream>::Ok, Self::Error>, _>(TryFlattenStream::new(
self,
))
}

/// Unwraps this future's ouput, producing a future with this future's
Expand Down Expand Up @@ -603,7 +605,7 @@ pub trait TryFutureExt: TryFuture {
where
Self: Sized,
{
IntoFuture::new(self)
assert_future::<Result<Self::Ok, Self::Error>, _>(IntoFuture::new(self))
}

/// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`]
Expand Down
9 changes: 9 additions & 0 deletions futures-util/src/stream/mod.rs
Expand Up @@ -100,3 +100,12 @@ cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
pub use self::select_all::{select_all, SelectAll};
}

// Just a helper function to ensure the futures we're returning all have the
// right implementations.
pub(crate) fn assert_stream<T, S>(stream: S) -> S
where
S: Stream<Item = T>,
{
stream
}

0 comments on commit a52ac49

Please sign in to comment.