diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 0b2017958f1..fdce440fb41 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -58,9 +58,9 @@ net = ["dns", "tcp", "udp", "uds"] process = [ "lazy_static", "libc", - "mio", - "mio-named-pipes", - "mio-uds", + "mio/os-poll", + "mio/os-util", + "mio/uds", "signal-hook-registry", "winapi/threadpoollegacyapiset", ] @@ -74,18 +74,18 @@ rt-threaded = [ signal = [ "lazy_static", "libc", - "mio", - "mio-uds", + "mio/os-poll", + "mio/uds", "signal-hook-registry", "winapi/consoleapi", ] stream = ["futures-core"] sync = ["fnv"] test-util = [] -tcp = ["lazy_static", "mio"] +tcp = ["lazy_static", "mio/tcp", "mio/os-poll"] time = ["slab"] -udp = ["lazy_static", "mio"] -uds = ["lazy_static", "libc", "mio", "mio-uds"] +udp = ["lazy_static", "mio/udp", "mio/os-poll"] +uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"] [dependencies] tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true } @@ -98,20 +98,16 @@ fnv = { version = "1.0.6", optional = true } futures-core = { version = "0.3.0", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } -mio = { version = "0.6.20", optional = true } +mio = { version = "0.7.2", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.11.0", optional = true } # Not in full slab = { version = "0.4.1", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] -mio-uds = { version = "0.6.5", optional = true } libc = { version = "0.2.42", optional = true } signal-hook-registry = { version = "1.1.1", optional = true } -[target.'cfg(windows)'.dependencies] -mio-named-pipes = { version = "0.1.6", optional = true } - [target.'cfg(windows)'.dependencies.winapi] version = "0.3.8" default-features = false diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 30b30203995..c4f5887a930 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,4 +1,5 @@ -pub(crate) mod platform; +mod ready; +use ready::Ready; mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests @@ -8,7 +9,6 @@ use crate::runtime::context; use crate::util::bit; use crate::util::slab::{self, Slab}; -use mio::event::Evented; use std::fmt; use std::io; use std::sync::{Arc, Weak}; @@ -27,10 +27,11 @@ pub(crate) struct Driver { /// with this driver. resources: Slab, + /// The system event queue + poll: mio::Poll, + /// State shared between the reactor and the handles. inner: Arc, - - _wakeup_registration: mio::Registration, } /// A reference to an I/O driver @@ -41,18 +42,18 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, - readiness: mio::Ready, + ready: Ready, } pub(super) struct Inner { - /// The underlying system event queue. - io: mio::Poll, + /// Registers I/O resources + registry: mio::Registry, /// Allocates `ScheduledIo` handles when creating new resources. pub(super) io_dispatch: slab::Allocator, /// Used to wake up the reactor from a call to `turn` - wakeup: mio::SetReadiness, + waker: mio::Waker, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -92,27 +93,22 @@ impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. pub(crate) fn new() -> io::Result { - let io = mio::Poll::new()?; - let wakeup_pair = mio::Registration::new2(); + let poll = mio::Poll::new()?; + let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; + let registry = poll.registry().try_clone()?; + let slab = Slab::new(); let allocator = slab.allocator(); - io.register( - &wakeup_pair.0, - TOKEN_WAKEUP, - mio::Ready::readable(), - mio::PollOpt::level(), - )?; - Ok(Driver { tick: 0, events: Some(mio::Events::with_capacity(1024)), resources: slab, - _wakeup_registration: wakeup_pair.0, + poll, inner: Arc::new(Inner { - io, + registry, io_dispatch: allocator, - wakeup: wakeup_pair.1, + waker, }), }) } @@ -143,23 +139,18 @@ impl Driver { // Block waiting for an event to happen, peeling out how many events // happened. - match self.inner.io.poll(&mut events, max_wait) { + match self.poll.poll(&mut events, max_wait) { Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } // Process all the events that came in, dispatching appropriately - for event in events.iter() { let token = event.token(); - if token == TOKEN_WAKEUP { - self.inner - .wakeup - .set_readiness(mio::Ready::empty()) - .unwrap(); - } else { - self.dispatch(token, event.readiness()); + if token != TOKEN_WAKEUP { + self.dispatch(token, Ready::from_mio(event)); } } @@ -168,7 +159,7 @@ impl Driver { Ok(()) } - fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { + fn dispatch(&mut self, token: mio::Token, ready: Ready) { let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); let io = match self.resources.get(addr) { @@ -176,10 +167,9 @@ impl Driver { None => return, }; - let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| { - curr | ready.as_usize() - }); - if set.is_err() { + let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready); + + if res.is_err() { // token no longer valid! return; } @@ -194,7 +184,7 @@ impl Drop for Driver { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the // driver being shutdown. - io.wake(mio::Ready::all()); + io.wake(Ready::ALL); }) } } @@ -250,7 +240,7 @@ impl Handle { /// return immediately. fn wakeup(&self) { if let Some(inner) = self.inner() { - inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + inner.waker.wake().expect("failed to wake I/O driver"); } } @@ -279,8 +269,8 @@ impl Inner { /// The registration token is returned. pub(super) fn add_source( &self, - source: &dyn Evented, - ready: mio::Ready, + source: &mut impl mio::event::Source, + interest: mio::Interest, ) -> io::Result> { let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { io::Error::new( @@ -291,26 +281,23 @@ impl Inner { let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); - self.io - .register(source, mio::Token(token), ready, mio::PollOpt::edge())?; + self.registry + .register(source, mio::Token(token), interest)?; Ok(shared) } /// Deregisters an I/O resource from the reactor. - pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { - self.io.deregister(source) + pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { + self.registry.deregister(source) } } impl Direction { - pub(super) fn mask(self) -> mio::Ready { + pub(super) fn mask(self) -> Ready { match self { - Direction::Read => { - // Everything except writable is signaled through read. - mio::Ready::all() - mio::Ready::writable() - } - Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(), + Direction::Read => Ready::READABLE | Ready::READ_CLOSED, + Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, } } } diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs new file mode 100644 index 00000000000..8b556e94900 --- /dev/null +++ b/tokio/src/io/driver/ready.rs @@ -0,0 +1,187 @@ +use std::fmt; +use std::ops; + +const READABLE: usize = 0b0_01; +const WRITABLE: usize = 0b0_10; +const READ_CLOSED: usize = 0b0_0100; +const WRITE_CLOSED: usize = 0b0_1000; + +/// A set of readiness event kinds. +/// +/// `Ready` is set of operation descriptors indicating which kind of an +/// operation is ready to be performed. +/// +/// This struct only represents portable event kinds. Portable events are +/// events that can be raised on any platform while guaranteeing no false +/// positives. +#[derive(Clone, Copy, PartialEq, PartialOrd)] +pub(crate) struct Ready(usize); + +impl Ready { + /// Returns the empty `Ready` set. + pub(crate) const EMPTY: Ready = Ready(0); + + /// Returns a `Ready` representing readable readiness. + pub(crate) const READABLE: Ready = Ready(READABLE); + + /// Returns a `Ready` representing writable readiness. + pub(crate) const WRITABLE: Ready = Ready(WRITABLE); + + /// Returns a `Ready` representing read closed readiness. + pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED); + + /// Returns a `Ready` representing write closed readiness. + pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + + /// Returns a `Ready` representing readiness for all operations. + pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + + pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { + let mut ready = Ready::EMPTY; + + if event.is_readable() { + ready |= Ready::READABLE; + } + + if event.is_writable() { + ready |= Ready::WRITABLE; + } + + if event.is_read_closed() { + ready |= Ready::READ_CLOSED; + } + + if event.is_write_closed() { + ready |= Ready::WRITE_CLOSED; + } + + ready + } + + /// Returns true if `Ready` is the empty set + pub(crate) fn is_empty(self) -> bool { + self == Ready::EMPTY + } + + /// Returns true if the value includes readable readiness + pub(crate) fn is_readable(self) -> bool { + self.contains(Ready::READABLE) || self.is_read_closed() + } + + /// Returns true if the value includes writable readiness + pub(crate) fn is_writable(self) -> bool { + self.contains(Ready::WRITABLE) || self.is_write_closed() + } + + /// Returns true if the value includes read closed readiness + pub(crate) fn is_read_closed(self) -> bool { + self.contains(Ready::READ_CLOSED) + } + + /// Returns true if the value includes write closed readiness + pub(crate) fn is_write_closed(self) -> bool { + self.contains(Ready::WRITE_CLOSED) + } + + /// Returns true if `self` is a superset of `other`. + /// + /// `other` may represent more than one readiness operations, in which case + /// the function only returns true if `self` contains all readiness + /// specified in `other`. + pub(crate) fn contains>(self, other: T) -> bool { + let other = other.into(); + (self & other) == other + } + + /// Create a `Ready` instance using the given `usize` representation. + /// + /// The `usize` representation must have been obtained from a call to + /// `Readiness::as_usize`. + /// + /// This function is mainly provided to allow the caller to get a + /// readiness value from an `AtomicUsize`. + pub(crate) fn from_usize(val: usize) -> Ready { + Ready(val & Ready::ALL.as_usize()) + } + + /// Returns a `usize` representation of the `Ready` value. + /// + /// This function is mainly provided to allow the caller to store a + /// readiness value in an `AtomicUsize`. + pub(crate) fn as_usize(self) -> usize { + self.0 + } +} + +cfg_io_readiness! { + impl Ready { + pub(crate) fn from_interest(interest: mio::Interest) -> Ready { + let mut ready = Ready::EMPTY; + + if interest.is_readable() { + ready |= Ready::READABLE; + ready |= Ready::READ_CLOSED; + } + + if interest.is_writable() { + ready |= Ready::WRITABLE; + ready |= Ready::WRITE_CLOSED; + } + + ready + } + + pub(crate) fn intersection(self, interest: mio::Interest) -> Ready { + Ready(self.0 & Ready::from_interest(interest).0) + } + + pub(crate) fn satisfies(self, interest: mio::Interest) -> bool { + self.0 & Ready::from_interest(interest).0 != 0 + } + } +} + +impl> ops::BitOr for Ready { + type Output = Ready; + + #[inline] + fn bitor(self, other: T) -> Ready { + Ready(self.0 | other.into().0) + } +} + +impl> ops::BitOrAssign for Ready { + #[inline] + fn bitor_assign(&mut self, other: T) { + self.0 |= other.into().0; + } +} + +impl> ops::BitAnd for Ready { + type Output = Ready; + + #[inline] + fn bitand(self, other: T) -> Ready { + Ready(self.0 & other.into().0) + } +} + +impl> ops::Sub for Ready { + type Output = Ready; + + #[inline] + fn sub(self, other: T) -> Ready { + Ready(self.0 & !other.into().0) + } +} + +impl fmt::Debug for Ready { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Ready") + .field("is_readable", &self.is_readable()) + .field("is_writable", &self.is_writable()) + .field("is_read_closed", &self.is_read_closed()) + .field("is_write_closed", &self.is_write_closed()) + .finish() + } +} diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index f63fd7ab3d3..bdf217987d2 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{platform, Direction, ReadyEvent, Tick}; +use super::{Direction, Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -52,7 +52,7 @@ cfg_io_readiness! { waker: Option, /// The interest this waiter is waiting on - interest: mio::Ready, + interest: mio::Interest, is_ready: bool, @@ -141,8 +141,8 @@ impl ScheduledIo { &self, token: Option, tick: Tick, - f: impl Fn(usize) -> usize, - ) -> Result { + f: impl Fn(Ready) -> Ready, + ) -> Result<(), ()> { let mut current = self.readiness.load(Acquire); loop { @@ -158,52 +158,46 @@ impl ScheduledIo { // Mask out the tick/generation bits so that the modifying // function doesn't see them. - let current_readiness = current & mio::Ready::all().as_usize(); - let mut new = f(current_readiness); + let current_readiness = Ready::from_usize(current); + let new = f(current_readiness); - debug_assert!( - new <= READINESS.max_value(), - "new readiness value would overwrite tick/generation bits!" - ); - - match tick { - Tick::Set(t) => { - new = TICK.pack(t as usize, new); - } + let packed = match tick { + Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), Tick::Clear(t) => { if TICK.unpack(current) as u8 != t { // Trying to clear readiness with an old event! return Err(()); } - new = TICK.pack(t as usize, new); + + TICK.pack(t as usize, new.as_usize()) } - } + }; - new = GENERATION.pack(current_generation, new); + let next = GENERATION.pack(current_generation, packed); match self .readiness - .compare_exchange(current, new, AcqRel, Acquire) + .compare_exchange(current, next, AcqRel, Acquire) { - Ok(_) => return Ok(current), + Ok(_) => return Ok(()), // we lost the race, retry! Err(actual) => current = actual, } } } - pub(super) fn wake(&self, ready: mio::Ready) { + pub(super) fn wake(&self, ready: Ready) { let mut waiters = self.waiters.lock(); // check for AsyncRead slot - if !(ready & (!mio::Ready::writable())).is_empty() { + if ready.is_readable() { if let Some(waker) = waiters.reader.take() { waker.wake(); } } // check for AsyncWrite slot - if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { + if ready.is_writable() { if let Some(waker) = waiters.writer.take() { waker.wake(); } @@ -212,10 +206,7 @@ impl ScheduledIo { #[cfg(any(feature = "udp", feature = "uds"))] { // check list of waiters - for waiter in waiters - .list - .drain_filter(|w| !(w.interest & ready).is_empty()) - { + for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { waiter.is_ready = true; @@ -237,7 +228,7 @@ impl ScheduledIo { ) -> Poll { let curr = self.readiness.load(Acquire); - let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); if ready.is_empty() { // Update the task info @@ -251,50 +242,36 @@ impl ScheduledIo { // Try again, in case the readiness was changed while we were // taking the waiters lock let curr = self.readiness.load(Acquire); - let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); if ready.is_empty() { Poll::Pending } else { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, - readiness: ready, + ready, }) } } else { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, - readiness: ready, + ready, }) } } pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - // This consumes the current readiness state **except** for HUP and - // error. HUP and error are excluded because a) they are final states - // and never transition out and b) both the read AND the write - // directions need to be able to obvserve these states. - // - // # Platform-specific behavior - // - // HUP and error readiness are platform-specific. On epoll platforms, - // HUP has specific conditions that must be met by both peers of a - // connection in order to be triggered. - // - // On epoll platforms, `EPOLLERR` is signaled through - // `UnixReady::error()` and is important to be observable by both read - // AND write. A specific case that `EPOLLERR` occurs is when the read - // end of a pipe is closed. When this occurs, a peer blocked by - // writing to the pipe should be notified. - let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize(); + // This consumes the current readiness state **except** for closed + // states. Closed states are excluded because they are final states. + let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; // result isn't important - let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup)); + let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); } } impl Drop for ScheduledIo { fn drop(&mut self) { - self.wake(mio::Ready::all()); + self.wake(Ready::ALL); } } @@ -304,7 +281,7 @@ unsafe impl Sync for ScheduledIo {} cfg_io_readiness! { impl ScheduledIo { /// An async version of `poll_readiness` which uses a linked list of wakers - pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { + pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent { self.readiness_fut(interest).await } @@ -312,7 +289,7 @@ cfg_io_readiness! { // we are borrowing the `UnsafeCell` possibly over await boundaries. // // Go figure. - fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { + fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> { Readiness { scheduled_io: self, state: State::Init, @@ -362,29 +339,31 @@ cfg_io_readiness! { State::Init => { // Optimistically check existing readiness let curr = scheduled_io.readiness.load(SeqCst); - let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = Ready::from_usize(READINESS.unpack(curr)); // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; + let ready = ready.intersection(interest); - if readiness.contains(interest) { + if !ready.is_empty() { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { readiness: interest, tick }); + return Poll::Ready(ReadyEvent { ready, tick }); } // Wasn't ready, take the lock (and check again while locked). let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); - let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = Ready::from_usize(READINESS.unpack(curr)); + let ready = ready.intersection(interest); - if readiness.contains(interest) { + if !ready.is_empty() { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { readiness, tick }); + return Poll::Ready(ReadyEvent { ready, tick }); } // Not ready even after locked, insert into list... @@ -440,7 +419,7 @@ cfg_io_readiness! { return Poll::Ready(ReadyEvent { tick, - readiness: w.interest, + ready: Ready::from_interest(w.interest), }); } } diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 2c943ea4edb..4457195fd76 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -2,7 +2,7 @@ use crate::io::driver::{Direction, Handle, ReadyEvent}; use crate::io::registration::Registration; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; -use mio::event::Evented; +use mio::event::Source; use std::fmt; use std::io::{self, Read, Write}; use std::marker::Unpin; @@ -69,7 +69,7 @@ cfg_io_driver! { /// [`clear_write_ready`]: method@Self::clear_write_ready /// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_write_ready`]: method@Self::poll_write_ready - pub(crate) struct PollEvented { + pub(crate) struct PollEvented { io: Option, registration: Registration, } @@ -77,10 +77,7 @@ cfg_io_driver! { // ===== impl PollEvented ===== -impl PollEvented -where - E: Evented, -{ +impl PollEvented { /// Creates a new `PollEvented` associated with the default reactor. /// /// # Panics @@ -92,26 +89,15 @@ where /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result { - PollEvented::new_with_ready(io, mio::Ready::all()) + PollEvented::new_with_interest(io, mio::Interest::READABLE | mio::Interest::WRITABLE) } - /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Ready` - /// state. `new_with_ready` should be used over `new` when you need control over the readiness + /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Interest` + /// state. `new_with_interest` should be used over `new` when you need control over the readiness /// state, such as when a file descriptor only allows reads. This does not add `hup` or `error` /// so if you are interested in those states, you will need to add them to the readiness state /// passed to this function. /// - /// An example to listen to read only - /// - /// ```rust - /// ##[cfg(unix)] - /// mio::Ready::from_usize( - /// mio::Ready::readable().as_usize() - /// | mio::unix::UnixReady::error().as_usize() - /// | mio::unix::UnixReady::hup().as_usize() - /// ); - /// ``` - /// /// # Panics /// /// This function panics if thread-local runtime is not set. @@ -120,16 +106,16 @@ where /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. #[cfg_attr(feature = "signal", allow(unused))] - pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result { - Self::new_with_ready_and_handle(io, ready, Handle::current()) + pub(crate) fn new_with_interest(io: E, interest: mio::Interest) -> io::Result { + Self::new_with_interest_and_handle(io, interest, Handle::current()) } - pub(crate) fn new_with_ready_and_handle( - io: E, - ready: mio::Ready, + pub(crate) fn new_with_interest_and_handle( + mut io: E, + interest: mio::Interest, handle: Handle, ) -> io::Result { - let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?; + let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; Ok(Self { io: Some(io), registration, @@ -155,21 +141,6 @@ where self.io.as_mut().unwrap() } - /// Consumes self, returning the inner I/O object - /// - /// This function will deregister the I/O resource from the reactor before - /// returning. If the deregistration operation fails, an error is returned. - /// - /// Note that deregistering does not guarantee that the I/O resource can be - /// registered with a different reactor. Some I/O resource types can only be - /// associated with a single reactor instance for their lifetime. - #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] - pub(crate) fn into_inner(mut self) -> io::Result { - let io = self.io.take().unwrap(); - self.registration.deregister(&io)?; - Ok(io) - } - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { self.registration.clear_readiness(event); } @@ -234,15 +205,12 @@ where } cfg_io_readiness! { - impl PollEvented - where - E: Evented, - { - pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result { + impl PollEvented { + pub(crate) async fn readiness(&self, interest: mio::Interest) -> io::Result { self.registration.readiness(interest).await } - pub(crate) async fn async_io(&self, interest: mio::Ready, mut op: F) -> io::Result + pub(crate) async fn async_io(&self, interest: mio::Interest, mut op: F) -> io::Result where F: FnMut(&E) -> io::Result, { @@ -262,10 +230,7 @@ cfg_io_readiness! { // ===== Read / Write impls ===== -impl AsyncRead for PollEvented -where - E: Evented + Read + Unpin, -{ +impl AsyncRead for PollEvented { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -290,10 +255,7 @@ where } } -impl AsyncWrite for PollEvented -where - E: Evented + Write + Unpin, -{ +impl AsyncWrite for PollEvented { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -340,17 +302,17 @@ fn is_wouldblock(r: &io::Result) -> bool { } } -impl fmt::Debug for PollEvented { +impl fmt::Debug for PollEvented { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PollEvented").field("io", &self.io).finish() } } -impl Drop for PollEvented { +impl Drop for PollEvented { fn drop(&mut self) { - if let Some(io) = self.io.take() { + if let Some(mut io) = self.io.take() { // Ignore errors - let _ = self.registration.deregister(&io); + let _ = self.registration.deregister(&mut io); } } } diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index e4ec096f297..03221b60b1e 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -1,7 +1,7 @@ use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo}; use crate::util::slab; -use mio::{self, Evented}; +use mio::event::Source; use std::io; use std::task::{Context, Poll}; @@ -53,37 +53,23 @@ unsafe impl Sync for Registration {} // ===== impl Registration ===== impl Registration { - /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state. - /// `new_with_ready` should be used over `new` when you need control over the readiness state, + /// Registers the I/O resource with the default reactor, for a specific `mio::Interest`. + /// `new_with_interest` should be used over `new` when you need control over the readiness state, /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if /// you are interested in those states, you will need to add them to the readiness state passed /// to this function. /// - /// An example to listen to read only - /// - /// ```rust - /// ##[cfg(unix)] - /// mio::Ready::from_usize( - /// mio::Ready::readable().as_usize() - /// | mio::unix::UnixReady::error().as_usize() - /// | mio::unix::UnixReady::hup().as_usize() - /// ); - /// ``` - /// /// # Return /// /// - `Ok` if the registration happened successfully /// - `Err` if an error was encountered during registration - pub(crate) fn new_with_ready_and_handle( - io: &T, - ready: mio::Ready, + pub(crate) fn new_with_interest_and_handle( + io: &mut impl Source, + interest: mio::Interest, handle: Handle, - ) -> io::Result - where - T: Evented, - { + ) -> io::Result { let shared = if let Some(inner) = handle.inner() { - inner.add_source(io, ready)? + inner.add_source(io, interest)? } else { return Err(io::Error::new( io::ErrorKind::Other, @@ -110,10 +96,7 @@ impl Registration { /// no longer result in notifications getting sent for this registration. /// /// `Err` is returned if an error is encountered. - pub(super) fn deregister(&mut self, io: &T) -> io::Result<()> - where - T: Evented, - { + pub(super) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { let inner = match self.handle.inner() { Some(inner) => inner, None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), @@ -148,7 +131,7 @@ impl Registration { cfg_io_readiness! { impl Registration { - pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result { + pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result { // TODO: does this need to return a `Result`? Ok(self.shared.readiness(interest).await) } diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index ef0a87c011c..b8f61a63718 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -150,7 +150,7 @@ impl TcpListener { } fn bind_addr(addr: SocketAddr) -> io::Result { - let listener = mio::net::TcpListener::bind(&addr)?; + let listener = mio::net::TcpListener::bind(addr)?; TcpListener::new(listener) } @@ -193,23 +193,14 @@ impl TcpListener { &mut self, cx: &mut Context<'_>, ) -> Poll> { - let (io, addr) = ready!(self.poll_accept_std(cx))?; - - let io = mio::net::TcpStream::from_stream(io)?; - let io = TcpStream::new(io)?; - - Poll::Ready(Ok((io, addr))) - } - - fn poll_accept_std( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { loop { let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(pair) => return Poll::Ready(Ok(pair)), + match self.io.get_ref().accept() { + Ok((io, addr)) => { + let io = TcpStream::new(io)?; + return Poll::Ready(Ok((io, addr))); + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_readiness(ev); } @@ -265,7 +256,7 @@ impl TcpListener { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::TcpListener) -> io::Result { - let io = mio::net::TcpListener::from_std(listener)?; + let io = mio::net::TcpListener::from_std(listener); let io = PollEvented::new(io)?; Ok(TcpListener { io }) } @@ -408,15 +399,6 @@ impl crate::stream::Stream for TcpListener { } } -impl TryFrom for mio::net::TcpListener { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: TcpListener) -> Result { - value.io.into_inner() - } -} - impl TryFrom for TcpListener { type Error = io::Error; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index f35f8b0cb6a..50536e9826c 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -7,10 +7,9 @@ use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; -use std::net::{self, Shutdown, SocketAddr}; +use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; cfg_tcp! { /// A TCP stream between a local and a remote socket. @@ -137,7 +136,7 @@ impl TcpStream { /// Establishes a connection to the specified `addr`. async fn connect_addr(addr: SocketAddr) -> io::Result { - let sys = mio::net::TcpStream::connect(&addr)?; + let sys = mio::net::TcpStream::connect(addr)?; let stream = TcpStream::new(sys)?; // Once we've connected, wait for the stream to be writable as @@ -186,40 +185,12 @@ impl TcpStream { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn from_std(stream: net::TcpStream) -> io::Result { - let io = mio::net::TcpStream::from_stream(stream)?; + pub fn from_std(stream: std::net::TcpStream) -> io::Result { + let io = mio::net::TcpStream::from_std(stream); let io = PollEvented::new(io)?; Ok(TcpStream { io }) } - /// Connects `TcpStream` asynchronously that may be built with a net2 `TcpBuilder`. - /// - /// This function is intended to be replaced with some sort of TcpSocket builder. - /// See https://github.com/tokio-rs/tokio/issues/2902 - /// - /// Despite being hidden, this function is part of the public API of Tokio v0.3, but - /// will be removed in v1.0 in favor of a better design. - #[doc(hidden)] - pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result { - let io = mio::net::TcpStream::connect_stream(stream, addr)?; - let io = PollEvented::new(io)?; - let stream = TcpStream { io }; - - // Once we've connected, wait for the stream to be writable as - // that's when the actual connection has been initiated. Once we're - // writable we check for `take_socket_error` to see if the connect - // actually hit an error or not. - // - // If all that succeeded then we ship everything on up. - poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; - - if let Some(e) = stream.io.get_ref().take_error()? { - return Err(e); - } - - Ok(stream) - } - /// Returns the local address that this stream is bound to. /// /// # Examples @@ -429,144 +400,6 @@ impl TcpStream { self.io.get_ref().set_nodelay(nodelay) } - /// Gets the value of the `SO_RCVBUF` option on this socket. - /// - /// For more information about this option, see [`set_recv_buffer_size`]. - /// - /// [`set_recv_buffer_size`]: TcpStream::set_recv_buffer_size - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.recv_buffer_size()?); - /// # Ok(()) - /// # } - /// ``` - pub fn recv_buffer_size(&self) -> io::Result { - self.io.get_ref().recv_buffer_size() - } - - /// Sets the value of the `SO_RCVBUF` option on this socket. - /// - /// Changes the size of the operating system's receive buffer associated - /// with the socket. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_recv_buffer_size(100)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { - self.io.get_ref().set_recv_buffer_size(size) - } - - /// Gets the value of the `SO_SNDBUF` option on this socket. - /// - /// For more information about this option, see [`set_send_buffer_size`]. - /// - /// [`set_send_buffer_size`]: TcpStream::set_send_buffer_size - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.send_buffer_size()?); - /// # Ok(()) - /// # } - /// ``` - pub fn send_buffer_size(&self) -> io::Result { - self.io.get_ref().send_buffer_size() - } - - /// Sets the value of the `SO_SNDBUF` option on this socket. - /// - /// Changes the size of the operating system's send buffer associated with - /// the socket. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_send_buffer_size(100)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { - self.io.get_ref().set_send_buffer_size(size) - } - - /// Returns whether keepalive messages are enabled on this socket, and if so - /// the duration of time between them. - /// - /// For more information about this option, see [`set_keepalive`]. - /// - /// [`set_keepalive`]: TcpStream::set_keepalive - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.keepalive()?); - /// # Ok(()) - /// # } - /// ``` - pub fn keepalive(&self) -> io::Result> { - self.io.get_ref().keepalive() - } - - /// Sets whether keepalive messages are enabled to be sent on this socket. - /// - /// On Unix, this option will set the `SO_KEEPALIVE` as well as the - /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). - /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. - /// - /// If `None` is specified then keepalive messages are disabled, otherwise - /// the duration specified will be the time to remain idle before sending a - /// TCP keepalive probe. - /// - /// Some platforms specify this value in seconds, so sub-second - /// specifications may be omitted. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_keepalive(None)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_keepalive(&self, keepalive: Option) -> io::Result<()> { - self.io.get_ref().set_keepalive(keepalive) - } - /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`]. @@ -610,57 +443,6 @@ impl TcpStream { self.io.get_ref().set_ttl(ttl) } - /// Reads the linger duration for this socket by getting the `SO_LINGER` - /// option. - /// - /// For more information about this option, see [`set_linger`]. - /// - /// [`set_linger`]: TcpStream::set_linger - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.linger()?); - /// # Ok(()) - /// # } - /// ``` - pub fn linger(&self) -> io::Result> { - self.io.get_ref().linger() - } - - /// Sets the linger duration of this socket by setting the `SO_LINGER` - /// option. - /// - /// This option controls the action taken when a stream has unsent messages - /// and the stream is closed. If `SO_LINGER` is set, the system - /// shall block the process until it can transmit the data or until the - /// time expires. - /// - /// If `SO_LINGER` is not specified, and the stream is closed, the system - /// handles the call in a way that allows the process to continue as quickly - /// as possible. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_linger(None)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_linger(&self, dur: Option) -> io::Result<()> { - self.io.get_ref().set_linger(dur) - } - // These lifetime markers also appear in the generated documentation, and make // it more clear that this is a *borrowed* split. #[allow(clippy::needless_lifetimes)] @@ -749,23 +531,14 @@ impl TcpStream { } } -impl TryFrom for mio::net::TcpStream { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: TcpStream) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for TcpStream { +impl TryFrom for TcpStream { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`TcpStream::from_std(stream)`](TcpStream::from_std). - fn try_from(stream: net::TcpStream) -> Result { + fn try_from(stream: std::net::TcpStream) -> Result { Self::from_std(stream) } } diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index 1b9431849d4..3a4c7c22d5b 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -36,7 +36,7 @@ impl UdpSocket { } fn bind_addr(addr: SocketAddr) -> io::Result { - let sys = mio::net::UdpSocket::bind(&addr)?; + let sys = mio::net::UdpSocket::bind(addr)?; UdpSocket::new(sys) } @@ -63,7 +63,7 @@ impl UdpSocket { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result { - let io = mio::net::UdpSocket::from_socket(socket)?; + let io = mio::net::UdpSocket::from_std(socket); UdpSocket::new(io) } @@ -103,7 +103,7 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn send(&self, buf: &[u8]) -> io::Result { self.io - .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) .await } @@ -135,7 +135,7 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io - .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) .await } @@ -148,7 +148,7 @@ impl UdpSocket { let mut addrs = to_socket_addrs(target).await?; match addrs.next() { - Some(target) => self.send_to_addr(buf, &target).await, + Some(target) => self.send_to_addr(buf, target).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -168,12 +168,12 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { - self.io.get_ref().send_to(buf, &target) + self.io.get_ref().send_to(buf, target) } - async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result { + async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io - .async_io(mio::Ready::writable(), |sock| sock.send_to(buf, target)) + .async_io(mio::Interest::WRITABLE, |sock| sock.send_to(buf, target)) .await } @@ -185,7 +185,7 @@ impl UdpSocket { /// buffer, excess bytes may be discarded. pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) .await } @@ -324,30 +324,21 @@ impl UdpSocket { } } -impl TryFrom for mio::net::UdpSocket { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UdpSocket) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for UdpSocket { +impl TryFrom for UdpSocket { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). - fn try_from(stream: net::UdpSocket) -> Result { + fn try_from(stream: std::net::UdpSocket) -> Result { Self::from_std(stream) } } impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.fmt(f) + self.io.get_ref().fmt(f) } } diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 78baf279194..20c6c2272ef 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,11 +1,12 @@ use crate::io::PollEvented; +use crate::net::unix::SocketAddr; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; cfg_uds! { @@ -77,7 +78,7 @@ cfg_uds! { /// # } /// ``` pub struct UnixDatagram { - io: PollEvented, + io: PollEvented, } } @@ -107,7 +108,7 @@ impl UnixDatagram { where P: AsRef, { - let socket = mio_uds::UnixDatagram::bind(path)?; + let socket = mio::net::UnixDatagram::bind(path)?; UnixDatagram::new(socket) } @@ -141,7 +142,7 @@ impl UnixDatagram { /// # } /// ``` pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { - let (a, b) = mio_uds::UnixDatagram::pair()?; + let (a, b) = mio::net::UnixDatagram::pair()?; let a = UnixDatagram::new(a)?; let b = UnixDatagram::new(b)?; @@ -183,12 +184,12 @@ impl UnixDatagram { /// # } /// ``` pub fn from_std(datagram: net::UnixDatagram) -> io::Result { - let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; + let socket = mio::net::UnixDatagram::from_std(datagram); let io = PollEvented::new(socket)?; Ok(UnixDatagram { io }) } - fn new(socket: mio_uds::UnixDatagram) -> io::Result { + fn new(socket: mio::net::UnixDatagram) -> io::Result { let io = PollEvented::new(socket)?; Ok(UnixDatagram { io }) } @@ -225,7 +226,7 @@ impl UnixDatagram { /// # } /// ``` pub fn unbound() -> io::Result { - let socket = mio_uds::UnixDatagram::unbound()?; + let socket = mio::net::UnixDatagram::unbound()?; UnixDatagram::new(socket) } @@ -298,7 +299,7 @@ impl UnixDatagram { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { self.io - .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) .await } @@ -397,7 +398,7 @@ impl UnixDatagram { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io - .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) .await } @@ -467,7 +468,7 @@ impl UnixDatagram { P: AsRef, { self.io - .async_io(mio::Ready::writable(), |sock| { + .async_io(mio::Interest::WRITABLE, |sock| { sock.send_to(buf, target.as_ref()) }) .await @@ -507,9 +508,12 @@ impl UnixDatagram { /// # } /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.io - .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) - .await + let (n, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .await?; + + Ok((n, SocketAddr(addr))) } /// Try to receive data from the socket without waiting. @@ -545,7 +549,8 @@ impl UnixDatagram { /// # } /// ``` pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.io.get_ref().recv_from(buf) + let (n, addr) = self.io.get_ref().recv_from(buf)?; + Ok((n, SocketAddr(addr))) } /// Returns the local address that this socket is bound to. @@ -589,7 +594,7 @@ impl UnixDatagram { /// # } /// ``` pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the address of this socket's peer. @@ -638,7 +643,7 @@ impl UnixDatagram { /// # } /// ``` pub fn peer_addr(&self) -> io::Result { - self.io.get_ref().peer_addr() + self.io.get_ref().peer_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. @@ -701,23 +706,14 @@ impl UnixDatagram { } } -impl TryFrom for mio_uds::UnixDatagram { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UnixDatagram) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for UnixDatagram { +impl TryFrom for UnixDatagram { type Error = io::Error; /// Consumes stream, returning the Tokio I/O object. /// /// This is equivalent to /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). - fn try_from(stream: net::UnixDatagram) -> Result { + fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result { Self::from_std(stream) } } diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index dc8cb08ef4b..5d586ec3509 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -1,12 +1,12 @@ use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::{Incoming, UnixStream}; +use crate::net::unix::{Incoming, SocketAddr, UnixStream}; use std::convert::TryFrom; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; use std::task::{Context, Poll}; @@ -46,7 +46,7 @@ cfg_uds! { /// } /// ``` pub struct UnixListener { - io: PollEvented, + io: PollEvented, } } @@ -64,7 +64,7 @@ impl UnixListener { where P: AsRef, { - let listener = mio_uds::UnixListener::bind(path)?; + let listener = mio::net::UnixListener::bind(path)?; let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } @@ -83,14 +83,14 @@ impl UnixListener { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::UnixListener) -> io::Result { - let listener = mio_uds::UnixListener::from_listener(listener)?; + let listener = mio::net::UnixListener::from_std(listener); let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } /// Returns the local socket address of this listener. pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. @@ -111,24 +111,15 @@ impl UnixListener { &mut self, cx: &mut Context<'_>, ) -> Poll> { - let (io, addr) = ready!(self.poll_accept_std(cx))?; - - let io = mio_uds::UnixStream::from_stream(io)?; - Ok((UnixStream::new(io)?, addr)).into() - } - - fn poll_accept_std( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { loop { let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(None) => { - self.io.clear_readiness(ev); + match self.io.get_ref().accept() { + Ok((sock, addr)) => { + let addr = SocketAddr(addr); + let sock = UnixStream::new(sock)?; + return Poll::Ready(Ok((sock, addr))); } - Ok(Some((sock, addr))) => return Ok((sock, addr)).into(), Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { self.io.clear_readiness(ev); } @@ -192,23 +183,14 @@ impl crate::stream::Stream for UnixListener { } } -impl TryFrom for mio_uds::UnixListener { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UnixListener) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for UnixListener { +impl TryFrom for UnixListener { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UnixListener::from_std(stream)`](UnixListener::from_std). - fn try_from(stream: net::UnixListener) -> io::Result { + fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result { Self::from_std(stream) } } diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index b079fe04d7d..21aa4fe7f2d 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -14,6 +14,9 @@ pub use split::{ReadHalf, WriteHalf}; mod split_owned; pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError}; +mod socketaddr; +pub use socketaddr::SocketAddr; + pub(crate) mod stream; pub(crate) use stream::UnixStream; diff --git a/tokio/src/net/unix/socketaddr.rs b/tokio/src/net/unix/socketaddr.rs new file mode 100644 index 00000000000..48f7b96b8c2 --- /dev/null +++ b/tokio/src/net/unix/socketaddr.rs @@ -0,0 +1,31 @@ +use std::fmt; +use std::path::Path; + +/// An address associated with a Tokio Unix socket. +pub struct SocketAddr(pub(super) mio::net::SocketAddr); + +impl SocketAddr { + /// Returns `true` if the address is unnamed. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn is_unnamed(&self) -> bool { + self.0.is_unnamed() + } + + /// Returns the contents of this address if it is a `pathname` address. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn as_pathname(&self) -> Option<&Path> { + self.0.as_pathname() + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 715ed7aa485..d0f98f438b3 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -3,13 +3,14 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; use crate::net::unix::split::{split, ReadHalf, WriteHalf}; use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::unix::ucred::{self, UCred}; +use crate::net::unix::SocketAddr; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; @@ -21,7 +22,7 @@ cfg_uds! { /// from a listener with `UnixListener::incoming`. Additionally, a pair of /// anonymous Unix sockets can be created with `UnixStream::pair`. pub struct UnixStream { - io: PollEvented, + io: PollEvented, } } @@ -35,7 +36,7 @@ impl UnixStream { where P: AsRef, { - let stream = mio_uds::UnixStream::connect(path)?; + let stream = mio::net::UnixStream::connect(path)?; let stream = UnixStream::new(stream)?; poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; @@ -56,7 +57,7 @@ impl UnixStream { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::UnixStream) -> io::Result { - let stream = mio_uds::UnixStream::from_stream(stream)?; + let stream = mio::net::UnixStream::from_std(stream); let io = PollEvented::new(stream)?; Ok(UnixStream { io }) @@ -68,26 +69,26 @@ impl UnixStream { /// communicating back and forth between one another. Each socket will /// be associated with the default event loop's handle. pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = mio_uds::UnixStream::pair()?; + let (a, b) = mio::net::UnixStream::pair()?; let a = UnixStream::new(a)?; let b = UnixStream::new(b)?; Ok((a, b)) } - pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result { + pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result { let io = PollEvented::new(stream)?; Ok(UnixStream { io }) } /// Returns the socket address of the local half of this connection. pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the socket address of the remote half of this connection. pub fn peer_addr(&self) -> io::Result { - self.io.get_ref().peer_addr() + self.io.get_ref().peer_addr().map(SocketAddr) } /// Returns effective credentials of the process which called `connect` or `pair`. @@ -139,15 +140,6 @@ impl UnixStream { } } -impl TryFrom for mio_uds::UnixStream { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UnixStream) -> Result { - value.io.into_inner() - } -} - impl TryFrom for UnixStream { type Error = io::Error; diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index a8df74a3345..46f1d790db1 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -32,9 +32,8 @@ use crate::process::kill::Kill; use crate::process::SpawnedChild; use crate::signal::unix::{signal, Signal, SignalKind}; -use mio::event::Evented; -use mio::unix::{EventedFd, UnixReady}; -use mio::{Poll as MioPoll, PollOpt, Ready, Token}; +use mio::event::Source; +use mio::unix::SourceFd; use std::fmt; use std::future::Future; use std::io; @@ -173,32 +172,30 @@ where } } -impl Evented for Fd +impl Source for Fd where T: AsRawFd, { fn register( - &self, - poll: &MioPoll, - token: Token, - interest: Ready, - opts: PollOpt, + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, ) -> io::Result<()> { - EventedFd(&self.as_raw_fd()).register(poll, token, interest | UnixReady::hup(), opts) + SourceFd(&self.as_raw_fd()).register(registry, token, interest) } fn reregister( - &self, - poll: &MioPoll, - token: Token, - interest: Ready, - opts: PollOpt, + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, ) -> io::Result<()> { - EventedFd(&self.as_raw_fd()).reregister(poll, token, interest | UnixReady::hup(), opts) + SourceFd(&self.as_raw_fd()).reregister(registry, token, interest) } - fn deregister(&self, poll: &MioPoll) -> io::Result<()> { - EventedFd(&self.as_raw_fd()).deregister(poll) + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).deregister(registry) } } diff --git a/tokio/src/process/windows.rs b/tokio/src/process/windows.rs index 1fbdee21d6d..1aa6c89043a 100644 --- a/tokio/src/process/windows.rs +++ b/tokio/src/process/windows.rs @@ -20,7 +20,7 @@ use crate::process::kill::Kill; use crate::process::SpawnedChild; use crate::sync::oneshot; -use mio_named_pipes::NamedPipe; +use mio::windows::NamedPipe; use std::fmt; use std::future::Future; use std::io; diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 30a05872331..77f300bb192 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -9,7 +9,7 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storag use crate::sync::mpsc::{channel, Receiver}; use libc::c_int; -use mio_uds::UnixStream; +use mio::net::UnixStream; use std::io::{self, Error, ErrorKind, Write}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index 639a483ef24..ae60c22fad2 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -5,7 +5,8 @@ use crate::io::PollEvented; use crate::park::Park; use crate::runtime::context; use crate::signal::registry::globals; -use mio_uds::UnixStream; + +use mio::net::UnixStream; use std::io::{self, Read}; use std::ptr; use std::sync::{Arc, Weak}; @@ -42,22 +43,41 @@ pub(super) struct Inner(()); impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. pub(crate) fn new(park: IoDriver) -> io::Result { + use std::mem::ManuallyDrop; + use std::os::unix::io::{AsRawFd, FromRawFd}; + // NB: We give each driver a "fresh" reciever file descriptor to avoid // the issues described in alexcrichton/tokio-process#42. // // In the past we would reuse the actual receiver file descriptor and // swallow any errors around double registration of the same descriptor. - // I'm not sure if the second (failed) registration simply doesn't end up - // receiving wake up notifications, or there could be some race condition - // when consuming readiness events, but having distinct descriptors for - // distinct PollEvented instances appears to mitigate this. + // I'm not sure if the second (failed) registration simply doesn't end + // up receiving wake up notifications, or there could be some race + // condition when consuming readiness events, but having distinct + // descriptors for distinct PollEvented instances appears to mitigate + // this. // // Unfortunately we cannot just use a single global PollEvented instance // either, since we can't compare Handles or assume they will always // point to the exact same reactor. - let receiver = globals().receiver.try_clone()?; - let receiver = - PollEvented::new_with_ready_and_handle(receiver, mio::Ready::all(), park.handle())?; + // + // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior + // with registering dups with the same reactor. In this case, duping is + // safe as each dup is registered with separate reactors **and** we + // only expect at least one dup to receive the notification. + + // Manually drop as we don't actually own this instance of UnixStream. + let receiver_fd = globals().receiver.as_raw_fd(); + + // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe. + let original = + ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); + let receiver = UnixStream::from_std(original.try_clone()?); + let receiver = PollEvented::new_with_interest_and_handle( + receiver, + mio::Interest::READABLE | mio::Interest::WRITABLE, + park.handle(), + )?; Ok(Self { park, diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index bd47f19e446..18dfcca028f 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -44,6 +44,33 @@ async fn echo() -> io::Result<()> { Ok(()) } +#[tokio::test] +async fn echo_from() -> io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + let server_socket = UnixDatagram::bind(server_path.clone())?; + + tokio::spawn(async move { + if let Err(e) = echo_server(server_socket).await { + eprintln!("Error in echo server: {}", e); + } + }); + + { + let socket = UnixDatagram::bind(&client_path).unwrap(); + socket.connect(&server_path)?; + socket.send(b"ECHO").await?; + let mut recv_buf = [0u8; 16]; + let (len, addr) = socket.recv_from(&mut recv_buf[..]).await?; + assert_eq!(&recv_buf[..len], b"ECHO"); + assert_eq!(addr.as_pathname(), Some(server_path.as_path())); + } + + Ok(()) +} + // Even though we use sync non-blocking io we still need a reactor. #[tokio::test] async fn try_send_recv_never_block() -> io::Result<()> {