Skip to content

Commit

Permalink
sync: move broadcast error types into broadcast::error module (#2937)
Browse files Browse the repository at this point in the history
Refs: #2928
  • Loading branch information
taiki-e committed Oct 9, 2020
1 parent afe5352 commit 2e05399
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 74 deletions.
157 changes: 89 additions & 68 deletions tokio/src/sync/broadcast.rs
Expand Up @@ -55,8 +55,8 @@
//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
//! [`Receiver`]: crate::sync::broadcast::Receiver
//! [`channel`]: crate::sync::broadcast::channel
//! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
//! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
//! [`recv`]: crate::sync::broadcast::Receiver::recv
//!
//! # Examples
Expand Down Expand Up @@ -197,53 +197,97 @@ pub struct Receiver<T> {
next: u64,
}

/// Error returned by [`Sender::send`][Sender::send].
///
/// A **send** operation can only fail if there are no active receivers,
/// implying that the message could never be received. The error contains the
/// message being sent as a payload so it can be recovered.
#[derive(Debug)]
pub struct SendError<T>(pub T);
pub mod error {
//! Broadcast error types

/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum RecvError {
/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,
use std::fmt;

/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
/// Error returned by from the [`send`] function on a [`Sender`].
///
/// Includes the number of skipped messages.
Lagged(u64),
}
/// A **send** operation can only fail if there are no active receivers,
/// implying that the message could never be received. The error contains the
/// message being sent as a payload so it can be recovered.
///
/// [`send`]: crate::sync::broadcast::Sender::send
/// [`Sender`]: crate::sync::broadcast::Sender
#[derive(Debug)]
pub struct SendError<T>(pub T);

/// An error returned from the [`try_recv`] function on a [`Receiver`].
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
/// The channel is currently empty. There are still active
/// [`Sender`][Sender] handles, so data may yet become available.
Empty,

/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,

/// The receiver lagged too far behind and has been forcibly disconnected.
/// Attempting to receive again will return the oldest message still
/// retained by the channel.
///
/// Includes the number of skipped messages.
Lagged(u64),
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "channel closed")
}
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}

/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum RecvError {
/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,

/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
///
/// Includes the number of skipped messages.
Lagged(u64),
}

impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RecvError::Closed => write!(f, "channel closed"),
RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for RecvError {}

/// An error returned from the [`try_recv`] function on a [`Receiver`].
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
/// The channel is currently empty. There are still active
/// [`Sender`] handles, so data may yet become available.
///
/// [`Sender`]: crate::sync::broadcast::Sender
Empty,

/// There are no more active senders implying no further messages will ever
/// be sent.
Closed,

/// The receiver lagged too far behind and has been forcibly disconnected.
/// Attempting to receive again will return the oldest message still
/// retained by the channel.
///
/// Includes the number of skipped messages.
Lagged(u64),
}

impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryRecvError::Empty => write!(f, "channel empty"),
TryRecvError::Closed => write!(f, "channel closed"),
TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for TryRecvError {}
}

use self::error::*;

/// Data shared between senders and receivers
struct Shared<T> {
/// slots in the channel
Expand Down Expand Up @@ -371,8 +415,8 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
/// [`Receiver`]: crate::sync::broadcast::Receiver
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`SendError`]: crate::sync::broadcast::SendError
/// [`RecvError`]: crate::sync::broadcast::RecvError
/// [`SendError`]: crate::sync::broadcast::error::SendError
/// [`RecvError`]: crate::sync::broadcast::error::RecvError
///
/// # Examples
///
Expand Down Expand Up @@ -1112,27 +1156,4 @@ impl<'a, T> Drop for RecvGuard<'a, T> {
}
}

impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RecvError::Closed => write!(f, "channel closed"),
RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for RecvError {}

impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryRecvError::Empty => write!(f, "channel empty"),
TryRecvError::Closed => write!(f, "channel closed"),
TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
}
}
}

impl std::error::Error for TryRecvError {}

fn is_unpin<T: Unpin>() {}
2 changes: 1 addition & 1 deletion tokio/src/sync/tests/loom_broadcast.rs
@@ -1,5 +1,5 @@
use crate::sync::broadcast;
use crate::sync::broadcast::RecvError::{Closed, Lagged};
use crate::sync::broadcast::error::RecvError::{Closed, Lagged};

use loom::future::block_on;
use loom::sync::Arc;
Expand Down
10 changes: 5 additions & 5 deletions tokio/tests/sync_broadcast.rs
Expand Up @@ -23,7 +23,7 @@ macro_rules! assert_empty {
($e:expr) => {
match $e.try_recv() {
Ok(value) => panic!("expected empty; got = {:?}", value),
Err(broadcast::TryRecvError::Empty) => {}
Err(broadcast::error::TryRecvError::Empty) => {}
Err(e) => panic!("expected empty; got = {:?}", e),
}
};
Expand All @@ -32,7 +32,7 @@ macro_rules! assert_empty {
macro_rules! assert_lagged {
($e:expr, $n:expr) => {
match assert_err!($e) {
broadcast::TryRecvError::Lagged(n) => {
broadcast::error::TryRecvError::Lagged(n) => {
assert_eq!(n, $n);
}
_ => panic!("did not lag"),
Expand All @@ -43,7 +43,7 @@ macro_rules! assert_lagged {
macro_rules! assert_closed {
($e:expr) => {
match assert_err!($e) {
broadcast::TryRecvError::Closed => {}
broadcast::error::TryRecvError::Closed => {}
_ => panic!("did not lag"),
}
};
Expand Down Expand Up @@ -491,6 +491,6 @@ fn lagging_receiver_recovers_after_wrap_open() {
assert_empty!(rx);
}

fn is_closed(err: broadcast::RecvError) -> bool {
matches!(err, broadcast::RecvError::Closed)
fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}

0 comments on commit 2e05399

Please sign in to comment.