Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: update to Mio 0.7 #2893

Merged
merged 10 commits into from Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 9 additions & 13 deletions tokio/Cargo.toml
Expand Up @@ -58,9 +58,9 @@ net = ["dns", "tcp", "udp", "uds"]
process = [
"lazy_static",
"libc",
"mio",
"mio-named-pipes",
"mio-uds",
"mio/os-poll",
"mio/os-util",
"mio/uds",
"signal-hook-registry",
"winapi/threadpoollegacyapiset",
]
Expand All @@ -74,18 +74,18 @@ rt-threaded = [
signal = [
"lazy_static",
"libc",
"mio",
"mio-uds",
"mio/os-poll",
"mio/uds",
"signal-hook-registry",
"winapi/consoleapi",
]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["lazy_static", "mio"]
tcp = ["lazy_static", "mio/tcp", "mio/os-poll"]
time = ["slab"]
udp = ["lazy_static", "mio"]
uds = ["lazy_static", "libc", "mio", "mio-uds"]
udp = ["lazy_static", "mio/udp", "mio/os-poll"]
uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"]

[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
Expand All @@ -98,20 +98,16 @@ fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
mio = { version = "0.7.2", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
mio-uds = { version = "0.6.5", optional = true }
libc = { version = "0.2.42", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(windows)'.dependencies]
mio-named-pipes = { version = "0.1.6", optional = true }

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.8"
default-features = false
Expand Down
83 changes: 35 additions & 48 deletions tokio/src/io/driver/mod.rs
@@ -1,4 +1,5 @@
pub(crate) mod platform;
mod ready;
use ready::Ready;

mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
Expand All @@ -8,7 +9,6 @@ use crate::runtime::context;
use crate::util::bit;
use crate::util::slab::{self, Slab};

use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
Expand All @@ -27,10 +27,11 @@ pub(crate) struct Driver {
/// with this driver.
resources: Slab<ScheduledIo>,

/// The system event queue
poll: mio::Poll,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,

_wakeup_registration: mio::Registration,
}

/// A reference to an I/O driver
Expand All @@ -41,18 +42,18 @@ pub(crate) struct Handle {

pub(crate) struct ReadyEvent {
tick: u8,
readiness: mio::Ready,
ready: Ready,
}

pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
/// Registers I/O resources
registry: mio::Registry,

/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,

/// Used to wake up the reactor from a call to `turn`
wakeup: mio::SetReadiness,
waker: mio::Waker,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -92,27 +93,22 @@ impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
let io = mio::Poll::new()?;
let wakeup_pair = mio::Registration::new2();
let poll = mio::Poll::new()?;
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;

let slab = Slab::new();
let allocator = slab.allocator();

io.register(
&wakeup_pair.0,
TOKEN_WAKEUP,
mio::Ready::readable(),
mio::PollOpt::level(),
)?;

Ok(Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
_wakeup_registration: wakeup_pair.0,
poll,
inner: Arc::new(Inner {
io,
registry,
io_dispatch: allocator,
wakeup: wakeup_pair.1,
waker,
}),
})
}
Expand Down Expand Up @@ -143,23 +139,18 @@ impl Driver {

// Block waiting for an event to happen, peeling out how many events
// happened.
match self.inner.io.poll(&mut events, max_wait) {
match self.poll.poll(&mut events, max_wait) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}

// Process all the events that came in, dispatching appropriately

for event in events.iter() {
let token = event.token();

if token == TOKEN_WAKEUP {
self.inner
.wakeup
.set_readiness(mio::Ready::empty())
.unwrap();
} else {
self.dispatch(token, event.readiness());
if token != TOKEN_WAKEUP {
self.dispatch(token, Ready::from_mio(event));
}
}

Expand All @@ -168,18 +159,17 @@ impl Driver {
Ok(())
}

fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
curr | ready.as_usize()
});
if set.is_err() {
let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);

if res.is_err() {
// token no longer valid!
return;
}
Expand All @@ -194,7 +184,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.wake(mio::Ready::all());
io.wake(Ready::ALL);
})
}
}
Expand Down Expand Up @@ -250,7 +240,7 @@ impl Handle {
/// return immediately.
fn wakeup(&self) {
if let Some(inner) = self.inner() {
inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
inner.waker.wake().expect("failed to wake I/O driver");
}
}

Expand Down Expand Up @@ -279,8 +269,8 @@ impl Inner {
/// The registration token is returned.
pub(super) fn add_source(
&self,
source: &dyn Evented,
ready: mio::Ready,
source: &mut impl mio::event::Source,
interest: mio::Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
Expand All @@ -291,26 +281,23 @@ impl Inner {

let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));

self.io
.register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
self.registry
.register(source, mio::Token(token), interest)?;

Ok(shared)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)
}
}

impl Direction {
pub(super) fn mask(self) -> mio::Ready {
pub(super) fn mask(self) -> Ready {
match self {
Direction::Read => {
// Everything except writable is signaled through read.
mio::Ready::all() - mio::Ready::writable()
}
Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
}
}
}