Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.3: Backports #2389

Merged
merged 13 commits into from Apr 10, 2021
6 changes: 6 additions & 0 deletions .editorconfig
@@ -0,0 +1,6 @@
[*.rs]
end_of_line = lf
insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 4
22 changes: 16 additions & 6 deletions futures-channel/src/mpsc/mod.rs
Expand Up @@ -1021,8 +1021,10 @@ impl<T> Receiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
/// This function will panic if called after `try_next` or `poll_next` has
/// returned `None`.
/// This function returns:
/// * `Ok(Some(t))` when message is fetched
/// * `Ok(None)` when channel is closed and no messages left in the queue
/// * `Err(e)` when there are no messages available, but channel is not yet closed
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => {
Expand All @@ -1033,7 +1035,10 @@ impl<T> Receiver<T> {
}

fn next_message(&mut self) -> Poll<Option<T>> {
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
let inner = match self.inner.as_mut() {
None => return Poll::Ready(None),
Some(inner) => inner,
};
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
Expand Down Expand Up @@ -1169,8 +1174,10 @@ impl<T> UnboundedReceiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
/// This function will panic if called after `try_next` or `poll_next` has
/// returned `None`.
/// This function returns:
/// * `Ok(Some(t))` when message is fetched
/// * `Ok(None)` when channel is closed and no messages left in the queue
/// * `Err(e)` when there are no messages available, but channel is not yet closed
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => {
Expand All @@ -1181,7 +1188,10 @@ impl<T> UnboundedReceiver<T> {
}

fn next_message(&mut self) -> Poll<Option<T>> {
let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
let inner = match self.inner.as_mut() {
None => return Poll::Ready(None),
Some(inner) => inner,
};
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
Expand Down
22 changes: 22 additions & 0 deletions futures-channel/tests/mpsc-close.rs
Expand Up @@ -276,3 +276,25 @@ fn stress_try_send_as_receiver_closes() {
bg.join()
.expect("background thread join");
}

#[test]
fn unbounded_try_next_after_none() {
let (tx, mut rx) = mpsc::unbounded::<String>();
// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}

#[test]
fn bounded_try_next_after_none() {
let (tx, mut rx) = mpsc::channel::<String>(17);
// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
// None received, check we can call `try_next` again.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}
7 changes: 7 additions & 0 deletions futures-util/src/future/select_all.rs
Expand Up @@ -42,6 +42,13 @@ pub fn select_all<I>(iter: I) -> SelectAll<I::Item>
assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret)
}

impl<Fut> SelectAll<Fut> {
/// Consumes this combinator, returning the underlying futures.
pub fn into_inner(self) -> Vec<Fut> {
self.inner
}
}

impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
type Output = (Fut::Output, usize, Vec<Fut>);

Expand Down
11 changes: 11 additions & 0 deletions futures-util/src/io/mod.rs
Expand Up @@ -601,6 +601,17 @@ pub trait AsyncSeekExt: AsyncSeek {
{
assert_future::<Result<u64>, _>(Seek::new(self, pos))
}

/// Creates a future which will return the current seek position from the
/// start of the stream.
///
/// This is equivalent to `self.seek(SeekFrom::Current(0))`.
fn stream_position(&mut self) -> Seek<'_, Self>
where
Self: Unpin,
{
self.seek(SeekFrom::Current(0))
}
}

impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
Expand Down
11 changes: 7 additions & 4 deletions futures-util/src/lib.rs
Expand Up @@ -309,18 +309,18 @@ macro_rules! delegate_all {

pub mod future;
#[doc(hidden)]
pub use crate::future::{FutureExt, TryFutureExt};
pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};

pub mod stream;
#[doc(hidden)]
pub use crate::stream::{StreamExt, TryStreamExt};
pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};

#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub mod sink;
#[cfg(feature = "sink")]
#[doc(hidden)]
pub use crate::sink::SinkExt;
pub use crate::sink::{Sink, SinkExt};

pub mod task;

Expand All @@ -337,7 +337,10 @@ pub mod io;
#[cfg(feature = "io")]
#[cfg(feature = "std")]
#[doc(hidden)]
pub use crate::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
pub use crate::io::{
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
AsyncWriteExt,
};

#[cfg(feature = "alloc")]
pub mod lock;
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Expand Up @@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
};

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Expand Up @@ -123,7 +123,7 @@ pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::{Peek, Peekable};
pub use self::peek::{Peek, Peekable, NextIf, NextIfEq};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
Expand Down