diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml
index 156ceb0a..df0d3f36 100644
--- a/tentacle/Cargo.toml
+++ b/tentacle/Cargo.toml
@@ -56,6 +56,9 @@ env_logger = "0.6.0"
 crossbeam-channel = "0.3.6"
 systemstat = "0.1.3"
 futures-test = "0.3.5"
+## lock on 1.1
+## https://github.com/myrrlyn/funty/issues/3
+funty = "=1.1.0"
 
 [target.'cfg(unix)'.dev-dependencies]
 nix = "0.13.0"
diff --git a/tentacle/src/channel/bound.rs b/tentacle/src/channel/bound.rs
index d37c5678..39211f9d 100644
--- a/tentacle/src/channel/bound.rs
+++ b/tentacle/src/channel/bound.rs
@@ -583,7 +583,12 @@ impl<T> Receiver<T> {
             }
             None => {
                 let state = decode_state(inner.state.load(SeqCst));
-                if state.is_open || state.num_messages != 0 {
+                if state.is_closed() {
+                    // If closed flag is set AND there are no pending messages
+                    // it means end of stream
+                    self.inner = None;
+                    Poll::Ready(None)
+                } else {
                     // If queue is open, we need to return Pending
                     // to be woken up when new messages arrive.
                     // If queue is closed but num_messages is non-zero,
@@ -592,11 +597,6 @@
                     // so we need to park until sender unparks the task
                     // after queueing the message.
                     Poll::Pending
-                } else {
-                    // If closed flag is set AND there are no pending messages
-                    // it means end of stream
-                    self.inner = None;
-                    Poll::Ready(None)
                 }
             }
         }
@@ -661,8 +661,26 @@ impl<T> Drop for Receiver<T> {
         // Drain the channel of all pending messages
         self.close();
         if self.inner.is_some() {
-            while let Poll::Ready(Some(..)) = self.next_message() {
-                // ...
+            loop {
+                match self.next_message() {
+                    Poll::Ready(Some(_)) => {}
+                    Poll::Ready(None) => break,
+                    Poll::Pending => {
+                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
+
+                        // If the channel is closed, then there is no need to park.
+                        if state.is_closed() {
+                            break;
+                        }
+
+                        // TODO: Spinning isn't ideal, it might be worth
+                        // investigating using a condvar or some other strategy
+                        // here. That said, if this case is hit, then another thread
+                        // is about to push the value into the queue and this isn't
+                        // the only spinlock in the impl right now.
+                        ::std::thread::yield_now();
+                    }
+                }
             }
         }
     }
diff --git a/tentacle/src/channel/mod.rs b/tentacle/src/channel/mod.rs
index 4ee9ba4b..e3d951b3 100644
--- a/tentacle/src/channel/mod.rs
+++ b/tentacle/src/channel/mod.rs
@@ -44,6 +44,12 @@ struct State {
     num_messages: usize,
 }
 
+impl State {
+    fn is_closed(&self) -> bool {
+        !self.is_open && self.num_messages == 0
+    }
+}
+
 fn decode_state(num: usize) -> State {
     State {
         is_open: num & OPEN_MASK == OPEN_MASK,
diff --git a/tentacle/src/channel/tests/mpsc_close.rs b/tentacle/src/channel/tests/mpsc_close.rs
index ca66fbe3..3e6f63df 100644
--- a/tentacle/src/channel/tests/mpsc_close.rs
+++ b/tentacle/src/channel/tests/mpsc_close.rs
@@ -3,8 +3,12 @@ use futures::executor::block_on;
 use futures::future::FutureExt;
 use futures::sink::SinkExt;
 use futures::stream::StreamExt;
-use std::sync::Arc;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
 use std::thread;
+use std::time::{Duration, Instant};
 
 #[test]
 fn smoke() {
@@ -141,3 +145,137 @@ fn single_receiver_drop_closes_channel_and_drains() {
         assert!(sender.is_closed());
     }
 }
+
+// Stress test that `try_send()`s occurring concurrently with receiver
+// close/drops don't appear as successful sends.
+#[test]
+fn stress_try_send_as_receiver_closes() {
+    const AMT: usize = 10000;
+    // To provide variable timing characteristics (in the hopes of
+    // reproducing the collision that leads to a race), we busy-re-poll
+    // the test MPSC receiver a variable number of times before actually
+    // stopping. We vary this countdown between 1 and the following
+    // value.
+    const MAX_COUNTDOWN: usize = 20;
+    // When we detect that a successfully sent item is still in the
+    // queue after a disconnect, we spin for up to `SPIN_TIMEOUT_S` seconds to
+    // confirm that it is a persistent condition and not a concurrency illusion.
+    const SPIN_TIMEOUT_S: u64 = 10;
+    const SPIN_SLEEP_MS: u64 = 10;
+    struct TestRx {
+        rx: mpsc::Receiver<Arc<()>>,
+        // The number of times to query `rx` before dropping it.
+        poll_count: usize,
+    }
+    struct TestTask {
+        command_rx: mpsc::Receiver<TestRx>,
+        test_rx: Option<mpsc::Receiver<Arc<()>>>,
+        countdown: usize,
+    }
+    impl TestTask {
+        /// Create a new TestTask
+        fn new() -> (TestTask, mpsc::Sender<TestRx>) {
+            let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
+            (
+                TestTask {
+                    command_rx,
+                    test_rx: None,
+                    countdown: 0, // 0 means no countdown is in progress.
+                },
+                command_tx,
+            )
+        }
+    }
+    impl Future for TestTask {
+        type Output = ();
+
+        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+            // Poll the test channel, if one is present.
+            if let Some(rx) = &mut self.test_rx {
+                if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
+                    let _ = v.expect("test finished unexpectedly!");
+                }
+                self.countdown -= 1;
+                // Busy-poll until the countdown is finished.
+                cx.waker().wake_by_ref();
+            }
+            // Accept any newly submitted MPSC channels for testing.
+            match self
+                .command_rx
+                .poll_next_unpin(cx)
+                .map(|item| item.map(|i| i.1))
+            {
+                Poll::Ready(Some(TestRx { rx, poll_count })) => {
+                    self.test_rx = Some(rx);
+                    self.countdown = poll_count;
+                    cx.waker().wake_by_ref();
+                }
+                Poll::Ready(None) => return Poll::Ready(()),
+                Poll::Pending => {}
+            }
+            if self.countdown == 0 {
+                // Countdown complete -- drop the Receiver.
+                self.test_rx = None;
+            }
+            Poll::Pending
+        }
+    }
+    let (f, cmd_tx) = TestTask::new();
+    let bg = thread::spawn(move || block_on(f));
+    for i in 0..AMT {
+        let (test_tx, rx) = mpsc::channel(0);
+        let poll_count = i % MAX_COUNTDOWN;
+        cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
+        let mut prev_weak: Option<Weak<()>> = None;
+        let mut attempted_sends = 0;
+        let mut successful_sends = 0;
+        loop {
+            // Create a test item.
+            let item = Arc::new(());
+            let weak = Arc::downgrade(&item);
+            match test_tx.try_send(item) {
+                Ok(_) => {
+                    prev_weak = Some(weak);
+                    successful_sends += 1;
+                }
+                Err(ref e) if e.is_full() => {}
+                Err(ref e) if e.is_disconnected() => {
+                    // Test for evidence of the race condition.
+                    if let Some(prev_weak) = prev_weak {
+                        if prev_weak.upgrade().is_some() {
+                            // The previously sent item is still allocated.
+                            // However, there appears to be some aspect of the
+                            // concurrency that can legitimately cause the Arc
+                            // to be momentarily valid. Spin for up to
+                            // `SPIN_TIMEOUT_S` seconds waiting for the
+                            // previously sent item to be dropped.
+                            let t0 = Instant::now();
+                            let mut spins = 0;
+                            loop {
+                                if prev_weak.upgrade().is_none() {
+                                    break;
+                                }
+                                assert!(
+                                    t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+                                    "item not dropped on iteration {} after \
spin=({})", + i, + attempted_sends, + successful_sends, + spins + ); + spins += 1; + thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); + } + } + } + break; + } + Err(ref e) => panic!("unexpected error: {}", e), + } + attempted_sends += 1; + } + } + drop(cmd_tx); + bg.join().expect("background thread join"); +} diff --git a/tentacle/src/channel/unbound.rs b/tentacle/src/channel/unbound.rs index 488e746a..0b604365 100644 --- a/tentacle/src/channel/unbound.rs +++ b/tentacle/src/channel/unbound.rs @@ -400,7 +400,12 @@ impl UnboundedReceiver { } None => { let state = decode_state(inner.state.load(SeqCst)); - if state.is_open || state.num_messages != 0 { + if state.is_closed() { + // If closed flag is set AND there are no pending messages + // it means end of stream + self.inner = None; + Poll::Ready(None) + } else { // If queue is open, we need to return Pending // to be woken up when new messages arrive. // If queue is closed but num_messages is non-zero, @@ -409,11 +414,6 @@ impl UnboundedReceiver { // so we need to park until sender unparks the task // after queueing the message. Poll::Pending - } else { - // If closed flag is set AND there are no pending messages - // it means end of stream - self.inner = None; - Poll::Ready(None) } } } @@ -466,8 +466,26 @@ impl Drop for UnboundedReceiver { // Drain the channel of all pending messages self.close(); if self.inner.is_some() { - while let Poll::Ready(Some(..)) = self.next_message() { - // ... + loop { + match self.next_message() { + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => break, + Poll::Pending => { + let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); + + // If the channel is closed, then there is no need to park. + if state.is_closed() { + break; + } + + // TODO: Spinning isn't ideal, it might be worth + // investigating using a condvar or some other strategy + // here. That said, if this case is hit, then another thread + // is about to push the value into the queue and this isn't + // the only spinlock in the impl right now. + ::std::thread::yield_now(); + } + } } } }