Skip to content

Commit

Permalink
FlattenUnordered + merged master
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed May 17, 2020
2 parents b7c27bc + 8a95e9a commit 42a8152
Show file tree
Hide file tree
Showing 214 changed files with 6,164 additions and 6,466 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Expand Up @@ -145,7 +145,8 @@ matrix:
- name: cargo doc
rust: nightly
script:
- RUSTDOCFLAGS=-Dwarnings cargo doc --workspace --no-deps --all-features
# TODO: Remove -Aunused_braces once https://github.com/rust-lang/rust/issues/70814 is fixed
- RUSTDOCFLAGS="-Dwarnings -Aunused_braces" cargo doc --workspace --no-deps --all-features

script:
- cargo test --workspace --all-features
Expand Down
19 changes: 18 additions & 1 deletion CHANGELOG.md
@@ -1,3 +1,20 @@
# 0.3.5 - 2020-05-08
* Added `StreamExt::flat_map`.
* Added `StreamExt::ready_chunks`.
* Added `*_unpin` methods to `SinkExt`.
* Added a `cancellation()` future to `oneshot::Sender`.
* Added `reunite` method to `ReadHalf` and `WriteHalf`.
* Added `Extend` implementations for `Futures(Un)Ordered` and `SelectAll`.
* Added support for reexporting the `join!` and `select!` macros.
* Added `no_std` support for the `pending!` and `poll!` macros.
* Added `Send` and `Sync` support for `AssertUnmoved`.
* Fixed a bug where `Shared` wasn't relinquishing control to the executor.
* Removed the `Send` bound on the output of `RemoteHandle`.
* Relaxed bounds on `FuturesUnordered`.
* Reorganized internal tests to work under different `--feature`s.
* Reorganized the bounds on `StreamExt::forward`.
* Removed and replaced a large amount of internal `unsafe`.

# 0.3.4 - 2020-02-06
* Fixed missing `Drop` for `UnboundedReceiver` (#2064)

Expand Down Expand Up @@ -239,7 +256,7 @@
* Improvements to `select!` and `join!` macros
* Added `try_join!` macro
* Added `StreamExt` combinator methods `try_join` and `for_each_concurrent`
* Added `TryStreamExt` combinator methdos `into_stream`, `try_filter_map`, `try_skip_while`, `try_for_each_concurrent` and `try_buffer_unordered`
* Added `TryStreamExt` combinator methods `into_stream`, `try_filter_map`, `try_skip_while`, `try_for_each_concurrent` and `try_buffer_unordered`
* Fix stream termination bug in `StreamExt::buffered` and `StreamExt::buffer_unordered`
* Added docs for `StreamExt::buffered`, `StreamExt::buffer_unordered`
* Added `task::local_waker_ref_from_nonlocal` and `task::local_waker_ref` functions
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -11,6 +11,9 @@ members = [
"futures-util",
"futures-test",

"futures/tests/macro-tests",
"futures/tests/macro-reexport",

"examples/functional",
"examples/imperative",
]
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -58,7 +58,7 @@ a `#[no_std]` environment, use:

```toml
[dependencies]
futures = { version = "0.3.4", default-features = false }
futures = { version = "0.3.5", default-features = false }
```

# License
Expand Down
6 changes: 3 additions & 3 deletions examples/functional/Cargo.toml
@@ -1,14 +1,14 @@
[package]
name = "futures-example-functional"
edition = "2018"
version = "0.3.0"
version = "0.3.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
readme = "../README.md"
keywords = ["futures", "async", "future"]
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
documentation = "https://docs.rs/futures/0.3.0"
documentation = "https://docs.rs/futures/0.3.5"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
Expand All @@ -17,4 +17,4 @@ categories = ["asynchronous"]
publish = false

[dependencies]
futures = { path = "../../futures", version = "0.3.0", features = ["thread-pool"] }
futures = { path = "../../futures", version = "0.3.5", features = ["thread-pool"] }
6 changes: 3 additions & 3 deletions examples/imperative/Cargo.toml
@@ -1,14 +1,14 @@
[package]
name = "futures-example-imperative"
edition = "2018"
version = "0.3.0"
version = "0.3.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
readme = "../README.md"
keywords = ["futures", "async", "future"]
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
documentation = "https://docs.rs/futures/0.3.0"
documentation = "https://docs.rs/futures/0.3.5"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
Expand All @@ -17,4 +17,4 @@ categories = ["asynchronous"]
publish = false

[dependencies]
futures = { path = "../../futures", version = "0.3.0", features = ["thread-pool"] }
futures = { path = "../../futures", version = "0.3.5", features = ["thread-pool"] }
12 changes: 6 additions & 6 deletions futures-channel/Cargo.toml
@@ -1,12 +1,12 @@
[package]
name = "futures-channel"
edition = "2018"
version = "0.3.4"
version = "0.3.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
documentation = "https://docs.rs/futures-channel/0.3.0"
documentation = "https://docs.rs/futures-channel/0.3.5"
description = """
Channels for asynchronous communication using futures-rs.
"""
Expand All @@ -24,12 +24,12 @@ unstable = ["futures-core/unstable"]
cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.3.4", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.3.4", default-features = false, optional = true }
futures-core = { path = "../futures-core", version = "0.3.5", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.3.5", default-features = false, optional = true }

[dev-dependencies]
futures = { path = "../futures", version = "0.3.4", default-features = true }
futures-test = { path = "../futures-test", version = "0.3.4", default-features = true }
futures = { path = "../futures", version = "0.3.5", default-features = true }
futures-test = { path = "../futures-test", version = "0.3.5", default-features = true }

[package.metadata.docs.rs]
all-features = true
2 changes: 1 addition & 1 deletion futures-channel/src/lib.rs
Expand Up @@ -17,7 +17,7 @@

#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]

#![doc(html_root_url = "https://docs.rs/futures-channel/0.3.0")]
#![doc(html_root_url = "https://docs.rs/futures-channel/0.3.5")]

#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
Expand Down
47 changes: 37 additions & 10 deletions futures-channel/src/oneshot.rs
Expand Up @@ -82,22 +82,23 @@ struct Inner<T> {
///
/// ```
/// use futures::channel::oneshot;
/// use futures::future::FutureExt;
/// use std::thread;
/// use std::{thread, time::Duration};
///
/// let (sender, receiver) = oneshot::channel::<i32>();
///
/// # let t =
/// thread::spawn(|| {
/// let future = receiver.map(|i| {
/// println!("got: {:?}", i);
/// });
/// // ...
/// # return future;
/// println!("THREAD: sleeping zzz...");
/// thread::sleep(Duration::from_millis(1000));
/// println!("THREAD: i'm awake! sending.");
/// sender.send(3).unwrap();
/// });
///
/// sender.send(3).unwrap();
/// # futures::executor::block_on(t.join().unwrap());
/// println!("MAIN: doing some useful stuff");
///
/// futures::executor::block_on(async {
/// println!("MAIN: waiting for msg...");
/// println!("MAIN: got: {:?}", receiver.await)
/// });
/// ```
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner::new());
Expand Down Expand Up @@ -358,6 +359,15 @@ impl<T> Sender<T> {
self.inner.poll_canceled(cx)
}

/// Creates a future that resolves when this `Sender`'s corresponding
/// [`Receiver`](Receiver) half has hung up.
///
/// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
/// to expose a [`Future`](core::future::Future).
pub fn cancellation(&mut self) -> Cancellation<'_, T> {
Cancellation { inner: self }
}

/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has been dropped.
///
Expand All @@ -375,6 +385,23 @@ impl<T> Drop for Sender<T> {
}
}

/// A future that resolves when the receiving end of a channel has hung up.
///
/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Cancellation<'a, T> {
inner: &'a mut Sender<T>,
}

impl<T> Future for Cancellation<'_, T> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.inner.poll_canceled(cx)
}
}

/// Error returned from a [`Receiver`](Receiver) when the corresponding
/// [`Sender`](Sender) is dropped.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
Expand Down
29 changes: 8 additions & 21 deletions futures-channel/tests/oneshot.rs
@@ -1,9 +1,8 @@
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{Future, FutureExt, poll_fn};
use futures::future::{FutureExt, poll_fn};
use futures::task::{Context, Poll};
use futures_test::task::panic_waker_ref;
use std::pin::Pin;
use std::sync::mpsc;
use std::thread;

Expand All @@ -25,33 +24,21 @@ fn smoke_poll() {

#[test]
fn cancel_notifies() {
let (tx, rx) = oneshot::channel::<u32>();
let (mut tx, rx) = oneshot::channel::<u32>();

let t = thread::spawn(|| {
block_on(WaitForCancel { tx });
let t = thread::spawn(move || {
block_on(tx.cancellation());
});
drop(rx);
t.join().unwrap();
}

struct WaitForCancel {
tx: Sender<u32>,
}

impl Future for WaitForCancel {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.tx.poll_canceled(cx)
}
}

#[test]
fn cancel_lots() {
let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
let t = thread::spawn(move || {
for (tx, tx2) in rx {
block_on(WaitForCancel { tx });
for (mut tx, tx2) in rx {
block_on(tx.cancellation());
tx2.send(()).unwrap();
}
});
Expand Down Expand Up @@ -93,13 +80,13 @@ fn close() {

#[test]
fn close_wakes() {
let (tx, mut rx) = oneshot::channel::<u32>();
let (mut tx, mut rx) = oneshot::channel::<u32>();
let (tx2, rx2) = mpsc::channel();
let t = thread::spawn(move || {
rx.close();
rx2.recv().unwrap();
});
block_on(WaitForCancel { tx });
block_on(tx.cancellation());
tx2.send(()).unwrap();
t.join().unwrap();
}
Expand Down
6 changes: 3 additions & 3 deletions futures-core/Cargo.toml
@@ -1,12 +1,12 @@
[package]
name = "futures-core"
edition = "2018"
version = "0.3.4"
version = "0.3.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
documentation = "https://docs.rs/futures-core/0.3.0"
documentation = "https://docs.rs/futures-core/0.3.5"
description = """
The core traits and types in for the `futures` library.
"""
Expand All @@ -25,7 +25,7 @@ cfg-target-has-atomic = []
[dependencies]

[dev-dependencies]
futures = { path = "../futures", version = "0.3.4" }
futures = { path = "../futures", version = "0.3.5" }

[package.metadata.docs.rs]
all-features = true
2 changes: 1 addition & 1 deletion futures-core/src/lib.rs
Expand Up @@ -11,7 +11,7 @@

#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]

#![doc(html_root_url = "https://docs.rs/futures-core/0.3.0")]
#![doc(html_root_url = "https://docs.rs/futures-core/0.3.5")]

#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
Expand Down
14 changes: 11 additions & 3 deletions futures-core/src/stream.rs
Expand Up @@ -50,11 +50,19 @@ pub trait Stream {
///
/// # Panics
///
/// Once a stream is finished, i.e. `Ready(None)` has been returned, further
/// calls to `poll_next` may result in a panic or other "bad behavior". If
/// this is difficult to guard against then the `fuse` adapter can be used
/// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
/// `poll_next` method again may panic, block forever, or cause other kinds of
/// problems; the `Stream` trait places no requirements on the effects of
/// such a call. However, as the `poll_next` method is not marked `unsafe`,
/// Rust's usual rules apply: calls must never cause undefined behavior
/// (memory corruption, incorrect use of `unsafe` functions, or the like),
/// regardless of the stream's state.
///
/// If this is difficult to guard against then the [`fuse`] adapter can be used
/// to ensure that `poll_next` always returns `Ready(None)` in subsequent
/// calls.
///
/// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
12 changes: 6 additions & 6 deletions futures-executor/Cargo.toml
@@ -1,12 +1,12 @@
[package]
name = "futures-executor"
edition = "2018"
version = "0.3.4"
version = "0.3.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
documentation = "https://docs.rs/futures-executor/0.3.0"
documentation = "https://docs.rs/futures-executor/0.3.5"
description = """
Executors for asynchronous tasks based on the futures-rs library.
"""
Expand All @@ -17,13 +17,13 @@ std = ["futures-core/std", "futures-task/std", "futures-util/std"]
thread-pool = ["std", "num_cpus"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.3.4", default-features = false }
futures-task = { path = "../futures-task", version = "0.3.4", default-features = false }
futures-util = { path = "../futures-util", version = "0.3.4", default-features = false }
futures-core = { path = "../futures-core", version = "0.3.5", default-features = false }
futures-task = { path = "../futures-task", version = "0.3.5", default-features = false }
futures-util = { path = "../futures-util", version = "0.3.5", default-features = false }
num_cpus = { version = "1.8.0", optional = true }

[dev-dependencies]
futures = { path = "../futures", version = "0.3.4" }
futures = { path = "../futures", version = "0.3.5" }

[package.metadata.docs.rs]
all-features = true
2 changes: 1 addition & 1 deletion futures-executor/src/lib.rs
Expand Up @@ -12,7 +12,7 @@

#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]

#![doc(html_root_url = "https://docs.rs/futures-executor/0.3.0")]
#![doc(html_root_url = "https://docs.rs/futures-executor/0.3.5")]

#[cfg(feature = "std")]
mod local_pool;
Expand Down
2 changes: 1 addition & 1 deletion futures-executor/src/thread_pool.rs
Expand Up @@ -210,7 +210,7 @@ impl ThreadPoolBuilder {
self
}

/// Set stack size of threads in the pool.
/// Set stack size of threads in the pool, in bytes.
///
/// By default, worker threads use Rust's standard stack size.
pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
Expand Down

0 comments on commit 42a8152

Please sign in to comment.