Skip to content

Commit

Permalink
Refactor IO registration using intrusive linked list
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Sep 11, 2020
1 parent be7462e commit 9ce4e61
Show file tree
Hide file tree
Showing 30 changed files with 755 additions and 1,307 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Expand Up @@ -150,11 +150,11 @@ jobs:
run: cargo install cargo-hack

- name: check --each-feature
run: cargo hack check --all --each-feature -Z avoid-dev-deps
run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps

# Try with unstable feature flags
- name: check --each-feature --unstable
run: cargo hack check --all --each-feature -Z avoid-dev-deps
run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps
env:
RUSTFLAGS: --cfg tokio_unstable

Expand Down
3 changes: 1 addition & 2 deletions tokio-util/Cargo.toml
Expand Up @@ -25,11 +25,10 @@ publish = false
default = []

# Shorthand for enabling everything
full = ["codec", "udp", "compat", "io"]
full = ["codec", "compat", "io"]

compat = ["futures-io",]
codec = ["tokio/stream"]
udp = ["tokio/udp"]
io = []

[dependencies]
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/src/cfg.rs
Expand Up @@ -18,6 +18,7 @@ macro_rules! cfg_compat {
}
}

/*
macro_rules! cfg_udp {
($($item:item)*) => {
$(
Expand All @@ -27,6 +28,7 @@ macro_rules! cfg_udp {
)*
}
}
*/

macro_rules! cfg_io {
($($item:item)*) => {
Expand Down
5 changes: 5 additions & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -30,9 +30,14 @@ cfg_codec! {
pub mod codec;
}

/*
Disabled due to removal of poll_ functions on UdpSocket.
See https://github.com/tokio-rs/tokio/issues/2830
cfg_udp! {
pub mod udp;
}
*/

cfg_compat! {
pub mod compat;
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/tests/udp.rs
@@ -1,3 +1,4 @@
/*
#![warn(rust_2018_idioms)]
use tokio::{net::UdpSocket, stream::StreamExt};
Expand Down Expand Up @@ -100,3 +101,4 @@ async fn send_framed_lines_codec() -> std::io::Result<()> {
Ok(())
}
*/
57 changes: 19 additions & 38 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -12,14 +12,13 @@ use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;

/// I/O driver, backed by Mio
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`
tick: u16,
tick: u8,

/// Reuse the `mio::Events` value across calls to poll.
events: Option<mio::Events>,
Expand All @@ -40,6 +39,11 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

pub(crate) struct ReadyEvent {
tick: u8,
readiness: mio::Ready,
}

pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
Expand All @@ -57,6 +61,11 @@ pub(super) enum Direction {
Write,
}

enum Tick {
Set(u8),
Clear(u8),
}

// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
Expand Down Expand Up @@ -122,11 +131,11 @@ impl Driver {

fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u16 = 256;
const COMPACT_INTERVAL: u8 = 255;

self.tick = self.tick.wrapping_add(1);

if self.tick % COMPACT_INTERVAL == 0 {
if self.tick == COMPACT_INTERVAL {
self.resources.compact();
}

Expand Down Expand Up @@ -160,39 +169,22 @@ impl Driver {
}

fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;

let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

if io
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
.is_err()
{
let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
curr | ready.as_usize()
});
if set.is_err() {
// token no longer valid!
return;
}

if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
wr = io.writer.take_waker();
}

if !(ready & (!mio::Ready::writable())).is_empty() {
rd = io.reader.take_waker();
}

if let Some(w) = rd {
w.wake();
}

if let Some(w) = wr {
w.wake();
}
io.wake(ready);
}
}

Expand All @@ -202,8 +194,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.reader.wake();
io.writer.wake();
io.wake(mio::Ready::all());
})
}
}
Expand Down Expand Up @@ -310,16 +301,6 @@ impl Inner {
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
}

/// Registers interest in the I/O resource associated with `token`.
pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
Direction::Read => &io.reader,
Direction::Write => &io.writer,
};

waker.register(w);
}
}

impl Direction {
Expand Down

0 comments on commit 9ce4e61

Please sign in to comment.