Skip to content

Commit

Permalink
Add impl Spawn and LocalSpawn for Arc and Rc.
Browse files Browse the repository at this point in the history
Since rust-lang#1950 (0.3.0) the Spawn and LocalSpawn only require a shared reference
for spawning operations.

This adds blanket impls for `Arc<Sp: ?Sized + Spawn>` and `Rc<Sp: ?Sized + Spawn>`.
It does the same for LocalSpawn.

This allows client code to pass an Arc<Exec> when library code takes
parameters as `&dyn Spawn` or `impl Spawn`.

So far there were blanket impls for `&`, `&mut` and `Box`.
  • Loading branch information
najamelan authored and cramertj committed Jan 23, 2020
1 parent c600651 commit ca52056
Show file tree
Hide file tree
Showing 202 changed files with 1,912 additions and 2,426 deletions.
6 changes: 2 additions & 4 deletions examples/functional/src/main.rs
Expand Up @@ -30,9 +30,7 @@ fn main() {
// responsible for transmission
pool.spawn_ok(fut_tx_result);

let fut_values = rx
.map(|v| v * 2)
.collect();
let fut_values = rx.map(|v| v * 2).collect();

// Use the executor provided to this async block to wait for the
// future to complete.
Expand All @@ -45,4 +43,4 @@ fn main() {
let values: Vec<i32> = executor::block_on(fut_values);

println!("Values={:?}", values);
}
}
4 changes: 2 additions & 2 deletions examples/imperative/src/main.rs
Expand Up @@ -34,7 +34,7 @@ fn main() {
// of the stream to be available.
while let Some(v) = rx.next().await {
pending.push(v * 2);
};
}

pending
};
Expand All @@ -45,4 +45,4 @@ fn main() {
let values: Vec<i32> = executor::block_on(fut_values);

println!("Values={:?}", values);
}
}
15 changes: 3 additions & 12 deletions futures-channel/benches/sync_mpsc.rs
Expand Up @@ -7,8 +7,8 @@ use {
futures::{
channel::mpsc::{self, Sender, UnboundedSender},
ready,
stream::{Stream, StreamExt},
sink::Sink,
stream::{Stream, StreamExt},
task::{Context, Poll},
},
futures_test::task::noop_context,
Expand All @@ -25,7 +25,6 @@ fn unbounded_1_tx(b: &mut Bencher) {
// 1000 iterations to avoid measuring overhead of initialization
// Result should be divided by 1000
for i in 0..1000 {

// Poll, not ready, park
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

Expand Down Expand Up @@ -73,7 +72,6 @@ fn unbounded_uncontended(b: &mut Bencher) {
})
}


/// A Stream that continuously sends incrementing number of the queue
struct TestSender {
tx: Sender<u32>,
Expand All @@ -84,9 +82,7 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
let mut tx = Pin::new(&mut this.tx);

Expand Down Expand Up @@ -123,12 +119,7 @@ fn bounded_100_tx(b: &mut Bencher) {
// Each sender can send one item after specified capacity
let (tx, mut rx) = mpsc::channel(0);

let mut tx: Vec<_> = (0..100).map(|_| {
TestSender {
tx: tx.clone(),
last: 0
}
}).collect();
let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();

for i in 0..10 {
for x in &mut tx {
Expand Down
4 changes: 0 additions & 4 deletions futures-channel/src/lib.rs
Expand Up @@ -7,16 +7,12 @@
//! library is activated, and it is activated by default.

#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]

#![cfg_attr(not(feature = "std"), no_std)]

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
#![warn(clippy::all)]

#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]

#![doc(html_root_url = "https://docs.rs/futures-channel/0.3.0")]

#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
Expand Down
10 changes: 3 additions & 7 deletions futures-channel/tests/channel.rs
@@ -1,8 +1,8 @@
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::stream::StreamExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

Expand All @@ -11,9 +11,7 @@ fn sequence() {
let (tx, rx) = mpsc::channel(1);

let amt = 20;
let t = thread::spawn(move || {
block_on(send_sequence(amt, tx))
});
let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
let list: Vec<_> = block_on(rx.collect());
let mut list = list.into_iter();
for i in (1..=amt).rev() {
Expand All @@ -34,9 +32,7 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
let f = poll_fn(|cx| {
rx.poll_next_unpin(cx)
});
let f = poll_fn(|cx| rx.poll_next_unpin(cx));
assert_eq!(block_on(f), None)
}

Expand Down
4 changes: 1 addition & 3 deletions futures-channel/tests/mpsc-close.rs
Expand Up @@ -8,9 +8,7 @@ use std::thread;
fn smoke() {
let (mut sender, receiver) = mpsc::channel(1);

let t = thread::spawn(move || {
while let Ok(()) = block_on(sender.send(42)) {}
});
let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});

// `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
Expand Down
33 changes: 15 additions & 18 deletions futures-channel/tests/mpsc.rs
@@ -1,13 +1,13 @@
use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
use futures::future::{FutureExt, poll_fn};
use futures::stream::{Stream, StreamExt};
use futures::future::{poll_fn, FutureExt};
use futures::pin_mut;
use futures::sink::{Sink, SinkExt};
use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
use futures::pin_mut;
use futures_test::task::{new_count_waker, noop_context};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

trait AssertSend: Send {}
Expand Down Expand Up @@ -77,7 +77,7 @@ fn send_shared_recv() {
fn send_recv_threads() {
let (mut tx, rx) = mpsc::channel::<i32>(16);

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
block_on(tx.send(1)).unwrap();
});

Expand Down Expand Up @@ -204,7 +204,7 @@ fn stress_shared_unbounded() {
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::unbounded::<i32>();

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
Expand All @@ -215,7 +215,7 @@ fn stress_shared_unbounded() {
for _ in 0..NTHREADS {
let tx = tx.clone();

thread::spawn(move|| {
thread::spawn(move || {
for _ in 0..AMT {
tx.unbounded_send(1).unwrap();
}
Expand All @@ -233,7 +233,7 @@ fn stress_shared_bounded_hard() {
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::channel::<i32>(0);

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
Expand Down Expand Up @@ -296,9 +296,9 @@ fn stress_receiver_multi_task_bounded_hard() {
}
Poll::Ready(None) => {
*rx_opt = None;
break
},
Poll::Pending => {},
break;
}
Poll::Pending => {}
}
}
} else {
Expand All @@ -310,7 +310,6 @@ fn stress_receiver_multi_task_bounded_hard() {
th.push(t);
}


for i in 0..AMT {
block_on(tx.send(i)).unwrap();
}
Expand All @@ -327,7 +326,7 @@ fn stress_receiver_multi_task_bounded_hard() {
/// after sender dropped.
#[test]
fn stress_drop_sender() {
fn list() -> impl Stream<Item=i32> {
fn list() -> impl Stream<Item = i32> {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
block_on(send_one_two_three(tx));
Expand Down Expand Up @@ -405,9 +404,7 @@ fn stress_poll_ready() {
let mut threads = Vec::new();
for _ in 0..NTHREADS {
let sender = tx.clone();
threads.push(thread::spawn(move || {
block_on(stress_poll_ready_sender(sender, AMT))
}));
threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
}
drop(tx);

Expand All @@ -434,7 +431,7 @@ fn try_send_1() {
for i in 0..N {
loop {
if tx.try_send(i).is_ok() {
break
break;
}
}
}
Expand Down Expand Up @@ -529,8 +526,8 @@ fn same_receiver() {

#[test]
fn hash_receiver() {
use std::hash::Hasher;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

let mut hasher_a1 = DefaultHasher::new();
let mut hasher_a2 = DefaultHasher::new();
Expand Down
4 changes: 2 additions & 2 deletions futures-channel/tests/oneshot.rs
@@ -1,6 +1,6 @@
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{Future, FutureExt, poll_fn};
use futures::future::{poll_fn, Future, FutureExt};
use futures::task::{Context, Poll};
use futures_test::task::panic_waker_ref;
use std::pin::Pin;
Expand Down Expand Up @@ -83,7 +83,7 @@ fn close() {
rx.close();
block_on(poll_fn(|cx| {
match rx.poll_unpin(cx) {
Poll::Ready(Err(_)) => {},
Poll::Ready(Err(_)) => {}
_ => panic!(),
};
assert!(tx.poll_canceled(cx).is_ready());
Expand Down
10 changes: 4 additions & 6 deletions futures-core/src/future.rs
Expand Up @@ -66,14 +66,12 @@ pub trait TryFuture: Future + private_try_future::Sealed {
/// This method is a stopgap for a compiler limitation that prevents us from
/// directly inheriting from the `Future` trait; in the future it won't be
/// needed.
fn try_poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Ok, Self::Error>>;
fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>>;
}

impl<F, T, E> TryFuture for F
where F: ?Sized + Future<Output = Result<T, E>>
where
F: ?Sized + Future<Output = Result<T, E>>,
{
type Ok = T;
type Error = E;
Expand All @@ -86,8 +84,8 @@ impl<F, T, E> TryFuture for F

#[cfg(feature = "alloc")]
mod if_alloc {
use alloc::boxed::Box;
use super::*;
use alloc::boxed::Box;

impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for Box<F> {
fn is_terminated(&self) -> bool {
Expand Down
10 changes: 4 additions & 6 deletions futures-core/src/lib.rs
@@ -1,16 +1,12 @@
//! Core traits and types for asynchronous operations in Rust.

#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]

#![cfg_attr(not(feature = "std"), no_std)]

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
#![warn(clippy::all)]

#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]

#![doc(html_root_url = "https://docs.rs/futures-core/0.3.0")]

#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
Expand All @@ -20,10 +16,12 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
extern crate alloc;

pub mod future;
#[doc(hidden)] pub use self::future::{Future, FusedFuture, TryFuture};
#[doc(hidden)]
pub use self::future::{FusedFuture, Future, TryFuture};

pub mod stream;
#[doc(hidden)] pub use self::stream::{Stream, FusedStream, TryStream};
#[doc(hidden)]
pub use self::stream::{FusedStream, Stream, TryStream};

#[macro_use]
pub mod task;
Expand Down

0 comments on commit ca52056

Please sign in to comment.