Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: rewrite slab to support compaction #2757

Merged
merged 16 commits into from Aug 12, 2020
Merged
211 changes: 72 additions & 139 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -3,23 +3,30 @@ pub(crate) mod platform;
mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests

use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark};
use crate::runtime::context;
use crate::util::slab::{Address, Slab};
use crate::util::bit;
use crate::util::slab::{self, Slab};

use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;

/// I/O driver, backed by Mio
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`
tick: u16,

/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,
events: Option<mio::Events>,

/// Primary slab handle containing the state for each resource registered
/// with this driver.
resources: Slab<ScheduledIo>,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,
Expand All @@ -37,11 +44,8 @@ pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,

/// Dispatch slabs for I/O and futures events
pub(super) io_dispatch: Slab<ScheduledIo>,

/// The number of sources in `io_dispatch`.
n_sources: AtomicUsize,
/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,

/// Used to wake up the reactor from a call to `turn`
wakeup: mio::SetReadiness,
Expand All @@ -53,7 +57,19 @@ pub(super) enum Direction {
Write,
}

const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);

const ADDRESS: bit::Pack = bit::Pack::least_significant(24);

// Packs the generation value in the `readiness` field.
//
// The generation prevents a race condition where a slab slot is reused for a
// new socket while the I/O driver is about to apply a readiness event. The
// generaton value is checked when setting new readiness. If the generation do
// not match, then the readiness event is discarded.
const GENERATION: bit::Pack = ADDRESS.then(7);

fn _assert_kinds() {
fn _assert<T: Send + Sync>() {}
Expand All @@ -69,6 +85,8 @@ impl Driver {
pub(crate) fn new() -> io::Result<Driver> {
let io = mio::Poll::new()?;
let wakeup_pair = mio::Registration::new2();
let slab = Slab::new();
let allocator = slab.allocator();

io.register(
&wakeup_pair.0,
Expand All @@ -78,12 +96,13 @@ impl Driver {
)?;

Ok(Driver {
events: mio::Events::with_capacity(1024),
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
_wakeup_registration: wakeup_pair.0,
inner: Arc::new(Inner {
io,
io_dispatch: Slab::new(),
n_sources: AtomicUsize::new(0),
io_dispatch: allocator,
wakeup: wakeup_pair.1,
}),
})
Expand All @@ -102,16 +121,27 @@ impl Driver {
}

fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u16 = 256;

self.tick = self.tick.wrapping_add(1);

if self.tick % COMPACT_INTERVAL == 0 {
self.resources.compact();
}
carllerche marked this conversation as resolved.
Show resolved Hide resolved

let mut events = self.events.take().expect("i/o driver event store missing");

// Block waiting for an event to happen, peeling out how many events
// happened.
match self.inner.io.poll(&mut self.events, max_wait) {
match self.inner.io.poll(&mut events, max_wait) {
Ok(_) => {}
Err(e) => return Err(e),
}

// Process all the events that came in, dispatching appropriately

for event in self.events.iter() {
for event in events.iter() {
let token = event.token();

if token == TOKEN_WAKEUP {
Expand All @@ -124,22 +154,24 @@ impl Driver {
}
}

self.events = Some(events);

Ok(())
}

fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;

let address = Address::from_usize(token.0);
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.inner.io_dispatch.get(address) {
let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

if io
.set_readiness(address, |curr| curr | ready.as_usize())
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
.is_err()
{
// token no longer valid!
Expand All @@ -164,6 +196,18 @@ impl Driver {
}
}

impl Drop for Driver {
fn drop(&mut self) {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.reader.wake();
io.writer.wake();
})
}
}

impl Park for Driver {
type Unpark = Handle;
type Error = io::Error;
Expand Down Expand Up @@ -246,46 +290,32 @@ impl Inner {
&self,
source: &dyn Evented,
ready: mio::Ready,
) -> io::Result<Address> {
let address = self.io_dispatch.alloc().ok_or_else(|| {
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})?;

self.n_sources.fetch_add(1, SeqCst);
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));

self.io.register(
source,
mio::Token(address.to_usize()),
ready,
mio::PollOpt::edge(),
)?;
self.io
.register(source, mio::Token(token), ready, mio::PollOpt::edge())?;

Ok(address)
Ok(shared)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
}

pub(super) fn drop_source(&self, address: Address) {
self.io_dispatch.remove(address);
self.n_sources.fetch_sub(1, SeqCst);
}

/// Registers interest in the I/O resource associated with `token`.
pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
let sched = self
.io_dispatch
.get(token)
.unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));

pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
Direction::Read => &sched.reader,
Direction::Write => &sched.writer,
Direction::Read => &io.reader,
Direction::Write => &io.writer,
};

waker.register(w);
Expand All @@ -303,100 +333,3 @@ impl Direction {
}
}
}

#[cfg(all(test, loom))]
mod tests {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
use super::*;
use loom::thread;

// No-op `Evented` impl just so we can have something to pass to `add_source`.
struct NotEvented;

impl Evented for NotEvented {
fn register(
&self,
_: &mio::Poll,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
) -> io::Result<()> {
Ok(())
}

fn reregister(
&self,
_: &mio::Poll,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
) -> io::Result<()> {
Ok(())
}

fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
Ok(())
}
}

#[test]
fn tokens_unique_when_dropped() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();

let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
})
}

#[test]
fn tokens_unique_when_dropped_on_full_page() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
// add sources to fill up the first page so that the dropped index
// may be reused.
for _ in 0..31 {
inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
}

let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
})
}

#[test]
fn tokens_unique_concurrent_add() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();

let thread = thread::spawn(move || {
let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap();
token_2
});

let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let token_2 = thread.join().unwrap();

assert!(token_1 != token_2);
})
}
}