From 9d9274258825ccc1a0ba9c2679b1c6ccb9c7bf85 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 10 Sep 2020 11:03:29 -0700 Subject: [PATCH 01/11] Refactor IO registration using intrusive linked list --- .github/workflows/ci.yml | 4 +- tokio-util/Cargo.toml | 3 +- tokio-util/src/cfg.rs | 2 + tokio-util/src/lib.rs | 5 + tokio-util/tests/udp.rs | 2 + tokio/src/io/driver/mod.rs | 57 ++-- tokio/src/io/driver/scheduled_io.rs | 368 +++++++++++++++++++-- tokio/src/io/mod.rs | 3 +- tokio/src/io/poll_evented.rs | 262 ++++----------- tokio/src/io/registration.rs | 220 ++---------- tokio/src/net/tcp/listener.rs | 15 +- tokio/src/net/tcp/stream.rs | 249 +++++++------- tokio/src/net/udp/mod.rs | 4 - tokio/src/net/udp/socket.rs | 119 ++----- tokio/src/net/udp/split.rs | 148 --------- tokio/src/net/unix/datagram/mod.rs | 5 - tokio/src/net/unix/datagram/socket.rs | 200 ++--------- tokio/src/net/unix/datagram/split.rs | 68 ---- tokio/src/net/unix/datagram/split_owned.rs | 148 --------- tokio/src/net/unix/listener.rs | 25 +- tokio/src/net/unix/stream.rs | 49 +-- tokio/src/sync/task/atomic_waker.rs | 3 +- tokio/src/util/bit.rs | 2 +- tokio/src/util/linked_list.rs | 52 +++ tokio/src/util/mod.rs | 2 +- tokio/src/util/slab.rs | 2 + tokio/tests/async_send_sync.rs | 4 - tokio/tests/rt_common.rs | 4 +- tokio/tests/udp.rs | 34 +- tokio/tests/uds_datagram.rs | 34 +- 30 files changed, 755 insertions(+), 1338 deletions(-) delete mode 100644 tokio/src/net/udp/split.rs delete mode 100644 tokio/src/net/unix/datagram/split.rs delete mode 100644 tokio/src/net/unix/datagram/split_owned.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a2ae44af522..0ab36cd9a11 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,11 +150,11 @@ jobs: run: cargo install cargo-hack - name: check --each-feature - run: cargo hack check --all --each-feature -Z avoid-dev-deps + run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps # Try with unstable feature flags - name: check --each-feature --unstable - run: cargo hack check --all --each-feature -Z avoid-dev-deps + run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps env: RUSTFLAGS: --cfg tokio_unstable diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 85b4e5929c4..45daa2b13cc 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -25,11 +25,10 @@ publish = false default = [] # Shorthand for enabling everything -full = ["codec", "udp", "compat", "io"] +full = ["codec", "compat", "io"] compat = ["futures-io",] codec = ["tokio/stream"] -udp = ["tokio/udp"] io = [] [dependencies] diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 2efa5f09aff..f9176747902 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -18,6 +18,7 @@ macro_rules! cfg_compat { } } +/* macro_rules! cfg_udp { ($($item:item)*) => { $( @@ -27,6 +28,7 @@ macro_rules! cfg_udp { )* } } +*/ macro_rules! cfg_io { ($($item:item)*) => { diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 137be7a3630..0f8cfcd4cf8 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -30,9 +30,14 @@ cfg_codec! { pub mod codec; } +/* +Disabled due to removal of poll_ functions on UdpSocket. + +See https://github.com/tokio-rs/tokio/issues/2830 cfg_udp! { pub mod udp; } +*/ cfg_compat! { pub mod compat; diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index 4820ac72d00..99763854c09 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -1,3 +1,4 @@ +/* #![warn(rust_2018_idioms)] use tokio::{net::UdpSocket, stream::StreamExt}; @@ -100,3 +101,4 @@ async fn send_framed_lines_codec() -> std::io::Result<()> { Ok(()) } +*/ diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index d8c253abb33..30b30203995 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -12,14 +12,13 @@ use mio::event::Evented; use std::fmt; use std::io; use std::sync::{Arc, Weak}; -use std::task::Waker; use std::time::Duration; /// I/O driver, backed by Mio pub(crate) struct Driver { /// Tracks the number of times `turn` is called. It is safe for this to wrap /// as it is mostly used to determine when to call `compact()` - tick: u16, + tick: u8, /// Reuse the `mio::Events` value across calls to poll. events: Option, @@ -40,6 +39,11 @@ pub(crate) struct Handle { inner: Weak, } +pub(crate) struct ReadyEvent { + tick: u8, + readiness: mio::Ready, +} + pub(super) struct Inner { /// The underlying system event queue. io: mio::Poll, @@ -57,6 +61,11 @@ pub(super) enum Direction { Write, } +enum Tick { + Set(u8), + Clear(u8), +} + // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup // token. const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); @@ -122,11 +131,11 @@ impl Driver { fn turn(&mut self, max_wait: Option) -> io::Result<()> { // How often to call `compact()` on the resource slab - const COMPACT_INTERVAL: u16 = 256; + const COMPACT_INTERVAL: u8 = 255; self.tick = self.tick.wrapping_add(1); - if self.tick % COMPACT_INTERVAL == 0 { + if self.tick == COMPACT_INTERVAL { self.resources.compact(); } @@ -160,9 +169,6 @@ impl Driver { } fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { - let mut rd = None; - let mut wr = None; - let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); let io = match self.resources.get(addr) { @@ -170,29 +176,15 @@ impl Driver { None => return, }; - if io - .set_readiness(Some(token.0), |curr| curr | ready.as_usize()) - .is_err() - { + let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| { + curr | ready.as_usize() + }); + if set.is_err() { // token no longer valid! return; } - if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { - wr = io.writer.take_waker(); - } - - if !(ready & (!mio::Ready::writable())).is_empty() { - rd = io.reader.take_waker(); - } - - if let Some(w) = rd { - w.wake(); - } - - if let Some(w) = wr { - w.wake(); - } + io.wake(ready); } } @@ -202,8 +194,7 @@ impl Drop for Driver { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the // driver being shutdown. - io.reader.wake(); - io.writer.wake(); + io.wake(mio::Ready::all()); }) } } @@ -310,16 +301,6 @@ impl Inner { pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { self.io.deregister(source) } - - /// Registers interest in the I/O resource associated with `token`. - pub(super) fn register(&self, io: &slab::Ref, dir: Direction, w: Waker) { - let waker = match dir { - Direction::Read => &io.reader, - Direction::Write => &io.writer, - }; - - waker.register(w); - } } impl Direction { diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 566f7daf0d1..2f1d3316ede 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,8 +1,17 @@ -use crate::loom::future::AtomicWaker; +use super::{platform, Direction, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; +use crate::util::bit; +use crate::util::linked_list::{self, LinkedList}; use crate::util::slab::Entry; +use std::cell::UnsafeCell; +use std::future::Future; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::task::{Context, Poll, Waker}; /// Stored in the I/O driver resource slab. #[derive(Debug)] @@ -10,19 +19,80 @@ pub(crate) struct ScheduledIo { /// Packs the resource's readiness with the resource's generation. readiness: AtomicUsize, - /// Task waiting on read readiness - pub(crate) reader: AtomicWaker, + waiters: Mutex, +} + +type WaitList = LinkedList::Target>; + +#[derive(Debug, Default)] +struct Waiters { + /// List of all current waiters + list: WaitList, + + /// Waker used for AsyncRead + reader: Option, + + /// Waker used for AsyncWrite + writer: Option, +} + +#[derive(Debug)] +struct Waiter { + pointers: linked_list::Pointers, + + /// The waker for this task + waker: Option, + + /// The interest this waiter is waiting on + interest: mio::Ready, + + notified: bool, + + /// Should never be `!Unpin` + _p: PhantomPinned, +} + +/// Future returned by `readiness()` +struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, + + state: State, + + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell, +} + +enum State { + Init, + Waiting, + Done, +} + +// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. +// +// | reserved | generation | driver tick | readinesss | +// |----------+------------+--------------+------------| +// | 1 bit | 7 bits + 8 bits + 16 bits | + +const READINESS: bit::Pack = bit::Pack::least_significant(16); - /// Task waiting on write readiness - pub(crate) writer: AtomicWaker, +const TICK: bit::Pack = READINESS.then(8); + +const GENERATION: bit::Pack = TICK.then(7); + +#[test] +fn test_generations_assert_same() { + assert_eq!(super::GENERATION, GENERATION); } +// ===== impl ScheduledIo ===== + impl Entry for ScheduledIo { fn reset(&self) { let state = self.readiness.load(Acquire); - let generation = super::GENERATION.unpack(state); - let next = super::GENERATION.pack_lossy(generation + 1, 0); + let generation = GENERATION.unpack(state); + let next = GENERATION.pack_lossy(generation + 1, 0); self.readiness.store(next, Release); } @@ -32,15 +102,14 @@ impl Default for ScheduledIo { fn default() -> ScheduledIo { ScheduledIo { readiness: AtomicUsize::new(0), - reader: AtomicWaker::new(), - writer: AtomicWaker::new(), + waiters: Mutex::new(Default::default()), } } } impl ScheduledIo { pub(crate) fn generation(&self) -> usize { - super::GENERATION.unpack(self.readiness.load(Acquire)) + GENERATION.unpack(self.readiness.load(Acquire)) } /// Sets the readiness on this `ScheduledIo` by invoking the given closure on @@ -57,51 +126,298 @@ impl ScheduledIo { /// generation, then the corresponding IO resource has been removed and /// replaced with a new resource. In that case, this method returns `Err`. /// Otherwise, this returns the previous readiness. - pub(crate) fn set_readiness( + pub(super) fn set_readiness( &self, token: Option, + tick: Tick, f: impl Fn(usize) -> usize, ) -> Result { let mut current = self.readiness.load(Acquire); loop { - let current_generation = super::GENERATION.unpack(current); + let current_generation = GENERATION.unpack(current); if let Some(token) = token { // Check that the generation for this access is still the // current one. - if super::GENERATION.unpack(token) != current_generation { + if GENERATION.unpack(token) != current_generation { return Err(()); } } - // Mask out the generation bits so that the modifying function - // doesn't see them. + // Mask out the tick/generation bits so that the modifying + // function doesn't see them. let current_readiness = current & mio::Ready::all().as_usize(); - let new = f(current_readiness); + let mut new = f(current_readiness); debug_assert!( - new <= super::ADDRESS.max_value(), - "new readiness value would overwrite generation bits!" + new <= READINESS.max_value(), + "new readiness value would overwrite tick/generation bits!" ); - match self.readiness.compare_exchange( - current, - super::GENERATION.pack(current_generation, new), - AcqRel, - Acquire, - ) { + match tick { + Tick::Set(t) => { + new = TICK.pack(t as usize, new); + } + Tick::Clear(t) => { + if TICK.unpack(current) as u8 != t { + // Trying to clear readiness with an old event! + return Err(()); + } + new = TICK.pack(t as usize, new); + } + } + + new = GENERATION.pack(current_generation, new); + + match self + .readiness + .compare_exchange(current, new, AcqRel, Acquire) + { Ok(_) => return Ok(current), // we lost the race, retry! Err(actual) => current = actual, } } } + + pub(super) fn wake(&self, ready: mio::Ready) { + let mut waiters = self.waiters.lock().unwrap(); + + // check for AsyncRead slot + if !(ready & (!mio::Ready::writable())).is_empty() { + if let Some(waker) = waiters.reader.take() { + waker.wake(); + } + } + + // check for AsyncWrite slot + if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { + if let Some(waker) = waiters.writer.take() { + waker.wake(); + } + } + + // check list of waiters + for waiter in waiters + .list + .drain_filter(|w| !(w.interest & ready).is_empty()) + { + let waiter = unsafe { &mut *waiter.as_ptr() }; + if let Some(waker) = waiter.waker.take() { + waiter.notified = true; + waker.wake(); + } + } + } + + /// Poll version of checking readiness for a certain direction. + /// + /// These are to support `AsyncRead` and `AsyncWrite` polling methods, + /// which cannot use the `async fn` version. This uses reserved reader + /// and writer slots. + pub(in crate::io) fn poll_readiness( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll { + let curr = self.readiness.load(Acquire); + + let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + + if ready.is_empty() { + // Update the task info + let mut waiters = self.waiters.lock().unwrap(); + let slot = match direction { + Direction::Read => &mut waiters.reader, + Direction::Write => &mut waiters.writer, + }; + *slot = Some(cx.waker().clone()); + + // Try again, in case the readiness was changed while we were + // taking the waiters lock + let curr = self.readiness.load(Acquire); + let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + if ready.is_empty() { + Poll::Pending + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + readiness: ready, + }) + } + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + readiness: ready, + }) + } + } + + /// An async version of `poll_readiness` which uses a linked list of wakers + pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { + self.readiness_fut(interest).await + } + + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + notified: false, + interest, + _p: PhantomPinned, + }), + } + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + // This consumes the current readiness state **except** for HUP and + // error. HUP and error are excluded because a) they are final states + // and never transitition out and b) both the read AND the write + // directions need to be able to obvserve these states. + // + // # Platform-specific behavior + // + // HUP and error readiness are platform-specific. On epoll platforms, + // HUP has specific conditions that must be met by both peers of a + // connection in order to be triggered. + // + // On epoll platforms, `EPOLLERR` is signaled through + // `UnixReady::error()` and is important to be observable by both read + // AND write. A specific case that `EPOLLERR` occurs is when the read + // end of a pipe is closed. When this occurs, a peer blocked by + // writing to the pipe should be notified. + // result isn't important + let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize(); + + let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup)); + } } impl Drop for ScheduledIo { fn drop(&mut self) { - self.writer.wake(); - self.reader.wake(); + self.wake(mio::Ready::all()); + } +} + +unsafe impl Send for ScheduledIo {} +unsafe impl Sync for ScheduledIo {} + +unsafe impl linked_list::Link for Waiter { + type Handle = NonNull; + type Target = Waiter; + + fn as_raw(handle: &NonNull) -> NonNull { + *handle + } + + unsafe fn from_raw(ptr: NonNull) -> NonNull { + ptr + } + + unsafe fn pointers(mut target: NonNull) -> NonNull> { + NonNull::from(&mut target.as_mut().pointers) } } + +// ===== impl Readiness ===== + +impl Future for Readiness<'_> { + type Output = ReadyEvent; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; + + loop { + match *state { + State::Init => { + let mut waiters = scheduled_io.waiters.lock().unwrap(); + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); + } + + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; + } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock().unwrap(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.notified { + // Our waker has been notified. Reset the fields and + // remove it from the list. + w.waker = None; + w.notified = false; + + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; + + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; + + return Poll::Ready(ReadyEvent { + tick, + readiness: w.interest, + }); + } + } + } + } +} + +impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock().unwrap(); + + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } +} + +unsafe impl Send for Readiness<'_> {} +unsafe impl Sync for Readiness<'_> {} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index c4b4d7d3401..147c9d56bc0 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -207,10 +207,9 @@ cfg_io_driver! { pub(crate) mod driver; mod poll_evented; - pub use poll_evented::PollEvented; + pub(crate) use poll_evented::PollEvented; mod registration; - pub use registration::Registration; } cfg_io_std! { diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 9054c3b8733..64d70ceae34 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,13 +1,12 @@ -use crate::io::driver::platform; -use crate::io::{AsyncRead, AsyncWrite, ReadBuf, Registration}; +use crate::io::driver::{Direction, ReadyEvent}; +use crate::io::registration::Registration; +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use mio::event::Evented; use std::fmt; use std::io::{self, Read, Write}; use std::marker::Unpin; use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; use std::task::{Context, Poll}; cfg_io_driver! { @@ -53,37 +52,6 @@ cfg_io_driver! { /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and /// [`clear_read_ready`]. /// - /// ```rust - /// use tokio::io::PollEvented; - /// - /// use futures::ready; - /// use mio::Ready; - /// use mio::net::{TcpStream, TcpListener}; - /// use std::io; - /// use std::task::{Context, Poll}; - /// - /// struct MyListener { - /// poll_evented: PollEvented, - /// } - /// - /// impl MyListener { - /// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { - /// let ready = Ready::readable(); - /// - /// ready!(self.poll_evented.poll_read_ready(cx, ready))?; - /// - /// match self.poll_evented.get_ref().accept() { - /// Ok((socket, _)) => Poll::Ready(Ok(socket)), - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// self.poll_evented.clear_read_ready(cx, ready)?; - /// Poll::Pending - /// } - /// Err(e) => Poll::Ready(Err(e)), - /// } - /// } - /// } - /// ``` - /// /// ## Platform-specific events /// /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. @@ -101,66 +69,14 @@ cfg_io_driver! { /// [`clear_write_ready`]: method@Self::clear_write_ready /// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_write_ready`]: method@Self::poll_write_ready - pub struct PollEvented { + pub(crate) struct PollEvented { io: Option, - inner: Inner, + registration: Registration, } } -struct Inner { - registration: Registration, - - /// Currently visible read readiness - read_readiness: AtomicUsize, - - /// Currently visible write readiness - write_readiness: AtomicUsize, -} - // ===== impl PollEvented ===== -macro_rules! poll_ready { - ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{ - // Load cached & encoded readiness. - let mut cached = $me.inner.$cache.load(Relaxed); - let mask = $mask | platform::hup() | platform::error(); - - // See if the current readiness matches any bits. - let mut ret = mio::Ready::from_usize(cached) & $mask; - - if ret.is_empty() { - // Readiness does not match, consume the registration's readiness - // stream. This happens in a loop to ensure that the stream gets - // drained. - loop { - let ready = match $poll? { - Poll::Ready(v) => v, - Poll::Pending => return Poll::Pending, - }; - cached |= ready.as_usize(); - - // Update the cache store - $me.inner.$cache.store(cached, Relaxed); - - ret |= ready & mask; - - if !ret.is_empty() { - return Poll::Ready(Ok(ret)); - } - } - } else { - // Check what's new with the registration stream. This will not - // request to be notified - if let Some(ready) = $me.inner.registration.$take()? { - cached |= ready.as_usize(); - $me.inner.$cache.store(cached, Relaxed); - } - - Poll::Ready(Ok(mio::Ready::from_usize(cached))) - } - }}; -} - impl PollEvented where E: Evented, @@ -174,7 +90,7 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new(io: E) -> io::Result { + pub(crate) fn new(io: E) -> io::Result { PollEvented::new_with_ready(io, mio::Ready::all()) } @@ -202,27 +118,23 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result { + pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result { let registration = Registration::new_with_ready(&io, ready)?; Ok(Self { io: Some(io), - inner: Inner { - registration, - read_readiness: AtomicUsize::new(0), - write_readiness: AtomicUsize::new(0), - }, + registration, }) } /// Returns a shared reference to the underlying I/O object this readiness /// stream is wrapping. - pub fn get_ref(&self) -> &E { + pub(crate) fn get_ref(&self) -> &E { self.io.as_ref().unwrap() } /// Returns a mutable reference to the underlying I/O object this readiness /// stream is wrapping. - pub fn get_mut(&mut self) -> &mut E { + pub(crate) fn get_mut(&mut self) -> &mut E { self.io.as_mut().unwrap() } @@ -234,12 +146,20 @@ where /// Note that deregistering does not guarantee that the I/O resource can be /// registered with a different reactor. Some I/O resource types can only be /// associated with a single reactor instance for their lifetime. - pub fn into_inner(mut self) -> io::Result { + pub(crate) fn into_inner(mut self) -> io::Result { let io = self.io.take().unwrap(); - self.inner.registration.deregister(&io)?; + self.registration.deregister(&io)?; Ok(io) } + pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result { + self.registration.readiness(interest).await + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + self.registration.clear_readiness(event); + } + /// Checks the I/O resource's read readiness state. /// /// The mask argument allows specifying what readiness to notify on. This @@ -266,51 +186,8 @@ where /// /// This method may not be called concurrently. It takes `&self` to allow /// calling it concurrently with `poll_write_ready`. - pub fn poll_read_ready( - &self, - cx: &mut Context<'_>, - mask: mio::Ready, - ) -> Poll> { - assert!(!mask.is_writable(), "cannot poll for write readiness"); - poll_ready!( - self, - mask, - read_readiness, - take_read_ready, - self.inner.registration.poll_read_ready(cx) - ) - } - - /// Clears the I/O resource's read readiness state and registers the current - /// task to be notified once a read readiness event is received. - /// - /// After calling this function, `poll_read_ready` will return - /// `Poll::Pending` until a new read readiness event has been received. - /// - /// The `mask` argument specifies the readiness bits to clear. This may not - /// include `writable` or `hup`. - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` includes writable or HUP - /// * called from outside of a task context. - pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> { - // Cannot clear write readiness - assert!(!ready.is_writable(), "cannot clear write readiness"); - assert!(!platform::is_hup(ready), "cannot clear HUP readiness"); - - self.inner - .read_readiness - .fetch_and(!ready.as_usize(), Relaxed); - - if self.poll_read_ready(cx, ready)?.is_ready() { - // Notify the current task - cx.waker().wake_by_ref(); - } - - Ok(()) + pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.registration.poll_readiness(cx, Direction::Read) } /// Checks the I/O resource's write readiness state. @@ -337,41 +214,8 @@ where /// /// This method may not be called concurrently. It takes `&self` to allow /// calling it concurrently with `poll_read_ready`. - pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { - poll_ready!( - self, - mio::Ready::writable(), - write_readiness, - take_write_ready, - self.inner.registration.poll_write_ready(cx) - ) - } - - /// Resets the I/O resource's write readiness state and registers the current - /// task to be notified once a write readiness event is received. - /// - /// This only clears writable readiness. HUP (on platforms that support HUP) - /// cannot be cleared as it is a final state. - /// - /// After calling this function, `poll_write_ready(Ready::writable())` will - /// return `NotReady` until a new write readiness event has been received. - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> { - let ready = mio::Ready::writable(); - - self.inner - .write_readiness - .fetch_and(!ready.as_usize(), Relaxed); - - if self.poll_write_ready(cx)?.is_ready() { - // Notify the current task - cx.waker().wake_by_ref(); - } - - Ok(()) + pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.registration.poll_readiness(cx, Direction::Write) } } @@ -386,20 +230,22 @@ where cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - ready!(self.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.poll_read_ready(cx))?; - // We can't assume the `Read` won't look at the read buffer, - // so we have to force initialization here. - let r = (*self).get_mut().read(buf.initialize_unfilled()); + // We can't assume the `Read` won't look at the read buffer, + // so we have to force initialization here. + let r = (*self).get_mut().read(buf.initialize_unfilled()); - if is_wouldblock(&r) { - self.clear_read_ready(cx, mio::Ready::readable())?; - return Poll::Pending; - } + if is_wouldblock(&r) { + self.clear_readiness(ev); + continue; + } - Poll::Ready(r.map(|n| { - buf.add_filled(n); - })) + return Poll::Ready(r.map(|n| { + buf.add_filled(n); + })); + } } } @@ -412,29 +258,33 @@ where cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - ready!(self.poll_write_ready(cx))?; + loop { + let ev = ready!(self.poll_write_ready(cx))?; - let r = (*self).get_mut().write(buf); + let r = (*self).get_mut().write(buf); - if is_wouldblock(&r) { - self.clear_write_ready(cx)?; - return Poll::Pending; - } + if is_wouldblock(&r) { + self.clear_readiness(ev); + continue; + } - Poll::Ready(r) + return Poll::Ready(r); + } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.poll_write_ready(cx))?; + loop { + let ev = ready!(self.poll_write_ready(cx))?; - let r = (*self).get_mut().flush(); + let r = (*self).get_mut().flush(); - if is_wouldblock(&r) { - self.clear_write_ready(cx)?; - return Poll::Pending; - } + if is_wouldblock(&r) { + self.clear_readiness(ev); + continue; + } - Poll::Ready(r) + return Poll::Ready(r); + } } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -459,7 +309,7 @@ impl Drop for PollEvented { fn drop(&mut self) { if let Some(io) = self.io.take() { // Ignore errors - let _ = self.inner.registration.deregister(&io); + let _ = self.registration.deregister(&io); } } } diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 8206507280d..259b2d47c3a 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -1,4 +1,4 @@ -use crate::io::driver::{platform, Direction, Handle, ScheduledIo}; +use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo}; use crate::util::slab; use mio::{self, Evented}; @@ -38,7 +38,7 @@ cfg_io_driver! { /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] - pub struct Registration { + pub(super) struct Registration { /// Handle to the associated driver. handle: Handle, @@ -53,28 +53,6 @@ unsafe impl Sync for Registration {} // ===== impl Registration ===== impl Registration { - /// Registers the I/O resource with the default reactor. - /// - /// # Return - /// - /// - `Ok` if the registration happened successfully - /// - `Err` if an error was encountered during registration - /// - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new(io: &T) -> io::Result - where - T: Evented, - { - Registration::new_with_ready(io, mio::Ready::all()) - } - /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state. /// `new_with_ready` should be used over `new` when you need control over the readiness state, /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if @@ -105,7 +83,7 @@ impl Registration { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new_with_ready(io: &T, ready: mio::Ready) -> io::Result + pub(super) fn new_with_ready(io: &T, ready: mio::Ready) -> io::Result where T: Evented, { @@ -138,7 +116,7 @@ impl Registration { /// no longer result in notifications getting sent for this registration. /// /// `Err` is returned if an error is encountered. - pub fn deregister(&mut self, io: &T) -> io::Result<()> + pub(super) fn deregister(&mut self, io: &T) -> io::Result<()> where T: Evented, { @@ -149,192 +127,32 @@ impl Registration { inner.deregister_source(io) } - /// Polls for events on the I/O resource's read readiness stream. - /// - /// If the I/O resource receives a new read readiness event since the last - /// call to `poll_read_ready`, it is returned. If it has not, the current - /// task is notified once a new event is received. - /// - /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, - /// the function will always return `Ready(HUP)`. This should be treated as - /// the end of the readiness stream. - /// - /// # Return value - /// - /// There are several possible return values: - /// - /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received - /// a new readiness event. The readiness value is included. - /// - /// * `Poll::Pending` means that no new readiness events have been received - /// since the last call to `poll_read_ready`. - /// - /// * `Poll::Ready(Err(err))` means that the registration has encountered an - /// error. This could represent a permanent internal error for example. - /// - /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - let v = self.poll_ready(Direction::Read, Some(cx)).map_err(|e| { - coop.made_progress(); - e - })?; - match v { - Some(v) => { - coop.made_progress(); - Poll::Ready(Ok(v)) - } - None => Poll::Pending, - } + pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result { + // TODO: does this need to return a `Result`? + Ok(self.shared.readiness(interest).await) } - /// Consume any pending read readiness event. - /// - /// This function is identical to [`poll_read_ready`] **except** that it - /// will not notify the current task when a new event is received. As such, - /// it is safe to call this function from outside of a task context. - /// - /// [`poll_read_ready`]: method@Self::poll_read_ready - pub fn take_read_ready(&self) -> io::Result> { - self.poll_ready(Direction::Read, None) - } - - /// Polls for events on the I/O resource's write readiness stream. - /// - /// If the I/O resource receives a new write readiness event since the last - /// call to `poll_write_ready`, it is returned. If it has not, the current - /// task is notified once a new event is received. - /// - /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, - /// the function will always return `Ready(HUP)`. This should be treated as - /// the end of the readiness stream. - /// - /// # Return value - /// - /// There are several possible return values: - /// - /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received - /// a new readiness event. The readiness value is included. - /// - /// * `Poll::Pending` means that no new readiness events have been received - /// since the last call to `poll_write_ready`. - /// - /// * `Poll::Ready(Err(err))` means that the registration has encountered an - /// error. This could represent a permanent internal error for example. - /// - /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - let v = self.poll_ready(Direction::Write, Some(cx)).map_err(|e| { - coop.made_progress(); - e - })?; - match v { - Some(v) => { - coop.made_progress(); - Poll::Ready(Ok(v)) - } - None => Poll::Pending, - } - } - - /// Consumes any pending write readiness event. - /// - /// This function is identical to [`poll_write_ready`] **except** that it - /// will not notify the current task when a new event is received. As such, - /// it is safe to call this function from outside of a task context. - /// - /// [`poll_write_ready`]: method@Self::poll_write_ready - pub fn take_write_ready(&self) -> io::Result> { - self.poll_ready(Direction::Write, None) + pub(super) fn clear_readiness(&self, event: ReadyEvent) { + self.shared.clear_readiness(event); } /// Polls for events on the I/O resource's `direction` readiness stream. /// /// If called with a task context, notify the task when a new event is /// received. - fn poll_ready( + pub(super) fn poll_readiness( &self, + cx: &mut Context<'_>, direction: Direction, - cx: Option<&mut Context<'_>>, - ) -> io::Result> { - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - - // If the task should be notified about new events, ensure that it has - // been registered - if let Some(ref cx) = cx { - inner.register(&self.shared, direction, cx.waker().clone()) - } - - let mask = direction.mask(); - let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize(); - - // This consumes the current readiness state **except** for HUP and - // error. HUP and error are excluded because a) they are final states - // and never transitition out and b) both the read AND the write - // directions need to be able to obvserve these states. - // - // # Platform-specific behavior - // - // HUP and error readiness are platform-specific. On epoll platforms, - // HUP has specific conditions that must be met by both peers of a - // connection in order to be triggered. - // - // On epoll platforms, `EPOLLERR` is signaled through - // `UnixReady::error()` and is important to be observable by both read - // AND write. A specific case that `EPOLLERR` occurs is when the read - // end of a pipe is closed. When this occurs, a peer blocked by - // writing to the pipe should be notified. - let curr_ready = self - .shared - .set_readiness(None, |curr| curr & (!mask_no_hup)) - .unwrap_or_else(|_| unreachable!()); - - let mut ready = mask & mio::Ready::from_usize(curr_ready); - - if ready.is_empty() { - if let Some(cx) = cx { - // Update the task info - match direction { - Direction::Read => self.shared.reader.register_by_ref(cx.waker()), - Direction::Write => self.shared.writer.register_by_ref(cx.waker()), - } - - // Try again - let curr_ready = self - .shared - .set_readiness(None, |curr| curr & (!mask_no_hup)) - .unwrap(); - ready = mask & mio::Ready::from_usize(curr_ready); - } + ) -> Poll> { + if self.handle.inner().is_none() { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); } - if ready.is_empty() { - Ok(None) - } else { - Ok(Some(ready)) - } - } -} - -impl Drop for Registration { - fn drop(&mut self) { - drop(self.shared.reader.take_waker()); - drop(self.shared.writer.take_waker()); + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + let ev = ready!(self.shared.poll_readiness(cx, direction)); + coop.made_progress(); + Poll::Ready(Ok(ev)) } } diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 44945e387b4..f00de265d45 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -205,15 +205,16 @@ impl TcpListener { &mut self, cx: &mut Context<'_>, ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(pair) => Poll::Ready(Ok(pair)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().accept_std() { + Ok(pair) => return Poll::Ready(Ok(pair)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index e0348724cff..99fc71121dc 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -299,15 +299,16 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().peek(buf) { - Ok(ret) => Poll::Ready(Ok(ret)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().peek(buf) { + Ok(ret) => return Poll::Ready(Ok(ret)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -703,26 +704,28 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes. - let b = - unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - Ok(n) => { - // Safety: We trust `TcpStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes. + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) + }; + match self.io.get_ref().read(b) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Ok(n) => { + // Safety: We trust `TcpStream::read` to have filled up `n` bytes + // in the buffer. + unsafe { + buf.assume_init(n); + } + buf.add_filled(n); + return Poll::Ready(Ok(())); } - buf.add_filled(n); - Poll::Ready(Ok(())) + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -731,14 +734,15 @@ impl TcpStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + x => return Poll::Ready(x), } - x => Poll::Ready(x), } } @@ -749,99 +753,100 @@ impl TcpStream { ) -> Poll> { use std::io::IoSlice; - ready!(self.io.poll_write_ready(cx))?; - - // The `IoVec` (v0.1.x) type can't have a zero-length size, so create - // a dummy version from a 1-length slice which we'll overwrite with - // the `bytes_vectored` method. - static S: &[u8] = &[0]; - const MAX_BUFS: usize = 64; - - // IoSlice isn't Copy, so we must expand this manually ;_; - let mut slices: [IoSlice<'_>; MAX_BUFS] = [ - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - ]; - let cnt = buf.bytes_vectored(&mut slices); - - let iovec = <&IoVec>::from(S); - let mut vecs = [iovec; MAX_BUFS]; - for i in 0..cnt { - vecs[i] = (*slices[i]).into(); - } - - match self.io.get_ref().write_bufs(&vecs[..cnt]) { - Ok(n) => { - buf.advance(n); - Poll::Ready(Ok(n)) + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; + + // The `IoVec` (v0.1.x) type can't have a zero-length size, so create + // a dummy version from a 1-length slice which we'll overwrite with + // the `bytes_vectored` method. + static S: &[u8] = &[0]; + const MAX_BUFS: usize = 64; + + // IoSlice isn't Copy, so we must expand this manually ;_; + let mut slices: [IoSlice<'_>; MAX_BUFS] = [ + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + ]; + let cnt = buf.bytes_vectored(&mut slices); + + let iovec = <&IoVec>::from(S); + let mut vecs = [iovec; MAX_BUFS]; + for i in 0..cnt { + vecs[i] = (*slices[i]).into(); } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + + match self.io.get_ref().write_bufs(&vecs[..cnt]) { + Ok(n) => { + buf.advance(n); + return Poll::Ready(Ok(n)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } } diff --git a/tokio/src/net/udp/mod.rs b/tokio/src/net/udp/mod.rs index d43121a1ca0..c9bb0f83b61 100644 --- a/tokio/src/net/udp/mod.rs +++ b/tokio/src/net/udp/mod.rs @@ -1,7 +1,3 @@ //! UDP utility types. pub(crate) mod socket; -pub(crate) use socket::UdpSocket; - -mod split; -pub use split::{RecvHalf, ReuniteError, SendHalf}; diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index f9d88372035..d50cfa45e15 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -1,13 +1,10 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::udp::split::{split, RecvHalf, SendHalf}; use crate::net::ToSocketAddrs; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::task::{Context, Poll}; cfg_udp! { /// A UDP socket @@ -67,15 +64,7 @@ impl UdpSocket { /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result { let io = mio::net::UdpSocket::from_socket(socket)?; - let io = PollEvented::new(io)?; - Ok(UdpSocket { io }) - } - - /// Splits the `UdpSocket` into a receive half and a send half. The two parts - /// can be used to receive and send datagrams concurrently, even from two - /// different tasks. - pub fn split(self) -> (RecvHalf, SendHalf) { - split(self) + UdpSocket::new(io) } /// Returns the local address that this socket is bound to. @@ -112,8 +101,9 @@ impl UdpSocket { /// will resolve to an error if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.poll_send(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result { + self.do_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send data on the socket to the remote address to which it is @@ -130,29 +120,6 @@ impl UdpSocket { self.io.get_ref().send(buf) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - #[doc(hidden)] - pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Returns a future that receives a single datagram message on the socket from /// the remote address to which it is connected. On success, the future will resolve /// to the number of bytes read. @@ -165,21 +132,9 @@ impl UdpSocket { /// will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.poll_recv(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } + pub async fn recv(&self, buf: &mut [u8]) -> io::Result { + self.do_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .await } /// Returns a future that sends data on the socket to the given address. @@ -187,11 +142,11 @@ impl UdpSocket { /// /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. - pub async fn send_to(&mut self, buf: &[u8], target: A) -> io::Result { + pub async fn send_to(&self, buf: &[u8], target: A) -> io::Result { let mut addrs = target.to_socket_addrs().await?; match addrs.next() { - Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, + Some(target) => self.send_to_addr(buf, &target).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -214,23 +169,9 @@ impl UdpSocket { self.io.get_ref().send_to(buf, &target) } - // TODO: Public or not? - #[doc(hidden)] - pub fn poll_send_to( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &SocketAddr, - ) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result { + self.do_io(mio::Ready::writable(), |sock| sock.send_to(buf, target)) + .await } /// Returns a future that receives a single datagram on the socket. On success, @@ -239,24 +180,24 @@ impl UdpSocket { /// The function must be called with valid byte array `buf` of sufficient size /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv_from( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.do_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .await + } + + async fn do_io(&self, interest: mio::Ready, mut op: F) -> io::Result + where + F: FnMut(&mio::net::UdpSocket) -> io::Result, + { + loop { + let event = self.io.readiness(interest).await?; + + match op(self.io.get_ref()) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(event); + } + x => return x, } - x => Poll::Ready(x), } } @@ -423,7 +364,7 @@ impl TryFrom for UdpSocket { impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } diff --git a/tokio/src/net/udp/split.rs b/tokio/src/net/udp/split.rs deleted file mode 100644 index 8d87f1c7c67..00000000000 --- a/tokio/src/net/udp/split.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! [`UdpSocket`](crate::net::UdpSocket) split support. -//! -//! The [`split`](method@crate::net::UdpSocket::split) method splits a -//! `UdpSocket` into a receive half and a send half, which can be used to -//! receive and send datagrams concurrently, even from two different tasks. -//! -//! The halves provide access to the underlying socket, implementing -//! `AsRef`. This allows you to call `UdpSocket` methods that takes -//! `&self`, e.g., to get local address, to get and set socket options, to join -//! or leave multicast groups, etc. -//! -//! The halves can be reunited to the original socket with their `reunite` -//! methods. - -use crate::future::poll_fn; -use crate::net::udp::UdpSocket; - -use std::error::Error; -use std::fmt; -use std::io; -use std::net::SocketAddr; -use std::sync::Arc; - -/// The send half after [`split`](super::UdpSocket::split). -/// -/// Use [`send_to`](method@Self::send_to) or [`send`](method@Self::send) to send -/// datagrams. -#[derive(Debug)] -pub struct SendHalf(Arc); - -/// The recv half after [`split`](super::UdpSocket::split). -/// -/// Use [`recv_from`](method@Self::recv_from) or [`recv`](method@Self::recv) to receive -/// datagrams. -#[derive(Debug)] -pub struct RecvHalf(Arc); - -pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) { - let shared = Arc::new(socket); - let send = shared.clone(); - let recv = shared; - (RecvHalf(recv), SendHalf(send)) -} - -/// Error indicating that two halves were not from the same socket, and thus could -/// not be `reunite`d. -#[derive(Debug)] -pub struct ReuniteError(pub SendHalf, pub RecvHalf); - -impl fmt::Display for ReuniteError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "tried to reunite halves that are not from the same socket" - ) - } -} - -impl Error for ReuniteError {} - -fn reunite(s: SendHalf, r: RecvHalf) -> Result { - if Arc::ptr_eq(&s.0, &r.0) { - drop(r); - // Only two instances of the `Arc` are ever created, one for the - // receiver and one for the sender, and those `Arc`s are never exposed - // externally. And so when we drop one here, the other one must be the - // only remaining one. - Ok(Arc::try_unwrap(s.0).expect("udp: try_unwrap failed in reunite")) - } else { - Err(ReuniteError(s, r)) - } -} - -impl RecvHalf { - /// Attempts to put the two "halves" of a `UdpSocket` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UdpSocket::split`. - pub fn reunite(self, other: SendHalf) -> Result { - reunite(other, self) - } - - /// Returns a future that receives a single datagram on the socket. On success, - /// the future resolves to the number of bytes read and the origin. - /// - /// The function must be called with valid byte array `buf` of sufficient size - /// to hold the message bytes. If a message is too long to fit in the supplied - /// buffer, excess bytes may be discarded. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await - } - - /// Returns a future that receives a single datagram message on the socket from - /// the remote address to which it is connected. On success, the future will resolve - /// to the number of bytes read. - /// - /// The function must be called with valid byte array `buf` of sufficient size to - /// hold the message bytes. If a message is too long to fit in the supplied buffer, - /// excess bytes may be discarded. - /// - /// The [`connect`] method will connect this socket to a remote address. The future - /// will fail if the socket is not connected. - /// - /// [`connect`]: super::UdpSocket::connect - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.0.poll_recv(cx, buf)).await - } -} - -impl SendHalf { - /// Attempts to put the two "halves" of a `UdpSocket` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UdpSocket::split`. - pub fn reunite(self, other: RecvHalf) -> Result { - reunite(self, other) - } - - /// Returns a future that sends data on the socket to the given address. - /// On success, the future will resolve to the number of bytes written. - /// - /// The future will resolve to an error if the IP version of the socket does - /// not match that of `target`. - pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result { - poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await - } - - /// Returns a future that sends data on the socket to the remote address to which it is connected. - /// On success, the future will resolve to the number of bytes written. - /// - /// The [`connect`] method will connect this socket to a remote address. The future - /// will resolve to an error if the socket is not connected. - /// - /// [`connect`]: super::UdpSocket::connect - pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.0.poll_send(cx, buf)).await - } -} - -impl AsRef for SendHalf { - fn as_ref(&self) -> &UdpSocket { - &self.0 - } -} - -impl AsRef for RecvHalf { - fn as_ref(&self) -> &UdpSocket { - &self.0 - } -} diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs index f484ae34a34..6268b4ac90b 100644 --- a/tokio/src/net/unix/datagram/mod.rs +++ b/tokio/src/net/unix/datagram/mod.rs @@ -1,8 +1,3 @@ //! Unix datagram types. pub(crate) mod socket; -pub(crate) mod split; -pub(crate) mod split_owned; - -pub use split::{RecvHalf, SendHalf}; -pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index ba3a10c4882..eee7ac66de5 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,7 +1,4 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; -use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; use std::convert::TryFrom; use std::fmt; @@ -10,7 +7,6 @@ use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; -use std::task::{Context, Poll}; cfg_uds! { /// An I/O object representing a Unix datagram socket. @@ -300,8 +296,9 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result { + self.do_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send a datagram to the peer without waiting. @@ -371,32 +368,6 @@ impl UnixDatagram { self.io.get_ref().send_to(buf, target) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Receives data from the socket. /// /// # Examples @@ -423,8 +394,9 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await + pub async fn recv(&self, buf: &mut [u8]) -> io::Result { + self.do_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .await } /// Try to receive a datagram from the peer without waiting. @@ -455,22 +427,6 @@ impl UnixDatagram { self.io.get_ref().recv(buf) } - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Sends data on the socket to the specified address. /// /// # Examples @@ -504,28 +460,14 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result + pub async fn send_to

(&self, buf: &[u8], target: P) -> io::Result where - P: AsRef + Unpin, + P: AsRef, { - poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - pub(crate) fn poll_send_to_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &Path, - ) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + self.do_io(mio::Ready::writable(), |sock| { + sock.send_to(buf, target.as_ref()) + }) + .await } /// Receives data from the socket. @@ -561,8 +503,9 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.do_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .await } /// Try to receive data from the socket without waiting. @@ -601,22 +544,6 @@ impl UnixDatagram { self.io.get_ref().recv_from(buf) } - pub(crate) fn poll_recv_from_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Returns the local address that this socket is bound to. /// /// # Examples @@ -769,89 +696,20 @@ impl UnixDatagram { self.io.get_ref().shutdown(how) } - // These lifetime markers also appear in the generated documentation, and make - // it more clear that this is a *borrowed* split. - #[allow(clippy::needless_lifetimes)] - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// This method is more efficient than [`into_split`], but the halves cannot - /// be moved into independently spawned tasks. - /// - /// [`into_split`]: fn@crate::net::UnixDatagram::into_split - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) { - split(self) - } - - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// Unlike [`split`], the owned halves can be moved to separate tasks, - /// however this comes at the cost of a heap allocation. - /// - /// **Note:** Dropping the write half will shut down the write half of the - /// datagram. This is equivalent to calling [`shutdown(Write)`]. - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.into_split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - /// - /// [`split`]: fn@crate::net::UnixDatagram::split - /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown - pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { - split_owned(self) + async fn do_io(&self, interest: mio::Ready, mut op: F) -> io::Result + where + F: FnMut(&mio_uds::UnixDatagram) -> io::Result, + { + loop { + let event = self.io.readiness(interest).await?; + + match op(self.io.get_ref()) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(event); + } + x => return x, + } + } } } diff --git a/tokio/src/net/unix/datagram/split.rs b/tokio/src/net/unix/datagram/split.rs deleted file mode 100644 index e42eeda8844..00000000000 --- a/tokio/src/net/unix/datagram/split.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! `UnixDatagram` split support. -//! -//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the -//! `UnixDatagram::split` method. - -use crate::future::poll_fn; -use crate::net::UnixDatagram; - -use std::io; -use std::os::unix::net::SocketAddr; -use std::path::Path; - -/// Borrowed receive half of a [`UnixDatagram`], created by [`split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`split`]: crate::net::UnixDatagram::split() -#[derive(Debug)] -pub struct RecvHalf<'a>(&'a UnixDatagram); - -/// Borrowed send half of a [`UnixDatagram`], created by [`split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`split`]: crate::net::UnixDatagram::split() -#[derive(Debug)] -pub struct SendHalf<'a>(&'a UnixDatagram); - -pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) { - (RecvHalf(&*stream), SendHalf(&*stream)) -} - -impl RecvHalf<'_> { - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await - } -} - -impl SendHalf<'_> { - /// Sends data on the socket to the specified address. - pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result - where - P: AsRef + Unpin, - { - poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await - } -} - -impl AsRef for RecvHalf<'_> { - fn as_ref(&self) -> &UnixDatagram { - self.0 - } -} - -impl AsRef for SendHalf<'_> { - fn as_ref(&self) -> &UnixDatagram { - self.0 - } -} diff --git a/tokio/src/net/unix/datagram/split_owned.rs b/tokio/src/net/unix/datagram/split_owned.rs deleted file mode 100644 index 699771f30e6..00000000000 --- a/tokio/src/net/unix/datagram/split_owned.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! `UnixDatagram` owned split support. -//! -//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf` -//! with the `UnixDatagram::into_split` method. - -use crate::future::poll_fn; -use crate::net::UnixDatagram; - -use std::error::Error; -use std::net::Shutdown; -use std::os::unix::net::SocketAddr; -use std::path::Path; -use std::sync::Arc; -use std::{fmt, io}; - -pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { - let shared = Arc::new(socket); - let send = shared.clone(); - let recv = shared; - ( - OwnedRecvHalf { inner: recv }, - OwnedSendHalf { - inner: send, - shutdown_on_drop: true, - }, - ) -} - -/// Owned send half of a [`UnixDatagram`], created by [`into_split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`into_split`]: UnixDatagram::into_split() -#[derive(Debug)] -pub struct OwnedSendHalf { - inner: Arc, - shutdown_on_drop: bool, -} - -/// Owned receive half of a [`UnixDatagram`], created by [`into_split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`into_split`]: UnixDatagram::into_split() -#[derive(Debug)] -pub struct OwnedRecvHalf { - inner: Arc, -} - -/// Error indicating that two halves were not from the same socket, and thus could -/// not be `reunite`d. -#[derive(Debug)] -pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); - -impl fmt::Display for ReuniteError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "tried to reunite halves that are not from the same socket" - ) - } -} - -impl Error for ReuniteError {} - -fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result { - if Arc::ptr_eq(&s.inner, &r.inner) { - s.forget(); - // Only two instances of the `Arc` are ever created, one for the - // receiver and one for the sender, and those `Arc`s are never exposed - // externally. And so when we drop one here, the other one must be the - // only remaining one. - Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite")) - } else { - Err(ReuniteError(s, r)) - } -} - -impl OwnedRecvHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to [`into_split`]. - /// - /// [`into_split`]: UnixDatagram::into_split() - pub fn reunite(self, other: OwnedSendHalf) -> Result { - reunite(other, self) - } - - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await - } -} - -impl OwnedSendHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to [`into_split`]. - /// - /// [`into_split`]: UnixDatagram::into_split() - pub fn reunite(self, other: OwnedRecvHalf) -> Result { - reunite(self, other) - } - - /// Sends data on the socket to the specified address. - pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result - where - P: AsRef + Unpin, - { - poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await - } - - /// Destroy the send half, but don't close the send half of the stream - /// until the receive half is dropped. If the read half has already been - /// dropped, this closes the stream. - pub fn forget(mut self) { - self.shutdown_on_drop = false; - drop(self); - } -} - -impl Drop for OwnedSendHalf { - fn drop(&mut self) { - if self.shutdown_on_drop { - let _ = self.inner.shutdown(Shutdown::Write); - } - } -} - -impl AsRef for OwnedSendHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} - -impl AsRef for OwnedRecvHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 119dc6fb41c..68baa7f40ac 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -2,7 +2,6 @@ use crate::future::poll_fn; use crate::io::PollEvented; use crate::net::unix::{Incoming, UnixStream}; -use mio::Ready; use std::convert::TryFrom; use std::fmt; use std::io; @@ -118,19 +117,19 @@ impl UnixListener { &mut self, cx: &mut Context<'_>, ) -> Poll> { - ready!(self.io.poll_read_ready(cx, Ready::readable()))?; - - match self.io.get_ref().accept_std() { - Ok(None) => { - self.io.clear_read_ready(cx, Ready::readable())?; - Poll::Pending - } - Ok(Some((sock, addr))) => Ok((sock, addr)).into(), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, Ready::readable())?; - Poll::Pending + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + match self.io.get_ref().accept_std() { + Ok(None) => { + self.io.clear_readiness(ev); + } + Ok(Some((sock, addr))) => return Ok((sock, addr)).into(), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(err) => return Err(err).into(), } - Err(err) => Err(err).into(), } } diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 6f49849aef3..1134b9d162a 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -211,26 +211,28 @@ impl UnixStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - // Safety: `UnixStream::read` will not peak at the maybe uinitialized bytes. - let b = - unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - Ok(n) => { - // Safety: We trust `UnixStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); + // Safety: `UnixStream::read` will not peak at the maybe uinitialized bytes. + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) + }; + match self.io.get_ref().read(b) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Ok(n) => { + // Safety: We trust `UnixStream::read` to have filled up `n` bytes + // in the buffer. + unsafe { + buf.assume_init(n); + } + buf.add_filled(n); + return Poll::Ready(Ok(())); } - buf.add_filled(n); - Poll::Ready(Ok(())) + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -239,14 +241,15 @@ impl UnixStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + x => return Poll::Ready(x), } - x => Poll::Ready(x), } } } diff --git a/tokio/src/sync/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs index 73b1745f1a2..0245ccb7c69 100644 --- a/tokio/src/sync/task/atomic_waker.rs +++ b/tokio/src/sync/task/atomic_waker.rs @@ -142,9 +142,8 @@ impl AtomicWaker { } /// Registers the current waker to be notified on calls to `wake`. - /// - /// This is the same as calling `register_task` with `task::current()`. #[cfg(feature = "io-driver")] + #[allow(unused)] pub(crate) fn register(&self, waker: Waker) { self.do_register(waker); } diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs index ee756044a50..8f9ca48a50a 100644 --- a/tokio/src/util/bit.rs +++ b/tokio/src/util/bit.rs @@ -1,6 +1,6 @@ use std::fmt; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq)] pub(crate) struct Pack { mask: usize, shift: u32, diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index e6d6c5885c4..e296fb41a95 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -181,6 +181,12 @@ impl fmt::Debug for LinkedList { } } +impl Default for LinkedList { + fn default() -> Self { + Self::new() + } +} + cfg_sync! { impl LinkedList { pub(crate) fn last(&self) -> Option<&L::Target> { @@ -223,6 +229,52 @@ cfg_rt_threaded! { } } +// ===== impl DrainFilter ===== + +cfg_io_driver! { + pub(crate) struct DrainFilter<'a, T: Link, F> { + list: &'a mut LinkedList, + filter: F, + curr: Option>, + } + + impl LinkedList { + pub(crate) fn drain_filter(&mut self, filter: F) -> DrainFilter<'_, T, F> + where + F: FnMut(&mut T::Target) -> bool, + { + let curr = self.head; + DrainFilter { + curr, + filter, + list: self, + } + } + } + + impl<'a, T, F> Iterator for DrainFilter<'a, T, F> + where + T: Link, + F: FnMut(&mut T::Target) -> bool, + { + type Item = T::Handle; + + fn next(&mut self) -> Option { + while let Some(curr) = self.curr { + // safety: the pointer references data contained by the list + self.curr = unsafe { T::pointers(curr).as_ref() }.next; + + // safety: the value is still owned by the linked list. + if (self.filter)(unsafe { &mut *curr.as_ptr() }) { + return unsafe { self.list.remove(curr) }; + } + } + + None + } + } +} + // ===== impl Pointers ===== impl Pointers { diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index c5439f4878b..9fe4d70a456 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,7 +3,7 @@ cfg_io_driver! { pub(crate) mod slab; } -#[cfg(any(feature = "sync", feature = "rt-core"))] +#[cfg(any(feature = "io-driver", feature = "sync", feature = "rt-core"))] pub(crate) mod linked_list; #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index cb7fd5e9a4b..854232c2adb 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -141,6 +141,8 @@ unsafe impl Sync for Page {} unsafe impl Send for Page {} unsafe impl Sync for CachedPage {} unsafe impl Send for CachedPage {} +unsafe impl Sync for Ref {} +unsafe impl Send for Ref {} /// A slot in the slab. Contains slot-specific metadata. /// diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index f1eed0e410d..6bfa62c7ca9 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -172,10 +172,6 @@ async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::RecvHalf::recv(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::RecvHalf::recv_from(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::SendHalf::send(_, &[u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::SendHalf::send_to(_, &[u8], &SocketAddr): Send & Sync); #[cfg(unix)] mod unix_datagram { diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 35e2ea81a02..dbf4fa450cc 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -795,6 +795,7 @@ rt_test! { #[test] fn io_notify_while_shutting_down() { use std::net::Ipv6Addr; + use std::sync::Arc; for _ in 1..10 { let runtime = rt(); @@ -802,7 +803,8 @@ rt_test! { runtime.block_on(async { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); let addr = socket.local_addr().unwrap(); - let (mut recv_half, mut send_half) = socket.split(); + let send_half = Arc::new(socket); + let recv_half = send_half.clone(); tokio::spawn(async move { let mut buf = [0]; diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 62a2234fa7c..9da672674fe 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::sync::Arc; use tokio::net::UdpSocket; const MSG: &[u8] = b"hello"; @@ -8,8 +9,8 @@ const MSG_LEN: usize = MSG.len(); #[tokio::test] async fn send_recv() -> std::io::Result<()> { - let mut sender = UdpSocket::bind("127.0.0.1:0").await?; - let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; sender.connect(receiver.local_addr()?).await?; receiver.connect(sender.local_addr()?).await?; @@ -25,8 +26,8 @@ async fn send_recv() -> std::io::Result<()> { #[tokio::test] async fn send_to_recv_from() -> std::io::Result<()> { - let mut sender = UdpSocket::bind("127.0.0.1:0").await?; - let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; let receiver_addr = receiver.local_addr()?; sender.send_to(MSG, &receiver_addr).await?; @@ -42,9 +43,10 @@ async fn send_to_recv_from() -> std::io::Result<()> { #[tokio::test] async fn split() -> std::io::Result<()> { let socket = UdpSocket::bind("127.0.0.1:0").await?; - let (mut r, mut s) = socket.split(); + let s = Arc::new(socket); + let r = s.clone(); - let addr = s.as_ref().local_addr()?; + let addr = s.local_addr()?; tokio::spawn(async move { s.send_to(MSG, &addr).await.unwrap(); }); @@ -54,24 +56,6 @@ async fn split() -> std::io::Result<()> { Ok(()) } -#[tokio::test] -async fn reunite() -> std::io::Result<()> { - let socket = UdpSocket::bind("127.0.0.1:0").await?; - let (s, r) = socket.split(); - assert!(s.reunite(r).is_ok()); - Ok(()) -} - -#[tokio::test] -async fn reunite_error() -> std::io::Result<()> { - let socket = UdpSocket::bind("127.0.0.1:0").await?; - let socket1 = UdpSocket::bind("127.0.0.1:0").await?; - let (s, _) = socket.split(); - let (_, r1) = socket1.split(); - assert!(s.reunite(r1).is_err()); - Ok(()) -} - // # Note // // This test is purposely written such that each time `sender` sends data on @@ -86,7 +70,7 @@ async fn try_send_spawn() { const MSG2_LEN: usize = MSG2.len(); let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let mut receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); receiver .connect(sender.local_addr().unwrap()) diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index d3c3535e7f4..bd47f19e446 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -6,8 +6,9 @@ use tokio::net::UnixDatagram; use tokio::try_join; use std::io; +use std::sync::Arc; -async fn echo_server(mut socket: UnixDatagram) -> io::Result<()> { +async fn echo_server(socket: UnixDatagram) -> io::Result<()> { let mut recv_buf = vec![0u8; 1024]; loop { let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?; @@ -32,7 +33,7 @@ async fn echo() -> io::Result<()> { }); { - let mut socket = UnixDatagram::bind(&client_path).unwrap(); + let socket = UnixDatagram::bind(&client_path).unwrap(); socket.connect(server_path)?; socket.send(b"ECHO").await?; let mut recv_buf = [0u8; 16]; @@ -87,8 +88,8 @@ async fn try_send_recv_never_block() -> io::Result<()> { async fn split() -> std::io::Result<()> { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("split.sock"); - let socket = UnixDatagram::bind(path.clone())?; - let (mut r, mut s) = socket.into_split(); + let s = Arc::new(UnixDatagram::bind(path.clone())?); + let r = s.clone(); let msg = b"hello"; let ((), ()) = try_join! { @@ -106,28 +107,3 @@ async fn split() -> std::io::Result<()> { Ok(()) } - -#[tokio::test] -async fn reunite() -> std::io::Result<()> { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("reunite.sock"); - let socket = UnixDatagram::bind(path)?; - let (s, r) = socket.into_split(); - assert!(s.reunite(r).is_ok()); - Ok(()) -} - -#[tokio::test] -async fn reunite_error() -> std::io::Result<()> { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("reunit.sock"); - let dir = tempfile::tempdir().unwrap(); - let path1 = dir.path().join("reunit.sock"); - let socket = UnixDatagram::bind(path)?; - let socket1 = UnixDatagram::bind(path1)?; - - let (s, _) = socket.into_split(); - let (_, r1) = socket1.into_split(); - assert!(s.reunite(r1).is_err()); - Ok(()) -} From 7a43d9ca31400a02f21cc93ae6f615c0ec6c2958 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 11 Sep 2020 17:06:17 -0700 Subject: [PATCH 02/11] Put readiness stuff in io-readiness feature --- .github/workflows/ci.yml | 4 +- tokio/Cargo.toml | 8 +- tokio/src/io/driver/scheduled_io.rs | 307 +++++++++++++++------------- tokio/src/io/poll_evented.rs | 17 +- tokio/src/io/registration.rs | 14 +- tokio/src/lib.rs | 4 +- tokio/src/macros/cfg.rs | 5 +- tokio/src/util/linked_list.rs | 4 +- tokio/src/util/mod.rs | 2 +- 9 files changed, 196 insertions(+), 169 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ab36cd9a11..3e721f520c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,11 +150,11 @@ jobs: run: cargo install cargo-hack - name: check --each-feature - run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps + run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps # Try with unstable feature flags - name: check --each-feature --unstable - run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps + run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps env: RUSTFLAGS: --cfg tokio_unstable diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index b1d943e3119..6df368ebd80 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -33,7 +33,6 @@ full = [ "blocking", "dns", "fs", - "io-driver", "io-util", "io-std", "macros", @@ -51,7 +50,8 @@ full = [ blocking = ["rt-core"] dns = ["rt-core"] fs = ["rt-core", "io-util"] -io-driver = ["mio", "lazy_static"] +io-driver = ["mio", "lazy_static"] # internal only +io-readiness = [] # internal only io-util = ["memchr"] # stdin, stdout, stderr io-std = ["rt-core"] @@ -85,8 +85,8 @@ sync = ["fnv"] test-util = [] tcp = ["io-driver", "iovec"] time = ["slab"] -udp = ["io-driver"] -uds = ["io-driver", "mio-uds", "libc"] +udp = ["io-driver", "io-readiness"] +uds = ["io-driver", "io-readiness", "mio-uds", "libc"] [dependencies] tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true } diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 2f1d3316ede..7a33d39bab0 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -2,17 +2,21 @@ use super::{platform, Direction, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; -use crate::util::linked_list::{self, LinkedList}; use crate::util::slab::Entry; -use std::cell::UnsafeCell; -use std::future::Future; -use std::marker::PhantomPinned; -use std::pin::Pin; -use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; +cfg_io_readiness! { + use crate::util::linked_list::{self, LinkedList}; + + use std::cell::UnsafeCell; + use std::future::Future; + use std::marker::PhantomPinned; + use std::pin::Pin; + use std::ptr::NonNull; +} + /// Stored in the I/O driver resource slab. #[derive(Debug)] pub(crate) struct ScheduledIo { @@ -22,10 +26,12 @@ pub(crate) struct ScheduledIo { waiters: Mutex, } +#[cfg(feature = "io-readiness")] type WaitList = LinkedList::Target>; #[derive(Debug, Default)] struct Waiters { + #[cfg(feature = "io-readiness")] /// List of all current waiters list: WaitList, @@ -36,36 +42,38 @@ struct Waiters { writer: Option, } -#[derive(Debug)] -struct Waiter { - pointers: linked_list::Pointers, +cfg_io_readiness! { + #[derive(Debug)] + struct Waiter { + pointers: linked_list::Pointers, - /// The waker for this task - waker: Option, + /// The waker for this task + waker: Option, - /// The interest this waiter is waiting on - interest: mio::Ready, + /// The interest this waiter is waiting on + interest: mio::Ready, - notified: bool, + notified: bool, - /// Should never be `!Unpin` - _p: PhantomPinned, -} + /// Should never be `!Unpin` + _p: PhantomPinned, + } -/// Future returned by `readiness()` -struct Readiness<'a> { - scheduled_io: &'a ScheduledIo, + /// Future returned by `readiness()` + struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, - state: State, + state: State, - /// Entry in the waiter `LinkedList`. - waiter: UnsafeCell, -} + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell, + } -enum State { - Init, - Waiting, - Done, + enum State { + Init, + Waiting, + Done, + } } // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. @@ -198,15 +206,18 @@ impl ScheduledIo { } } - // check list of waiters - for waiter in waiters - .list - .drain_filter(|w| !(w.interest & ready).is_empty()) + #[cfg(feature = "io-readiness")] { - let waiter = unsafe { &mut *waiter.as_ptr() }; - if let Some(waker) = waiter.waker.take() { - waiter.notified = true; - waker.wake(); + // check list of waiters + for waiter in waiters + .list + .drain_filter(|w| !(w.interest & ready).is_empty()) + { + let waiter = unsafe { &mut *waiter.as_ptr() }; + if let Some(waker) = waiter.waker.take() { + waiter.notified = true; + waker.wake(); + } } } } @@ -254,29 +265,6 @@ impl ScheduledIo { } } - /// An async version of `poll_readiness` which uses a linked list of wakers - pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { - self.readiness_fut(interest).await - } - - // This is in a separate function so that the borrow checker doesn't think - // we are borrowing the `UnsafeCell` possibly over await boundaries. - // - // Go figure. - fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { - Readiness { - scheduled_io: self, - state: State::Init, - waiter: UnsafeCell::new(Waiter { - pointers: linked_list::Pointers::new(), - waker: None, - notified: false, - interest, - _p: PhantomPinned, - }), - } - } - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { // This consumes the current readiness state **except** for HUP and // error. HUP and error are excluded because a) they are final states @@ -310,114 +298,141 @@ impl Drop for ScheduledIo { unsafe impl Send for ScheduledIo {} unsafe impl Sync for ScheduledIo {} -unsafe impl linked_list::Link for Waiter { - type Handle = NonNull; - type Target = Waiter; +cfg_io_readiness! { + impl ScheduledIo { + /// An async version of `poll_readiness` which uses a linked list of wakers + pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { + self.readiness_fut(interest).await + } - fn as_raw(handle: &NonNull) -> NonNull { - *handle + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + notified: false, + interest, + _p: PhantomPinned, + }), + } + } } - unsafe fn from_raw(ptr: NonNull) -> NonNull { - ptr - } + unsafe impl linked_list::Link for Waiter { + type Handle = NonNull; + type Target = Waiter; - unsafe fn pointers(mut target: NonNull) -> NonNull> { - NonNull::from(&mut target.as_mut().pointers) - } -} + fn as_raw(handle: &NonNull) -> NonNull { + *handle + } -// ===== impl Readiness ===== + unsafe fn from_raw(ptr: NonNull) -> NonNull { + ptr + } -impl Future for Readiness<'_> { - type Output = ReadyEvent; + unsafe fn pointers(mut target: NonNull) -> NonNull> { + NonNull::from(&mut target.as_mut().pointers) + } + } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let (scheduled_io, state, waiter) = unsafe { - let me = self.get_unchecked_mut(); - (&me.scheduled_io, &mut me.state, &me.waiter) - }; + // ===== impl Readiness ===== - loop { - match *state { - State::Init => { - let mut waiters = scheduled_io.waiters.lock().unwrap(); + impl Future for Readiness<'_> { + type Output = ReadyEvent; - // Safety: called while locked - unsafe { - (*waiter.get()).waker = Some(cx.waker().clone()); - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; - // Insert the waiter into the linked list - // - // safety: pointers from `UnsafeCell` are never null. - waiters - .list - .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); - *state = State::Waiting; - } - State::Waiting => { - // Currently in the "Waiting" state, implying the caller has - // a waiter stored in the waiter list (guarded by - // `notify.waiters`). In order to access the waker fields, - // we must hold the lock. - - let waiters = scheduled_io.waiters.lock().unwrap(); - - // Safety: called while locked - let w = unsafe { &mut *waiter.get() }; - - if w.notified { - // Our waker has been notified. Reset the fields and - // remove it from the list. - w.waker = None; - w.notified = false; - - *state = State::Done; - } else { - // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); + loop { + match *state { + State::Init => { + let mut waiters = scheduled_io.waiters.lock().unwrap(); + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); } - return Poll::Pending; + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock().unwrap(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.notified { + // Our waker has been notified. Reset the fields and + // remove it from the list. + w.waker = None; + w.notified = false; + + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } - // Explicit drop of the lock to indicate the scope that the - // lock is held. Because holding the lock is required to - // ensure safe access to fields not held within the lock, it - // is helpful to visualize the scope of the critical - // section. - drop(waiters); - } - State::Done => { - let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; - // Safety: State::Done means it is no longer shared - let w = unsafe { &mut *waiter.get() }; + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; - return Poll::Ready(ReadyEvent { - tick, - readiness: w.interest, - }); + return Poll::Ready(ReadyEvent { + tick, + readiness: w.interest, + }); + } } } } } -} -impl Drop for Readiness<'_> { - fn drop(&mut self) { - let mut waiters = self.scheduled_io.waiters.lock().unwrap(); + impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock().unwrap(); - // Safety: `waiter` is only ever stored in `waiters` - unsafe { - waiters - .list - .remove(NonNull::new_unchecked(self.waiter.get())) - }; + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } } -} -unsafe impl Send for Readiness<'_> {} -unsafe impl Sync for Readiness<'_> {} + unsafe impl Send for Readiness<'_> {} + unsafe impl Sync for Readiness<'_> {} +} diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 64d70ceae34..cfcbae506cd 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -128,6 +128,7 @@ where /// Returns a shared reference to the underlying I/O object this readiness /// stream is wrapping. + #[cfg(any(feature = "process", feature = "tcp", feature = "udp", feature = "uds",))] pub(crate) fn get_ref(&self) -> &E { self.io.as_ref().unwrap() } @@ -146,16 +147,13 @@ where /// Note that deregistering does not guarantee that the I/O resource can be /// registered with a different reactor. Some I/O resource types can only be /// associated with a single reactor instance for their lifetime. + #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] pub(crate) fn into_inner(mut self) -> io::Result { let io = self.io.take().unwrap(); self.registration.deregister(&io)?; Ok(io) } - pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result { - self.registration.readiness(interest).await - } - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { self.registration.clear_readiness(event); } @@ -219,6 +217,17 @@ where } } +cfg_io_readiness! { + impl PollEvented + where + E: Evented, + { + pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result { + self.registration.readiness(interest).await + } + } +} + // ===== Read / Write impls ===== impl AsyncRead for PollEvented diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 259b2d47c3a..e7adfc3c6d1 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -127,11 +127,6 @@ impl Registration { inner.deregister_source(io) } - pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result { - // TODO: does this need to return a `Result`? - Ok(self.shared.readiness(interest).await) - } - pub(super) fn clear_readiness(&self, event: ReadyEvent) { self.shared.clear_readiness(event); } @@ -156,3 +151,12 @@ impl Registration { Poll::Ready(Ok(ev)) } } + +cfg_io_readiness! { + impl Registration { + pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result { + // TODO: does this need to return a `Result`? + Ok(self.shared.readiness(interest).await) + } + } +} diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b31b478dcf1..f057cedc9c9 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -77,7 +77,6 @@ //! - `rt-core`: Enables `tokio::spawn` and the basic (single-threaded) scheduler. //! - `rt-threaded`: Enables the heavier, multi-threaded, work-stealing scheduler. //! - `rt-util`: Enables non-scheduler utilities. -//! - `io-driver`: Enables the `mio` based IO driver. //! - `io-util`: Enables the IO based `Ext` traits. //! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types. //! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`. @@ -269,8 +268,7 @@ //! the [`AsyncRead`], [`AsyncWrite`], and [`AsyncBufRead`] traits. In addition, //! when the "io-util" feature flag is enabled, it also provides combinators and //! functions for working with these traits, forming as an asynchronous -//! counterpart to [`std::io`]. When the "io-driver" feature flag is enabled, it -//! also provides utilities for library authors implementing I/O resources. +//! counterpart to [`std::io`]. //! //! Tokio also includes APIs for performing various kinds of I/O and interacting //! with the operating system asynchronously. These include: diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index ff9f9481952..01a59c88191 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -129,16 +129,15 @@ macro_rules! cfg_io_driver { ($($item:item)*) => { $( #[cfg(feature = "io-driver")] - #[cfg_attr(docsrs, doc(cfg(feature = "io-driver")))] $item )* } } -macro_rules! cfg_not_io_driver { +macro_rules! cfg_io_readiness { ($($item:item)*) => { $( - #[cfg(not(feature = "io-driver"))] + #[cfg(feature = "io-readiness")] $item )* } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index e296fb41a95..9549f07c56e 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -107,6 +107,7 @@ impl LinkedList { /// Removes the last element from a list and returns it, or None if it is /// empty. + #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))] pub(crate) fn pop_back(&mut self) -> Option { unsafe { let last = self.tail?; @@ -126,6 +127,7 @@ impl LinkedList { } /// Returns whether the linked list doesn not contain any node + #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))] pub(crate) fn is_empty(&self) -> bool { if self.head.is_some() { return false; @@ -231,7 +233,7 @@ cfg_rt_threaded! { // ===== impl DrainFilter ===== -cfg_io_driver! { +cfg_io_readiness! { pub(crate) struct DrainFilter<'a, T: Link, F> { list: &'a mut LinkedList, filter: F, diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 9fe4d70a456..d966c0f6afb 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,7 +3,7 @@ cfg_io_driver! { pub(crate) mod slab; } -#[cfg(any(feature = "io-driver", feature = "sync", feature = "rt-core"))] +#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))] pub(crate) mod linked_list; #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] From e1c9192729bf21719768735c7ba90bfbf2674572 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 14 Sep 2020 10:24:47 -0700 Subject: [PATCH 03/11] fix unused mut in udp example --- examples/connect.rs | 8 +++----- examples/echo-udp.rs | 2 +- examples/udp-client.rs | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/examples/connect.rs b/examples/connect.rs index 5d0515a700d..6e909b25efe 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -96,7 +96,6 @@ mod udp { use std::error::Error; use std::io; use std::net::SocketAddr; - use tokio::net::udp::{RecvHalf, SendHalf}; use tokio::net::UdpSocket; pub async fn connect( @@ -114,16 +113,15 @@ mod udp { let socket = UdpSocket::bind(&bind_addr).await?; socket.connect(addr).await?; - let (mut r, mut w) = socket.split(); - future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?; + future::try_join(send(stdin, &socket), recv(stdout, &socket)).await?; Ok(()) } async fn send( mut stdin: impl Stream> + Unpin, - writer: &mut SendHalf, + writer: &UdpSocket, ) -> Result<(), io::Error> { while let Some(item) = stdin.next().await { let buf = item?; @@ -135,7 +133,7 @@ mod udp { async fn recv( mut stdout: impl Sink + Unpin, - reader: &mut RecvHalf, + reader: &UdpSocket, ) -> Result<(), io::Error> { loop { let mut buf = vec![0; 1024]; diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index bc688b9b79b..3027c869696 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -26,7 +26,7 @@ struct Server { impl Server { async fn run(self) -> Result<(), io::Error> { let Server { - mut socket, + socket, mut buf, mut to_send, } = self; diff --git a/examples/udp-client.rs b/examples/udp-client.rs index a191033d820..a394ee66e06 100644 --- a/examples/udp-client.rs +++ b/examples/udp-client.rs @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { } .parse()?; - let mut socket = UdpSocket::bind(local_addr).await?; + let socket = UdpSocket::bind(local_addr).await?; const MAX_DATAGRAM_SIZE: usize = 65_507; socket.connect(&remote_addr).await?; let data = get_stdin_data()?; From 5650348b9df17c50906ad5b33140e000830b154b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 14 Sep 2020 11:47:17 -0700 Subject: [PATCH 04/11] disable udp-codec example --- examples/udp-codec.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 8b64cbc3ab5..f338bd84bf1 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -1,3 +1,8 @@ +fn main() {} + +// Disabled while future of UdpFramed is decided on. +// See https://github.com/tokio-rs/tokio/issues/2830 +/* //! This example leverages `BytesCodec` to create a UDP client and server which //! speak a custom protocol. //! @@ -78,3 +83,4 @@ async fn pong(socket: &mut UdpFramed) -> Result<(), io::Error> { Ok(()) } +*/ From e74bdb05851aeab52cda5a688f69a2487b161818 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 14 Sep 2020 14:35:39 -0700 Subject: [PATCH 05/11] Fix readiness check when currently ready --- tokio/src/io/driver/scheduled_io.rs | 43 +++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 7a33d39bab0..7174c61b772 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -53,7 +53,7 @@ cfg_io_readiness! { /// The interest this waiter is waiting on interest: mio::Ready, - notified: bool, + is_ready: bool, /// Should never be `!Unpin` _p: PhantomPinned, @@ -215,7 +215,7 @@ impl ScheduledIo { { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { - waiter.notified = true; + waiter.is_ready = true; waker.wake(); } } @@ -316,7 +316,7 @@ cfg_io_readiness! { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + is_ready: false, interest, _p: PhantomPinned, }), @@ -347,6 +347,8 @@ cfg_io_readiness! { type Output = ReadyEvent; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use std::sync::atomic::Ordering::SeqCst; + let (scheduled_io, state, waiter) = unsafe { let me = self.get_unchecked_mut(); (&me.scheduled_io, &mut me.state, &me.waiter) @@ -355,8 +357,35 @@ cfg_io_readiness! { loop { match *state { State::Init => { + // Optimistically check existing readiness + let curr = scheduled_io.readiness.load(SeqCst); + let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + + // Safety: `waiter.interest` never changes + let interest = unsafe { (*waiter.get()).interest }; + + if readiness.contains(interest) { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { readiness, tick }); + } + + // Wasn't ready, take the lock (and check again while locked). let mut waiters = scheduled_io.waiters.lock().unwrap(); + let curr = scheduled_io.readiness.load(SeqCst); + let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + + if readiness.contains(interest) { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { readiness, tick }); + } + + // Not ready even after locked, insert into list... + // Safety: called while locked unsafe { (*waiter.get()).waker = Some(cx.waker().clone()); @@ -381,12 +410,8 @@ cfg_io_readiness! { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { - // Our waker has been notified. Reset the fields and - // remove it from the list. - w.waker = None; - w.notified = false; - + if w.is_ready { + // Our waker has been notified. *state = State::Done; } else { // Update the waker, if necessary. From fcadd5b430f79c12c972b4f9ef3475e2d6de8d30 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 14 Sep 2020 16:13:00 -0700 Subject: [PATCH 06/11] Fix mut in unix datagram docs --- tokio/src/net/unix/datagram/socket.rs | 30 +++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index eee7ac66de5..599c800ee4b 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -34,9 +34,9 @@ cfg_uds! { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -60,7 +60,7 @@ cfg_uds! { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -124,7 +124,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -204,12 +204,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Send to the bound socket /// let bytes = b"hello world"; @@ -243,12 +243,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Connect to the bound socket /// tx.connect(&rx_path)?; @@ -280,7 +280,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -378,7 +378,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -443,9 +443,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -486,9 +486,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -675,7 +675,7 @@ impl UnixDatagram { /// use std::net::Shutdown; /// /// // Create an unbound socket - /// let (mut socket, other) = UnixDatagram::pair()?; + /// let (socket, other) = UnixDatagram::pair()?; /// /// socket.shutdown(Shutdown::Both)?; /// From c92db212630835d3e3a8e61ddedcb62f1c561a89 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 22 Sep 2020 15:32:22 -0700 Subject: [PATCH 07/11] Apply suggestions from code review Co-authored-by: Eliza Weisman Co-authored-by: Alice Ryhl --- tokio/src/io/driver/scheduled_io.rs | 6 +++--- tokio/src/net/tcp/stream.rs | 2 +- tokio/src/net/unix/stream.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 7174c61b772..fcc1816888c 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -268,7 +268,7 @@ impl ScheduledIo { pub(crate) fn clear_readiness(&self, event: ReadyEvent) { // This consumes the current readiness state **except** for HUP and // error. HUP and error are excluded because a) they are final states - // and never transitition out and b) both the read AND the write + // and never transition out and b) both the read AND the write // directions need to be able to obvserve these states. // // # Platform-specific behavior @@ -282,9 +282,9 @@ impl ScheduledIo { // AND write. A specific case that `EPOLLERR` occurs is when the read // end of a pipe is closed. When this occurs, a peer blocked by // writing to the pipe should be notified. - // result isn't important let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize(); - + + // result isn't important let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup)); } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 99fc71121dc..9fa1b040af9 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -707,7 +707,7 @@ impl TcpStream { loop { let ev = ready!(self.io.poll_read_ready(cx))?; - // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes. + // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 1134b9d162a..37104314dc4 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -214,7 +214,7 @@ impl UnixStream { loop { let ev = ready!(self.io.poll_read_ready(cx))?; - // Safety: `UnixStream::read` will not peak at the maybe uinitialized bytes. + // Safety: `UnixStream::read` will not peek at the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; From a2ffe53262052b9e8c5f7af740a28e2fae28c000 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 22 Sep 2020 16:10:01 -0700 Subject: [PATCH 08/11] Move do_io method to PollEvented --- tokio/src/io/driver/scheduled_io.rs | 2 +- tokio/src/io/poll_evented.rs | 16 +++++++++++++ tokio/src/net/udp/socket.rs | 28 +++++++--------------- tokio/src/net/unix/datagram/socket.rs | 34 +++++++++------------------ 4 files changed, 36 insertions(+), 44 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index fcc1816888c..097cd3ac038 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -283,7 +283,7 @@ impl ScheduledIo { // end of a pipe is closed. When this occurs, a peer blocked by // writing to the pipe should be notified. let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize(); - + // result isn't important let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup)); } diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index cfcbae506cd..ac64653b46d 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -225,6 +225,22 @@ cfg_io_readiness! { pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result { self.registration.readiness(interest).await } + + pub(crate) async fn async_io(&self, interest: mio::Ready, mut op: F) -> io::Result + where + F: FnMut(&E) -> io::Result, + { + loop { + let event = self.readiness(interest).await?; + + match op(self.get_ref()) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(event); + } + x => return x, + } + } + } } } diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index d50cfa45e15..937045bd0e4 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -102,7 +102,8 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub async fn send(&self, buf: &[u8]) -> io::Result { - self.do_io(mio::Ready::writable(), |sock| sock.send(buf)) + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) .await } @@ -133,7 +134,8 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub async fn recv(&self, buf: &mut [u8]) -> io::Result { - self.do_io(mio::Ready::readable(), |sock| sock.recv(buf)) + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) .await } @@ -170,7 +172,8 @@ impl UdpSocket { } async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result { - self.do_io(mio::Ready::writable(), |sock| sock.send_to(buf, target)) + self.io + .async_io(mio::Ready::writable(), |sock| sock.send_to(buf, target)) .await } @@ -181,26 +184,11 @@ impl UdpSocket { /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.do_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) .await } - async fn do_io(&self, interest: mio::Ready, mut op: F) -> io::Result - where - F: FnMut(&mio::net::UdpSocket) -> io::Result, - { - loop { - let event = self.io.readiness(interest).await?; - - match op(self.io.get_ref()) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(event); - } - x => return x, - } - } - } - /// Gets the value of the `SO_BROADCAST` option for this socket. /// /// For more information about this option, see [`set_broadcast`]. diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 599c800ee4b..dc48bbd4310 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -297,7 +297,8 @@ impl UnixDatagram { /// # } /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { - self.do_io(mio::Ready::writable(), |sock| sock.send(buf)) + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) .await } @@ -395,7 +396,8 @@ impl UnixDatagram { /// # } /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { - self.do_io(mio::Ready::readable(), |sock| sock.recv(buf)) + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) .await } @@ -464,10 +466,11 @@ impl UnixDatagram { where P: AsRef, { - self.do_io(mio::Ready::writable(), |sock| { - sock.send_to(buf, target.as_ref()) - }) - .await + self.io + .async_io(mio::Ready::writable(), |sock| { + sock.send_to(buf, target.as_ref()) + }) + .await } /// Receives data from the socket. @@ -504,7 +507,8 @@ impl UnixDatagram { /// # } /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.do_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) .await } @@ -695,22 +699,6 @@ impl UnixDatagram { pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.get_ref().shutdown(how) } - - async fn do_io(&self, interest: mio::Ready, mut op: F) -> io::Result - where - F: FnMut(&mio_uds::UnixDatagram) -> io::Result, - { - loop { - let event = self.io.readiness(interest).await?; - - match op(self.io.get_ref()) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(event); - } - x => return x, - } - } - } } impl TryFrom for mio_uds::UnixDatagram { From f64928baec9d31b0a66a3baa1c1c448965109e35 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 22 Sep 2020 16:33:00 -0700 Subject: [PATCH 09/11] remove unused code, improve set_readiness comments --- tokio/src/io/driver/scheduled_io.rs | 2 ++ tokio/src/sync/task/atomic_waker.rs | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 097cd3ac038..48c56a19a18 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -125,6 +125,8 @@ impl ScheduledIo { /// /// # Arguments /// - `token`: the token for this `ScheduledIo`. + /// - `tick`: whether setting the tick or trying to clear readiness for a + /// specific tick. /// - `f`: a closure returning a new readiness value given the previous /// readiness. /// diff --git a/tokio/src/sync/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs index 0245ccb7c69..ae4cac7c247 100644 --- a/tokio/src/sync/task/atomic_waker.rs +++ b/tokio/src/sync/task/atomic_waker.rs @@ -141,12 +141,12 @@ impl AtomicWaker { } } + /* /// Registers the current waker to be notified on calls to `wake`. - #[cfg(feature = "io-driver")] - #[allow(unused)] pub(crate) fn register(&self, waker: Waker) { self.do_register(waker); } + */ /// Registers the provided waker to be notified on calls to `wake`. /// From dbb99f3c7709cc1caf58322ff2eac0e84bd3281e Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 22 Sep 2020 17:50:14 -0700 Subject: [PATCH 10/11] update signal driver --- tokio/src/io/poll_evented.rs | 22 ++++++++++++-- tokio/src/io/registration.rs | 19 +----------- tokio/src/macros/cfg.rs | 9 ++++++ tokio/src/net/tcp/listener.rs | 5 ---- tokio/src/net/tcp/stream.rs | 5 ---- tokio/src/net/udp/socket.rs | 5 ---- tokio/src/net/unix/datagram/socket.rs | 5 ---- tokio/src/net/unix/listener.rs | 5 ---- tokio/src/net/unix/stream.rs | 5 ---- tokio/src/signal/unix/driver.rs | 43 +++++++++++++++++---------- 10 files changed, 57 insertions(+), 66 deletions(-) diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index ac64653b46d..2c943ea4edb 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,4 +1,4 @@ -use crate::io::driver::{Direction, ReadyEvent}; +use crate::io::driver::{Direction, Handle, ReadyEvent}; use crate::io::registration::Registration; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -90,6 +90,7 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result { PollEvented::new_with_ready(io, mio::Ready::all()) } @@ -118,8 +119,17 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result { - let registration = Registration::new_with_ready(&io, ready)?; + Self::new_with_ready_and_handle(io, ready, Handle::current()) + } + + pub(crate) fn new_with_ready_and_handle( + io: E, + ready: mio::Ready, + handle: Handle, + ) -> io::Result { + let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?; Ok(Self { io: Some(io), registration, @@ -128,7 +138,13 @@ where /// Returns a shared reference to the underlying I/O object this readiness /// stream is wrapping. - #[cfg(any(feature = "process", feature = "tcp", feature = "udp", feature = "uds",))] + #[cfg(any( + feature = "process", + feature = "tcp", + feature = "udp", + feature = "uds", + feature = "signal" + ))] pub(crate) fn get_ref(&self) -> &E { self.io.as_ref().unwrap() } diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 7396c4b87f0..e4ec096f297 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -38,7 +38,7 @@ cfg_io_driver! { /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] - pub(super) struct Registration { + pub(crate) struct Registration { /// Handle to the associated driver. handle: Handle, @@ -74,23 +74,6 @@ impl Registration { /// /// - `Ok` if the registration happened successfully /// - `Err` if an error was encountered during registration - /// - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub(super) fn new_with_ready(io: &T, ready: mio::Ready) -> io::Result - where - T: Evented, - { - Self::new_with_ready_and_handle(io, ready, Handle::current()) - } - - /// Same as `new_with_ready` but also accepts an explicit handle. pub(crate) fn new_with_ready_and_handle( io: &T, ready: mio::Ready, diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 01a59c88191..1336c63de92 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -134,6 +134,15 @@ macro_rules! cfg_io_driver { } } +macro_rules! cfg_not_io_driver { + ($($item:item)*) => { + $( + #[cfg(not(feature = "io-driver"))] + $item + )* + } +} + macro_rules! cfg_io_readiness { ($($item:item)*) => { $( diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index f00de265d45..4ea498b00c2 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -412,11 +412,6 @@ impl TryFrom for mio::net::TcpListener { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpListener) -> Result { value.io.into_inner() } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 9fa1b040af9..f4f705b40e4 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -855,11 +855,6 @@ impl TryFrom for mio::net::TcpStream { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpStream) -> Result { value.io.into_inner() } diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index c9703764e3c..d0dece3e970 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -328,11 +328,6 @@ impl TryFrom for mio::net::UdpSocket { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UdpSocket) -> Result { value.io.into_inner() } diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index dc48bbd4310..78baf279194 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -705,11 +705,6 @@ impl TryFrom for mio_uds::UnixDatagram { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixDatagram) -> Result { value.io.into_inner() } diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 68baa7f40ac..eb316b074d3 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -192,11 +192,6 @@ impl TryFrom for mio_uds::UnixListener { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixListener) -> Result { value.io.into_inner() } diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 37104314dc4..715ed7aa485 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -143,11 +143,6 @@ impl TryFrom for mio_uds::UnixStream { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixStream) -> Result { value.io.into_inner() } diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index 0458893cce4..639a483ef24 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -1,13 +1,15 @@ //! Signal driver use crate::io::driver::Driver as IoDriver; -use crate::io::Registration; +use crate::io::PollEvented; use crate::park::Park; use crate::runtime::context; use crate::signal::registry::globals; use mio_uds::UnixStream; use std::io::{self, Read}; +use std::ptr; use std::sync::{Arc, Weak}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::time::Duration; /// Responsible for registering wakeups when an OS signal is received, and @@ -21,11 +23,7 @@ pub(crate) struct Driver { park: IoDriver, /// A pipe for receiving wake events from the signal handler - receiver: UnixStream, - - /// The actual registraiton for `receiver` when active. - /// Lazily bound at the first signal registration. - registration: Registration, + receiver: PollEvented, /// Shared state inner: Arc, @@ -58,13 +56,12 @@ impl Driver { // either, since we can't compare Handles or assume they will always // point to the exact same reactor. let receiver = globals().receiver.try_clone()?; - let registration = - Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?; + let receiver = + PollEvented::new_with_ready_and_handle(receiver, mio::Ready::all(), park.handle())?; Ok(Self { park, receiver, - registration, inner: Arc::new(Inner(())), }) } @@ -79,17 +76,23 @@ impl Driver { fn process(&self) { // Check if the pipe is ready to read and therefore has "woken" us up - match self.registration.take_read_ready() { - Ok(Some(ready)) => assert!(ready.is_readable()), - Ok(None) => return, // No wake has arrived, bail - Err(e) => panic!("reactor gone: {}", e), - } + // + // To do so, we will `poll_read_ready` with a noop waker, since we don't + // need to actually be notified when read ready... + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; + let mut cx = Context::from_waker(&waker); + + let ev = match self.receiver.poll_read_ready(&mut cx) { + Poll::Ready(Ok(ev)) => ev, + Poll::Ready(Err(e)) => panic!("reactor gone: {}", e), + Poll::Pending => return, // No wake has arrived, bail + }; // Drain the pipe completely so we can receive a new readiness event // if another signal has come in. let mut buf = [0; 128]; loop { - match (&self.receiver).read(&mut buf) { + match self.receiver.get_ref().read(&mut buf) { Ok(0) => panic!("EOF on self-pipe"), Ok(_) => continue, // Keep reading Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, @@ -97,11 +100,21 @@ impl Driver { } } + self.receiver.clear_readiness(ev); + // Broadcast any signals which were received globals().broadcast(); } } +const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop); + +unsafe fn noop_clone(_data: *const ()) -> RawWaker { + RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE) +} + +unsafe fn noop(_data: *const ()) {} + // ===== impl Park for Driver ===== impl Park for Driver { From 7293369de4623625f3918e0488cd6990ef6c58af Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 23 Sep 2020 10:09:40 -0700 Subject: [PATCH 11/11] fix loom unused warning --- tokio/src/io/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 147c9d56bc0..3a22722a69b 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -207,6 +207,7 @@ cfg_io_driver! { pub(crate) mod driver; mod poll_evented; + #[cfg(not(loom))] pub(crate) use poll_evented::PollEvented; mod registration;