Skip to content

Commit

Permalink
fix: port futures-rs fix on channel
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Feb 13, 2021
1 parent 18068e5 commit fc9b51a
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 17 deletions.
3 changes: 3 additions & 0 deletions tentacle/Cargo.toml
Expand Up @@ -56,6 +56,9 @@ env_logger = "0.6.0"
crossbeam-channel = "0.3.6"
systemstat = "0.1.3"
futures-test = "0.3.5"
## Pin funty to exactly 1.1.0 to avoid the breakage described in:
## https://github.com/myrrlyn/funty/issues/3
funty = "=1.1.0"

[target.'cfg(unix)'.dev-dependencies]
nix = "0.13.0"
Expand Down
34 changes: 26 additions & 8 deletions tentacle/src/channel/bound.rs
Expand Up @@ -583,7 +583,12 @@ impl<T> Receiver<T> {
}
None => {
let state = decode_state(inner.state.load(SeqCst));
if state.is_open || state.num_messages != 0 {
if state.is_closed() {
// If closed flag is set AND there are no pending messages
// it means end of stream
self.inner = None;
Poll::Ready(None)
} else {
// If queue is open, we need to return Pending
// to be woken up when new messages arrive.
// If queue is closed but num_messages is non-zero,
Expand All @@ -592,11 +597,6 @@ impl<T> Receiver<T> {
// so we need to park until sender unparks the task
// after queueing the message.
Poll::Pending
} else {
// If closed flag is set AND there are no pending messages
// it means end of stream
self.inner = None;
Poll::Ready(None)
}
}
}
Expand Down Expand Up @@ -661,8 +661,26 @@ impl<T> Drop for Receiver<T> {
// Drain the channel of all pending messages
self.close();
if self.inner.is_some() {
while let Poll::Ready(Some(..)) = self.next_message() {
// ...
loop {
match self.next_message() {
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => break,
Poll::Pending => {
let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

// If the channel is closed, then there is no need to park.
if state.is_closed() {
break;
}

// TODO: Spinning isn't ideal, it might be worth
// investigating using a condvar or some other strategy
// here. That said, if this case is hit, then another thread
// is about to push the value into the queue and this isn't
// the only spinlock in the impl right now.
::std::thread::yield_now();
}
}
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions tentacle/src/channel/mod.rs
Expand Up @@ -44,6 +44,12 @@ struct State {
num_messages: usize,
}

impl State {
fn is_closed(&self) -> bool {
!self.is_open && self.num_messages == 0
}
}

fn decode_state(num: usize) -> State {
State {
is_open: num & OPEN_MASK == OPEN_MASK,
Expand Down
140 changes: 139 additions & 1 deletion tentacle/src/channel/tests/mpsc_close.rs
Expand Up @@ -3,8 +3,12 @@ use futures::executor::block_on;
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::sync::Arc;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};

#[test]
fn smoke() {
Expand Down Expand Up @@ -141,3 +145,137 @@ fn single_receiver_drop_closes_channel_and_drains() {
assert!(sender.is_closed());
}
}

// Stress test that `try_send()`s occurring concurrently with receiver
// close/drops don't appear as successful sends.
#[test]
fn stress_try_send_as_receiver_closes() {
const AMT: usize = 10000;
// To provide variable timing characteristics (in the hopes of
// reproducing the collision that leads to a race), we busy-re-poll
// the test MPSC receiver a variable number of times before actually
// stopping. We vary this countdown between 1 and the following
// value.
const MAX_COUNTDOWN: usize = 20;
// When we detect that a successfully sent item is still in the
// queue after a disconnect, we spin for up to 100ms to confirm that
// it is a persistent condition and not a concurrency illusion.
const SPIN_TIMEOUT_S: u64 = 10;
const SPIN_SLEEP_MS: u64 = 10;
struct TestRx {
rx: mpsc::Receiver<Arc<()>>,
// The number of times to query `rx` before dropping it.
poll_count: usize,
}
struct TestTask {
command_rx: mpsc::Receiver<TestRx>,
test_rx: Option<mpsc::Receiver<Arc<()>>>,
countdown: usize,
}
impl TestTask {
/// Create a new TestTask
fn new() -> (TestTask, mpsc::Sender<TestRx>) {
let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
(
TestTask {
command_rx,
test_rx: None,
countdown: 0, // 0 means no countdown is in progress.
},
command_tx,
)
}
}
impl Future for TestTask {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll the test channel, if one is present.
if let Some(rx) = &mut self.test_rx {
if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
let _ = v.expect("test finished unexpectedly!");
}
self.countdown -= 1;
// Busy-poll until the countdown is finished.
cx.waker().wake_by_ref();
}
// Accept any newly submitted MPSC channels for testing.
match self
.command_rx
.poll_next_unpin(cx)
.map(|item| item.map(|i| i.1))
{
Poll::Ready(Some(TestRx { rx, poll_count })) => {
self.test_rx = Some(rx);
self.countdown = poll_count;
cx.waker().wake_by_ref();
}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => {}
}
if self.countdown == 0 {
// Countdown complete -- drop the Receiver.
self.test_rx = None;
}
Poll::Pending
}
}
let (f, cmd_tx) = TestTask::new();
let bg = thread::spawn(move || block_on(f));
for i in 0..AMT {
let (test_tx, rx) = mpsc::channel(0);
let poll_count = i % MAX_COUNTDOWN;
cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
let mut prev_weak: Option<Weak<()>> = None;
let mut attempted_sends = 0;
let mut successful_sends = 0;
loop {
// Create a test item.
let item = Arc::new(());
let weak = Arc::downgrade(&item);
match test_tx.try_send(item) {
Ok(_) => {
prev_weak = Some(weak);
successful_sends += 1;
}
Err(ref e) if e.is_full() => {}
Err(ref e) if e.is_disconnected() => {
// Test for evidence of the race condition.
if let Some(prev_weak) = prev_weak {
if prev_weak.upgrade().is_some() {
// The previously sent item is still allocated.
// However, there appears to be some aspect of the
// concurrency that can legitimately cause the Arc
// to be momentarily valid. Spin for up to 100ms
// waiting for the previously sent item to be
// dropped.
let t0 = Instant::now();
let mut spins = 0;
loop {
if prev_weak.upgrade().is_none() {
break;
}
assert!(
t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
"item not dropped on iteration {} after \
{} sends ({} successful). spin=({})",
i,
attempted_sends,
successful_sends,
spins
);
spins += 1;
thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
}
}
}
break;
}
Err(ref e) => panic!("unexpected error: {}", e),
}
attempted_sends += 1;
}
}
drop(cmd_tx);
bg.join().expect("background thread join");
}
34 changes: 26 additions & 8 deletions tentacle/src/channel/unbound.rs
Expand Up @@ -400,7 +400,12 @@ impl<T> UnboundedReceiver<T> {
}
None => {
let state = decode_state(inner.state.load(SeqCst));
if state.is_open || state.num_messages != 0 {
if state.is_closed() {
// If closed flag is set AND there are no pending messages
// it means end of stream
self.inner = None;
Poll::Ready(None)
} else {
// If queue is open, we need to return Pending
// to be woken up when new messages arrive.
// If queue is closed but num_messages is non-zero,
Expand All @@ -409,11 +414,6 @@ impl<T> UnboundedReceiver<T> {
// so we need to park until sender unparks the task
// after queueing the message.
Poll::Pending
} else {
// If closed flag is set AND there are no pending messages
// it means end of stream
self.inner = None;
Poll::Ready(None)
}
}
}
Expand Down Expand Up @@ -466,8 +466,26 @@ impl<T> Drop for UnboundedReceiver<T> {
// Drain the channel of all pending messages
self.close();
if self.inner.is_some() {
while let Poll::Ready(Some(..)) = self.next_message() {
// ...
loop {
match self.next_message() {
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => break,
Poll::Pending => {
let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

// If the channel is closed, then there is no need to park.
if state.is_closed() {
break;
}

// TODO: Spinning isn't ideal, it might be worth
// investigating using a condvar or some other strategy
// here. That said, if this case is hit, then another thread
// is about to push the value into the queue and this isn't
// the only spinlock in the impl right now.
::std::thread::yield_now();
}
}
}
}
}
Expand Down

0 comments on commit fc9b51a

Please sign in to comment.