From a04ab1a83fb241cbaf1e2f11ac4594f3e42ffb29 Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Wed, 29 Jul 2020 08:50:38 -0700
Subject: [PATCH 01/15] io: rewrite slab to support compaction

The I/O driver uses a slab to store per-resource state. Doing this
provides two benefits. First, allocating state is streamlined. Second,
resources may be safely indexed using a `usize` type. The `usize` is
passed to the OS's selector when registering to receive events.

The original slab implementation used a `Vec` backed by `RwLock`. This
primarily caused contention when reading state. This implementation
also only **grew** the slab capacity and never shrank it. In #1625, the
slab was rewritten to use a lock-free strategy. The lock contention was
removed, but the implementation was still grow-only.

This change adds the ability to release memory. Similar to the previous
implementation, it structures the slab as a vector of pages. This
enables growing the slab without having to move any previous entries.
It also adds the ability to release pages. This is done by introducing
a lock around allocating / releasing slab entries. This does not impact
benchmarks, primarily because the existing implementation was not
"done" and already used a lock around allocating and releasing.

A `Slab::compact()` function is added. It iterates the pages and frees
any page that has no slots in use. The `compact()` function is called
occasionally by the I/O driver.
---
 tokio/src/io/driver/mod.rs              | 114 ++--
 tokio/src/io/driver/scheduled_io.rs     |  72 +--
 tokio/src/io/registration.rs            |  48 +-
 tokio/src/loom/std/atomic_ptr.rs        |   8 +-
 tokio/src/util/bit.rs                   |  14 +-
 tokio/src/util/slab.rs                  | 745 ++++++++++++++++++++++++
 tokio/src/util/slab/addr.rs             | 154 -----
 tokio/src/util/slab/entry.rs            |   7 -
 tokio/src/util/slab/generation.rs       |  32 -
 tokio/src/util/slab/mod.rs              | 107 ----
 tokio/src/util/slab/page.rs             | 187 ------
 tokio/src/util/slab/shard.rs            | 105 ----
 tokio/src/util/slab/slot.rs             |  42 --
 tokio/src/util/slab/stack.rs            |  58 --
 tokio/src/util/slab/tests/loom_slab.rs  | 327 -----------
 tokio/src/util/slab/tests/loom_stack.rs |  88 ---
 tokio/src/util/slab/tests/mod.rs        |   2 -
 17 files changed, 881 insertions(+), 1229 deletions(-)
 create mode 100644 tokio/src/util/slab.rs
 delete mode 100644 tokio/src/util/slab/addr.rs
 delete mode 100644 tokio/src/util/slab/entry.rs
 delete mode 100644 tokio/src/util/slab/generation.rs
 delete mode 100644 tokio/src/util/slab/mod.rs
 delete mode 100644 tokio/src/util/slab/page.rs
 delete mode 100644 tokio/src/util/slab/shard.rs
 delete mode 100644 tokio/src/util/slab/slot.rs
 delete mode 100644 tokio/src/util/slab/stack.rs
 delete mode 100644 tokio/src/util/slab/tests/loom_slab.rs
 delete mode 100644 tokio/src/util/slab/tests/loom_stack.rs
 delete mode 100644 tokio/src/util/slab/tests/mod.rs

diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index d8d17f8873b..5d7b7bf46f4 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -3,23 +3,30 @@ pub(crate) mod platform;
 mod scheduled_io;
 pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests

-use crate::loom::sync::atomic::AtomicUsize;
 use crate::park::{Park, Unpark};
 use crate::runtime::context;
-use crate::util::slab::{Address, Slab};
+use crate::util::bit;
+use crate::util::slab::{self, Slab};

 use mio::event::Evented;
 use std::fmt;
 use std::io;
-use std::sync::atomic::Ordering::SeqCst;
 use std::sync::{Arc, Weak};
 use std::task::Waker;
 use std::time::Duration;

 /// I/O driver, backed by Mio
 pub(crate) struct
Driver { + /// Tracks the number of times `turn` is called. It is safe for this to wrap + /// as it is mostly used to determine when to call `compact()` + tick: u16, + /// Reuse the `mio::Events` value across calls to poll. - events: mio::Events, + events: Option, + + /// Primary slab handle containing the state for each resource registered + /// with this driver. + resources: Slab, /// State shared between the reactor and the handles. inner: Arc, @@ -37,11 +44,8 @@ pub(super) struct Inner { /// The underlying system event queue. io: mio::Poll, - /// Dispatch slabs for I/O and futures events - pub(super) io_dispatch: Slab, - - /// The number of sources in `io_dispatch`. - n_sources: AtomicUsize, + /// Allocates `ScheduledIo` handles when creating new resources. + pub(super) io_dispatch: slab::Allocator, /// Used to wake up the reactor from a call to `turn` wakeup: mio::SetReadiness, @@ -53,7 +57,19 @@ pub(super) enum Direction { Write, } -const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL); +// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup +// token. +const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); + +const ADDRESS: bit::Pack = bit::Pack::least_significant(24); + +// Packs the generation value in the `readiness` field. +// +// The generation prevents a race condition where a slab slot is reused for a +// new socket while the I/O driver is about to apply a readiness event. The +// generaton value is checked when setting new readiness. If the generation do +// not match, then the readiness event is discarded. +const GENERATION: bit::Pack = ADDRESS.then(7); fn _assert_kinds() { fn _assert() {} @@ -69,6 +85,8 @@ impl Driver { pub(crate) fn new() -> io::Result { let io = mio::Poll::new()?; let wakeup_pair = mio::Registration::new2(); + let slab = Slab::new(); + let allocator = slab.allocator(); io.register( &wakeup_pair.0, @@ -78,12 +96,13 @@ impl Driver { )?; Ok(Driver { - events: mio::Events::with_capacity(1024), + tick: 0, + events: Some(mio::Events::with_capacity(1024)), + resources: slab, _wakeup_registration: wakeup_pair.0, inner: Arc::new(Inner { io, - io_dispatch: Slab::new(), - n_sources: AtomicUsize::new(0), + io_dispatch: allocator, wakeup: wakeup_pair.1, }), }) @@ -102,16 +121,27 @@ impl Driver { } fn turn(&mut self, max_wait: Option) -> io::Result<()> { + // How often to call `compact()` on the resource slab + const COMPACT_INTERVAL: u16 = 256; + + self.tick = self.tick.wrapping_add(1); + + if self.tick % COMPACT_INTERVAL == 0 { + self.resources.compact(); + } + + let mut events = self.events.take().expect("i/o driver event store missing"); + // Block waiting for an event to happen, peeling out how many events // happened. 
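// Standalone sketch (separate from the driver code in this hunk) of the
// compaction cadence set up above: COMPACT_INTERVAL (256) divides the `u16`
// range evenly, so the wrapping tick keeps firing every 256 calls to `turn`,
// including across the wrap from 65_535 back to 0.
fn compaction_due(tick: u16) -> bool {
    const COMPACT_INTERVAL: u16 = 256;
    tick % COMPACT_INTERVAL == 0
}

#[test]
fn cadence_survives_wrap() {
    let mut tick: u16 = u16::MAX - 1;
    let mut fired = 0;
    for _ in 0..512 {
        tick = tick.wrapping_add(1);
        if compaction_due(tick) {
            fired += 1;
        }
    }
    // Fires once at tick 0 (right after wrapping) and once at tick 256.
    assert_eq!(fired, 2);
}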
- match self.inner.io.poll(&mut self.events, max_wait) { + match self.inner.io.poll(&mut events, max_wait) { Ok(_) => {} Err(e) => return Err(e), } // Process all the events that came in, dispatching appropriately - for event in self.events.iter() { + for event in events.iter() { let token = event.token(); if token == TOKEN_WAKEUP { @@ -124,22 +154,24 @@ impl Driver { } } + self.events = Some(events); + Ok(()) } - fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { let mut rd = None; let mut wr = None; - let address = Address::from_usize(token.0); + let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - let io = match self.inner.io_dispatch.get(address) { + let io = match self.resources.get(addr) { Some(io) => io, None => return, }; if io - .set_readiness(address, |curr| curr | ready.as_usize()) + .set_readiness(Some(token.0), |curr| curr | ready.as_usize()) .is_err() { // token no longer valid! @@ -164,6 +196,18 @@ impl Driver { } } +impl Drop for Driver { + fn drop(&mut self) { + self.resources.for_each(|io| { + // If a task is waiting on the I/O resource, notify it. The task + // will then attempt to use the I/O resource and fail due to the + // driver being shutdown. + io.reader.wake(); + io.writer.wake(); + }) + } +} + impl Park for Driver { type Unpark = Handle; type Error = io::Error; @@ -246,24 +290,20 @@ impl Inner { &self, source: &dyn Evented, ready: mio::Ready, - ) -> io::Result
{ - let address = self.io_dispatch.alloc().ok_or_else(|| { + ) -> io::Result> { + let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { io::Error::new( io::ErrorKind::Other, "reactor at max registered I/O resources", ) })?; - self.n_sources.fetch_add(1, SeqCst); + let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); - self.io.register( - source, - mio::Token(address.to_usize()), - ready, - mio::PollOpt::edge(), - )?; + self.io + .register(source, mio::Token(token), ready, mio::PollOpt::edge())?; - Ok(address) + Ok(shared) } /// Deregisters an I/O resource from the reactor. @@ -271,21 +311,11 @@ impl Inner { self.io.deregister(source) } - pub(super) fn drop_source(&self, address: Address) { - self.io_dispatch.remove(address); - self.n_sources.fetch_sub(1, SeqCst); - } - /// Registers interest in the I/O resource associated with `token`. - pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) { - let sched = self - .io_dispatch - .get(token) - .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token)); - + pub(super) fn register(&self, io: &slab::Ref, dir: Direction, w: Waker) { let waker = match dir { - Direction::Read => &sched.reader, - Direction::Write => &sched.writer, + Direction::Read => &io.reader, + Direction::Write => &io.writer, }; waker.register(w); diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 7f6446e3f58..a4f775c24c2 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,47 +1,30 @@ use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; -use crate::util::bit; -use crate::util::slab::{Address, Entry, Generation}; +use crate::util::slab::Entry; -use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +/// Stored in the I/O driver resource slab. #[derive(Debug)] pub(crate) struct ScheduledIo { + /// Packs the resource's readiness with the resource's generation. readiness: AtomicUsize, + + /// Task waiting on read readiness pub(crate) reader: AtomicWaker, + + /// Task waiting on write readiness pub(crate) writer: AtomicWaker, } -const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH); - impl Entry for ScheduledIo { - fn generation(&self) -> Generation { - unpack_generation(self.readiness.load(SeqCst)) - } - - fn reset(&self, generation: Generation) -> bool { - let mut current = self.readiness.load(Acquire); - - loop { - if unpack_generation(current) != generation { - return false; - } - - let next = PACK.pack(generation.next().to_usize(), 0); + fn reset(&self) { + let state = self.readiness.load(Acquire); - match self - .readiness - .compare_exchange(current, next, AcqRel, Acquire) - { - Ok(_) => break, - Err(actual) => current = actual, - } - } + let generation = super::GENERATION.unpack(state); + let next = super::GENERATION.pack_lossy(generation + 1, 0); - drop(self.reader.take_waker()); - drop(self.writer.take_waker()); - - true + self.readiness.store(next, Release); } } @@ -56,6 +39,10 @@ impl Default for ScheduledIo { } impl ScheduledIo { + pub(crate) fn generation(&self) -> usize { + super::GENERATION.unpack(self.readiness.load(Acquire)) + } + #[cfg(all(test, loom))] /// Returns the current readiness value of this `ScheduledIo`, if the /// provided `token` is still a valid access. @@ -92,32 +79,35 @@ impl ScheduledIo { /// Otherwise, this returns the previous readiness. 
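// Standalone sketch of the token layout that `set_readiness` relies on,
// mirroring the ADDRESS (24 least-significant bits) and GENERATION (next 7
// bits) packs defined in driver/mod.rs; the helper names are illustrative.
fn pack_token(generation: usize, address: usize) -> usize {
    debug_assert!(address < 1 << 24);
    debug_assert!(generation < 1 << 7);
    (generation << 24) | address
}

fn unpack_generation(token: usize) -> usize {
    (token >> 24) & 0x7f
}

// A readiness update is only applied when the generation packed in the token
// still matches the generation stored in `readiness`; otherwise the slot has
// been reused for a new resource and the stale event is discarded.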
pub(crate) fn set_readiness( &self, - address: Address, + token: Option, f: impl Fn(usize) -> usize, ) -> Result { - let generation = address.generation(); - let mut current = self.readiness.load(Acquire); loop { - // Check that the generation for this access is still the current - // one. - if unpack_generation(current) != generation { - return Err(()); + let current_generation = super::GENERATION.unpack(current); + + if let Some(token) = token { + // Check that the generation for this access is still the + // current one. + if super::GENERATION.unpack(token) != current_generation { + return Err(()); + } } + // Mask out the generation bits so that the modifying function // doesn't see them. let current_readiness = current & mio::Ready::all().as_usize(); let new = f(current_readiness); debug_assert!( - new <= !PACK.max_value(), + new <= super::ADDRESS.max_value(), "new readiness value would overwrite generation bits!" ); match self.readiness.compare_exchange( current, - PACK.pack(generation.to_usize(), new), + super::GENERATION.pack(current_generation, new), AcqRel, Acquire, ) { @@ -135,7 +125,3 @@ impl Drop for ScheduledIo { self.reader.wake(); } } - -fn unpack_generation(src: usize) -> Generation { - Generation::new(PACK.unpack(src)) -} diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 77fe6dbc723..a597125562d 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -1,5 +1,5 @@ -use crate::io::driver::{platform, Direction, Handle}; -use crate::util::slab::Address; +use crate::io::driver::{platform, Direction, Handle, ScheduledIo}; +use crate::util::slab; use mio::{self, Evented}; use std::io; @@ -39,11 +39,17 @@ cfg_io_driver! { /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] pub struct Registration { + /// Handle to the associated driver. handle: Handle, - address: Address, + + /// Reference to state stored by the driver. + shared: slab::Ref, } } +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + // ===== impl Registration ===== impl Registration { @@ -104,7 +110,7 @@ impl Registration { T: Evented, { let handle = Handle::current(); - let address = if let Some(inner) = handle.inner() { + let shared = if let Some(inner) = handle.inner() { inner.add_source(io, ready)? } else { return Err(io::Error::new( @@ -113,7 +119,7 @@ impl Registration { )); }; - Ok(Registration { handle, address }) + Ok(Registration { handle, shared }) } /// Deregisters the I/O resource from the reactor it is associated with. @@ -272,14 +278,12 @@ impl Registration { // If the task should be notified about new events, ensure that it has // been registered if let Some(ref cx) = cx { - inner.register(self.address, direction, cx.waker().clone()) + inner.register(&self.shared, direction, cx.waker().clone()) } let mask = direction.mask(); let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize(); - let sched = inner.io_dispatch.get(self.address).unwrap(); - // This consumes the current readiness state **except** for HUP and // error. HUP and error are excluded because a) they are final states // and never transitition out and b) both the read AND the write @@ -296,9 +300,10 @@ impl Registration { // AND write. A specific case that `EPOLLERR` occurs is when the read // end of a pipe is closed. When this occurs, a peer blocked by // writing to the pipe should be notified. 
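// Standalone sketch of the consumption step below, using plain `usize` bit
// masks rather than mio types (the bit values are illustrative): clearing
// only the bits in `mask_no_hup` consumes the readiness of interest while
// leaving HUP / error visible to the other direction.
fn consume_readiness(curr: usize, mask_no_hup: usize) -> usize {
    curr & !mask_no_hup
}

// e.g. consume_readiness(0b101 /* readable | hup */, 0b001 /* readable */)
//      == 0b100, so HUP remains observable by a blocked writer.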
- let curr_ready = sched - .set_readiness(self.address, |curr| curr & (!mask_no_hup)) - .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address)); + let curr_ready = self + .shared + .set_readiness(None, |curr| curr & (!mask_no_hup)) + .unwrap(); let mut ready = mask & mio::Ready::from_usize(curr_ready); @@ -306,14 +311,15 @@ impl Registration { if let Some(cx) = cx { // Update the task info match direction { - Direction::Read => sched.reader.register_by_ref(cx.waker()), - Direction::Write => sched.writer.register_by_ref(cx.waker()), + Direction::Read => self.shared.reader.register_by_ref(cx.waker()), + Direction::Write => self.shared.writer.register_by_ref(cx.waker()), } // Try again - let curr_ready = sched - .set_readiness(self.address, |curr| curr & (!mask_no_hup)) - .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address)); + let curr_ready = self + .shared + .set_readiness(None, |curr| curr & (!mask_no_hup)) + .unwrap(); ready = mask & mio::Ready::from_usize(curr_ready); } } @@ -326,15 +332,9 @@ impl Registration { } } -unsafe impl Send for Registration {} -unsafe impl Sync for Registration {} - impl Drop for Registration { fn drop(&mut self) { - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return, - }; - inner.drop_source(self.address); + drop(self.shared.reader.take_waker()); + drop(self.shared.writer.take_waker()); } } diff --git a/tokio/src/loom/std/atomic_ptr.rs b/tokio/src/loom/std/atomic_ptr.rs index f7fd56cc69b..236645f037b 100644 --- a/tokio/src/loom/std/atomic_ptr.rs +++ b/tokio/src/loom/std/atomic_ptr.rs @@ -1,5 +1,5 @@ use std::fmt; -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; /// `AtomicPtr` providing an additional `load_unsync` function. pub(crate) struct AtomicPtr { @@ -21,6 +21,12 @@ impl Deref for AtomicPtr { } } +impl DerefMut for AtomicPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + impl fmt::Debug for AtomicPtr { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { self.deref().fmt(fmt) diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs index e61ac2165a3..07f9db3b338 100644 --- a/tokio/src/util/bit.rs +++ b/tokio/src/util/bit.rs @@ -7,16 +7,6 @@ pub(crate) struct Pack { } impl Pack { - /// Value is packed in the `width` most-significant bits. - pub(crate) const fn most_significant(width: u32) -> Pack { - let mask = mask_for(width).reverse_bits(); - - Pack { - mask, - shift: mask.trailing_zeros(), - } - } - /// Value is packed in the `width` least-significant bits. pub(crate) const fn least_significant(width: u32) -> Pack { let mask = mask_for(width); @@ -53,6 +43,10 @@ impl Pack { (base & !self.mask) | (value << self.shift) } + pub(crate) fn pack_lossy(&self, value: usize, base: usize) -> usize { + self.pack(value & self.max_value(), base) + } + pub(crate) fn unpack(&self, src: usize) -> usize { unpack(src, self.mask, self.shift) } diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs new file mode 100644 index 00000000000..4c40d27c0d4 --- /dev/null +++ b/tokio/src/util/slab.rs @@ -0,0 +1,745 @@ +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::atomic::AtomicUsize; +use crate::util::bit; +use std::fmt; +use std::mem; +use std::ops; +use std::ptr; +use std::sync::atomic::Ordering::{Acquire, Release}; + +/// Amortized allocation for homogeneous data types. +/// +/// The slab pre-allocates chunks of memory to store values. It uses a similar +/// growing strategy as `Vec`. 
When new capacity is needed, the slab grows by +/// 2x. +/// +/// # Pages +/// +/// Unlike `Vec`, growing does not require moving existing elements. Instead of +/// being a continuous chunk of memory for all elements, `Slab` is an array of +/// arrays. The top-level array is an array of pages. Each page is 2x bigger +/// than the previous one. When the slab grows, a new page is allocated. +/// +/// Pages are lazily initialized. +/// +/// # Allocating +/// +/// When allocating an object, first previously used slots are reused. If no +/// previously used slot is available, a new slot is initialized in an existing +/// page. If all pages are full, then a new page is allocated. +/// +/// When an allocated object is released, it is pushed into it's page's free +/// list. Allocating scans all pages for a free slot. +/// +/// # Indexing +/// +/// The slab is able to index values using an address. Even when the indexed +/// object has been released, it is still safe to index. This is a key ability +/// for using the slab with the I/O driver. Addresses are registered with the +/// OS's selector and I/O resources can be released without synchronizing with +/// the OS. +/// +/// # Compaction +/// +/// `Slab::compact` will release pages that have been allocated but are no +/// longer used. This is done by scanning the pages and finding pages with no +/// allocated objects. These pages are then freed. +/// +/// # Synchronization +/// +/// The `Slab` structure is able to provide (mostly) unsynchronized reads to +/// values stored in the slab. Insertions and removals are synchronized. Reading +/// objects via `Ref` is fully unsynchronized. Indexing objects uses amortized +/// synhronization. +/// +pub(crate) struct Slab { + /// Array of pages. Each page is synchronized. + pages: [Arc>; NUM_PAGES], + + /// Caches the array pointer & number of initialized slots. + cached: [CachedPage; NUM_PAGES], +} + +/// Allocate values in the associated slab. +pub(crate) struct Allocator { + /// Pages in the slab. The first page has a capacity of 16 elements. Each + /// following page has double the capacity of the previous page. + /// + /// Each returned `Ref` holds a reference count to this `Arc`. + pages: [Arc>; NUM_PAGES], +} + +/// References a slot in the slab. Indexing a slot using an `Address` is memory +/// safe even if the slot has been released or the page has been deallocated. +/// However, it is not guaranteed that the slot has not been reused and is now +/// represents a different value. +/// +/// The I/O driver uses a counter to track the slot's generation. Once accessing +/// the slot, the generations are compared. If they match, the value matches the +/// address. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) struct Address(usize); + +/// An entry in the slab. +pub(crate) trait Entry: Default { + /// Reset the entry's value and track the generation. + fn reset(&self); +} + +/// A reference to a value stored in the slab +pub(crate) struct Ref { + value: *const Value, +} + +/// Maximum number of pages a slab can contain. +const NUM_PAGES: usize = 19; + +/// Minimum number of slots a page can contain. +const PAGE_INITIAL_SIZE: usize = 32; +const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1; + +/// A page in the slab +struct Page { + /// Slots + slots: Mutex>, + + // Number of slots currently being used. This is not guaranteed to be up to + // date and should only be used as a hint. + used: AtomicUsize, + + // The number of slots the page can hold. 
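    // Worked example of the page geometry (using PAGE_INITIAL_SIZE = 32 and
    // NUM_PAGES = 19 from above): page `k` holds 32 << k slots, so all pages
    // together cover 32 * (2^19 - 1) = 16_777_184 addresses, which stays just
    // under the 24-bit address space (2^24 = 16_777_216) packed into the I/O
    // driver's tokens.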
+ len: usize, + + // Length of all previous pages combined + prev_len: usize, +} + +struct CachedPage { + /// Pointer to the page's slots. + slots: *const Slot, + + /// Number of initialized slots. + init: usize, +} + +/// Page state +struct Slots { + /// Slots + slots: Vec>, + + head: usize, + + /// Number of slots currently in use. + used: usize, +} + +unsafe impl Sync for Page {} +unsafe impl Send for Page {} +unsafe impl Sync for CachedPage {} +unsafe impl Send for CachedPage {} + +/// A slot in the slab. Contains slot-specific metadata. +/// +/// `#[repr(C)]` guarantees that the struct starts w/ `value`. We use pointer +/// math to map a value pointer to an index in the page. +#[repr(C)] +struct Slot { + /// Pointed to by `Ref`. + value: UnsafeCell>, + + /// Next entry in the free list. + /// + /// Safety: Only written when pushing the slot into the free list and read + /// when removing the slot from the free list. + next: u32, +} + +/// Value paired with a reference to the page +struct Value { + /// Value stored in the value + value: T, + + /// Pointer to the page containing the slot. + /// + /// A raw pointer is used as this creates a ref cycle. + page: *const Page, +} + +impl Slab { + /// Create a new, empty, slab + pub(crate) fn new() -> Slab { + // Initializing arrays is a bit annoying. Instead of manually writing + // out an array and every single entry, `Default::default()` is used to + // initialize the array, then the array is iterated and each value is + // initialized. + let mut slab = Slab { + pages: Default::default(), + cached: Default::default(), + }; + + let mut len = PAGE_INITIAL_SIZE; + let mut prev_len: usize = 0; + + for page in &mut slab.pages { + let page = Arc::get_mut(page).unwrap(); + page.len = len; + page.prev_len = prev_len; + len *= 2; + prev_len += page.len; + + // Ensure we don't exceed the max address space. + debug_assert!( + page.len - 1 + page.prev_len < (1 << 24), + "max = {:b}", + page.len - 1 + page.prev_len + ); + } + + slab + } + + /// Returns a new `Allocator`. + /// + /// The `Allocator` supports concurrent allocation of objects. + pub(crate) fn allocator(&self) -> Allocator { + Allocator { + pages: self.pages.clone(), + } + } + + /// Returns a reference to the value stored at the given address. + /// + /// `&mut self` is used as the call may update internal cached state. + pub(crate) fn get(&mut self, addr: Address) -> Option<&T> { + let page_idx = addr.page(); + let slot_idx = self.pages[page_idx].slot(addr); + + // If the address references a slot that was last seen as uninitialized, + // the `CachedPage` is updated. This requires acquiring the page lock + // and updating the slot pointer and initialized offset. + if self.cached[page_idx].init <= slot_idx { + self.cached[page_idx].refresh(&self.pages[page_idx]); + } + + // If the address **still** references an uninitialized slot, then the + // address is invalid and `None` is returned. + if self.cached[page_idx].init <= slot_idx { + return None; + } + + // Get a reference to the value. The lifetime of the returned reference + // is bound to `&self`. The only way to invalidate the underlying memory + // is to call `compact()`. The lifetimes prevent calling `compact()` + // while references to values are outstanding. + Some(self.cached[page_idx].get(slot_idx)) + } + + /// Calls the given function with a reference to each slot in the slab. The + /// slot may not be in-use. + /// + /// This is used by the I/O driver during the shutdown process to notify + /// each pending task. 
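    // Usage sketch for the slab API in this file (`MyEntry` is an
    // illustrative type assumed to implement `Entry` + `Default`):
    //
    // let mut slab = Slab::<MyEntry>::new();
    // let alloc = slab.allocator();
    // let (addr, entry) = alloc.allocate().unwrap();
    // assert!(slab.get(addr).is_some());
    // drop(entry);      // returns the slot to its page's free list
    // slab.compact();   // frees pages (except the first) with no live slots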
+ pub(crate) fn for_each(&mut self, mut f: impl FnMut(&T)) { + for page_idx in 0..self.pages.len() { + // It is required to avoid holding the lock when calling the + // provided function. The function may attempt to acquire the lock + // itself. If we hold the lock here while calling `f`, a deadlock + // situation is possible. + // + // Instead of iterating the slots directly in `page`, which would + // require holding the lock, the cache is updated and the slots are + // iterated from the cache. + self.cached[page_idx].refresh(&self.pages[page_idx]); + + for slot_idx in 0..self.cached[page_idx].init { + f(self.cached[page_idx].get(slot_idx)); + } + } + } + + // Release memory back to the allocator. + // + // If pages are empty, the underlying memory is released back to the + // allocator. + pub(crate) fn compact(&mut self) { + // Iterate each page except the very first one. The very first page is + // never freed. + for (idx, page) in (&self.pages[1..]).iter().enumerate() { + let mut slots = match page.slots.try_lock() { + Ok(slots) => slots, + // If the lock cannot be acquired due to being held by another + // thread, don't try to compact the page. + _ => continue, + }; + + if slots.used > 0 || slots.slots.capacity() == 0 { + // The page is in use or it has not yet been allocated. Either + // way, there is no more work to do. + continue; + } + + // Remove the slots vector from the page. This is done so that the + // freeing process is done outside of the lock's critical section. + let vec = mem::replace(&mut slots.slots, vec![]); + slots.head = 0; + + // Drop the vec outside of the lock + drop(slots); + + // Clear cache + self.cached[idx].slots = ptr::null(); + self.cached[idx].init = 0; + + drop(vec); + } + } +} + +impl fmt::Debug for Slab { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Slab").finish() + } +} + +impl Allocator { + /// Allocate a new entry and return a handle to the entry. + /// + /// Scans pages from smallest to biggest, stopping when a slot is found. + /// Pages are allocated if necessary. + /// + /// Returns `None` if the slab is full. + pub(crate) fn allocate(&self) -> Option<(Address, Ref)> { + // Find the first available slot. + for page in &self.pages[..] { + if let Some((addr, val)) = page.allocate() { + return Some((addr, val)); + } + } + + None + } +} + +impl fmt::Debug for Allocator { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("slab::Allocator").finish() + } +} + +impl ops::Deref for Ref { + type Target = T; + + fn deref(&self) -> &T { + // Safety: `&mut` is never handed out to the underlying value. The page + // is not freed until all `Ref` values are dropped. + unsafe { &(*self.value).value } + } +} + +impl Drop for Ref { + fn drop(&mut self) { + // Safety: `&mut` is never handed out to the underlying value. The page + // is not freed until all `Ref` values are dropped. + let _ = unsafe { (*self.value).release() }; + } +} + +impl fmt::Debug for Ref { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(fmt) + } +} + +impl Page { + // Allocates an object, returns the ref and address. + fn allocate(self: &Arc>) -> Option<(Address, Ref)> { + // Before acquiring the lock, use the `used` hint. + if self.used.load(Acquire) == self.len { + return None; + } + + // Allocating objects requires synchronization + let mut locked = self.slots.lock().unwrap(); + + if locked.head < locked.slots.len() { + // Re-use an already initialized slot. 
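            // Standalone model (separate from `Page`) of the free list
            // threaded through the slots' `next` fields: allocation pops the
            // head index, release pushes an index back, so the most recently
            // released slot is reused first.
            struct FreeList {
                head: usize,
                next: Vec<usize>,
            }

            impl FreeList {
                fn allocate(&mut self) -> usize {
                    let idx = self.head;
                    self.head = self.next[idx];
                    idx
                }

                fn release(&mut self, idx: usize) {
                    self.next[idx] = self.head;
                    self.head = idx;
                }
            }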
+ // + // Help out the borrow checker + let locked = &mut *locked; + + // Get the index of the slot at the head of the free stack. This is + // the slot that will be reused. + let idx = locked.head; + let slot = &locked.slots[idx]; + + // Update the free stack head to point to the next slot. + locked.head = slot.next as usize; + + // Increment the number of used slots + locked.used += 1; + self.used.store(locked.used, Release); + + // Reset the slot + slot.value.with(|ptr| unsafe { (*ptr).value.reset() }); + + // Return a reference to the slot + Some((self.addr(idx), slot.gen_ref(self))) + } else if self.len == locked.slots.len() { + // The page is full + None + } else { + // No initialized slots are available, but the page has more + // capacity. Initialize a new slot. + let idx = locked.slots.len(); + + if idx == 0 { + // The page has not yet been allocated. Allocate the storage for + // all page slots. + locked.slots.reserve_exact(self.len); + } + + // Initialize a new slot + locked.slots.push(Slot { + value: UnsafeCell::new(Value { + value: Default::default(), + page: self.as_ref() as *const _, + }), + next: 0, + }); + + // Increment the head to indicate the free stack is empty + locked.head += 1; + + // Increment the number of used slots + locked.used += 1; + self.used.store(locked.used, Release); + + debug_assert_eq!(locked.slots.len(), locked.head); + + Some((self.addr(idx), locked.slots[idx].gen_ref(self))) + } + } +} + +impl Page { + /// Returns the slot index within the current page referenced by the given + /// address. + fn slot(&self, addr: Address) -> usize { + addr.0 - self.prev_len + } + + /// Returns the address for the given slot + fn addr(&self, slot: usize) -> Address { + Address(slot + self.prev_len) + } +} + +impl Default for Page { + fn default() -> Page { + Page { + used: AtomicUsize::new(0), + slots: Mutex::new(Slots { + slots: Vec::new(), + head: 0, + used: 0, + }), + len: 0, + prev_len: 0, + } + } +} + +impl Page { + /// Release a slot into the page's free list + fn release(&self, value: *const Value) { + let mut locked = self.slots.lock().unwrap(); + + let idx = locked.index_for(value); + locked.slots[idx].next = locked.head as u32; + locked.head = idx; + locked.used -= 1; + + self.used.store(locked.used, Release); + } +} + +impl CachedPage { + /// Refresh the cache + fn refresh(&mut self, page: &Page) { + let slots = page.slots.lock().unwrap(); + self.slots = slots.slots.as_ptr(); + self.init = slots.slots.len(); + } + + // Get a value by index + fn get(&self, idx: usize) -> &T { + assert!(idx < self.init); + + // Safety: Pages are allocated concurrently, but are only ever + // **deallocated** by `Slab`. `Slab` will always have a more + // conservative view on the state of the slot array. Once `CachedPage` + // sees a slot pointer and initialized offset, it will remain valid + // until `compact()` is called. The `compact()` function also updates + // `CachedPage`. + unsafe { + let slot = self.slots.offset(idx as isize); + let value = slot as *const Value; + + &(*value).value + } + } +} + +impl Default for CachedPage { + fn default() -> CachedPage { + CachedPage { + slots: ptr::null(), + init: 0, + } + } +} + +impl Slots { + /// Maps a slot pointer to an offset within the current page. + /// + /// # Panics + /// + /// panics if the provided slot pointer is not contained by the page. 
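    // Sketch of the pointer arithmetic `index_for` performs, assuming the
    // slots live in one contiguous `Vec` of equally sized elements (the
    // helper name is illustrative):
    fn offset_in<T>(base: *const T, elem: *const T) -> usize {
        (elem as usize - base as usize) / std::mem::size_of::<T>()
    }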
+ fn index_for(&self, slot: *const Value) -> usize { + use std::mem; + + let base = &self.slots[0] as *const _ as usize; + + assert!(base != 0, "page is unallocated"); + + let slot = slot as usize; + let width = mem::size_of::>(); + + assert!(slot >= base, "unexpected pointer"); + + let idx = (slot - base) / width; + assert!(idx < self.slots.len() as usize); + + idx + } +} + +impl Slot { + // Generates a `Ref` for the slot. This involves bumping the page's ref count. + fn gen_ref(&self, page: &Arc>) -> Ref { + // The ref holds a ref on the page. + mem::forget(page.clone()); + let slot = self as *const Slot; + let value = slot as *const Value; + + Ref { value } + } +} + +impl Value { + // Release the slot, returning the `Arc>` logically owned by the ref. + fn release(&self) -> Arc> { + // Safety: called by `Ref`, which owns an `Arc>` instance. + let page = unsafe { Arc::from_raw(self.page) }; + page.release(self as *const _); + page + } +} + +impl Address { + fn page(self) -> usize { + // Since every page is twice as large as the previous page, and all page + // sizes are powers of two, we can determine the page index that + // contains a given address by shifting the address down by the smallest + // page size and looking at how many twos places necessary to represent + // that number, telling us what power of two page size it fits inside + // of. We can determine the number of twos places by counting the number + // of leading zeros (unused twos places) in the number's binary + // representation, and subtracting that count from the total number of + // bits in a word. + let slot_shifted = (self.0 + PAGE_INITIAL_SIZE) >> PAGE_INDEX_SHIFT; + (bit::pointer_width() - slot_shifted.leading_zeros()) as usize + } + + pub(crate) const fn as_usize(self) -> usize { + self.0 + } + + pub(crate) fn from_usize(src: usize) -> Address { + Address(src) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::SeqCst; + + struct Foo { + cnt: AtomicUsize, + id: AtomicUsize, + } + + impl Default for Foo { + fn default() -> Foo { + Foo { + cnt: AtomicUsize::new(0), + id: AtomicUsize::new(0), + } + } + } + + impl Entry for Foo { + fn reset(&self) { + self.cnt.fetch_add(1, SeqCst); + } + } + + #[test] + fn insert_remove() { + let mut slab = Slab::::new(); + let alloc = slab.allocator(); + + let (addr1, foo1) = alloc.allocate().unwrap(); + foo1.id.store(1, SeqCst); + assert_eq!(0, foo1.cnt.load(SeqCst)); + + let (addr2, foo2) = alloc.allocate().unwrap(); + foo2.id.store(2, SeqCst); + assert_eq!(0, foo2.cnt.load(SeqCst)); + + assert_eq!(1, slab.get(addr1).unwrap().id.load(SeqCst)); + assert_eq!(2, slab.get(addr2).unwrap().id.load(SeqCst)); + + drop(foo1); + + assert_eq!(1, slab.get(addr1).unwrap().id.load(SeqCst)); + + let (addr3, foo3) = alloc.allocate().unwrap(); + assert_eq!(addr3, addr1); + assert_eq!(1, foo3.cnt.load(SeqCst)); + foo3.id.store(3, SeqCst); + assert_eq!(3, slab.get(addr3).unwrap().id.load(SeqCst)); + + drop(foo2); + drop(foo3); + + slab.compact(); + + // The first page is never released + assert!(slab.get(addr1).is_some()); + assert!(slab.get(addr2).is_some()); + assert!(slab.get(addr3).is_some()); + } + + #[test] + fn insert_many() { + let mut slab = Slab::::new(); + let alloc = slab.allocator(); + let mut entries = vec![]; + + for i in 0..10_000 { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + entries.push((addr, val)); + } + + for (i, (addr, v)) in entries.iter().enumerate() { + assert_eq!(i, 
v.id.load(SeqCst)); + assert_eq!(i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + + entries.clear(); + + for i in 0..10_000 { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(10_000 - i, SeqCst); + entries.push((addr, val)); + } + + for (i, (addr, v)) in entries.iter().enumerate() { + assert_eq!(10_000 - i, v.id.load(SeqCst)); + assert_eq!(10_000 - i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + } + + #[test] + fn insert_drop_reverse() { + let mut slab = Slab::::new(); + let alloc = slab.allocator(); + let mut entries = vec![]; + + for i in 0..10_000 { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + entries.push((addr, val)); + } + + for _ in 0..10 { + // Drop 1000 in reverse + for _ in 0..1_000 { + entries.pop(); + } + + // Check remaining + for (i, (addr, v)) in entries.iter().enumerate() { + assert_eq!(i, v.id.load(SeqCst)); + assert_eq!(i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + } + } + + #[test] + fn no_compaction_if_page_still_in_use() { + let mut slab = Slab::::new(); + let alloc = slab.allocator(); + let mut entries1 = vec![]; + let mut entries2 = vec![]; + + for i in 0..10_000 { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + + if i % 2 == 0 { + entries1.push((addr, val, i)); + } else { + entries2.push(val); + } + } + + drop(entries2); + + for (addr, _, i) in &entries1 { + assert_eq!(*i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + } + + #[test] + fn compact_all() { + let mut slab = Slab::::new(); + let alloc = slab.allocator(); + let mut entries = vec![]; + + for i in 0..10_000 { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + + entries.push((addr, val)); + } + + let mut addrs = vec![]; + + for (addr, _) in entries { + addrs.push(addr); + } + + slab.compact(); + + // The first page is never freed + for addr in &addrs[PAGE_INITIAL_SIZE..] { + assert!(slab.get(*addr).is_none()); + } + } +} diff --git a/tokio/src/util/slab/addr.rs b/tokio/src/util/slab/addr.rs deleted file mode 100644 index c14e32e9095..00000000000 --- a/tokio/src/util/slab/addr.rs +++ /dev/null @@ -1,154 +0,0 @@ -//! Tracks the location of an entry in a slab. -//! -//! # Index packing -//! -//! A slab index consists of multiple indices packed into a single `usize` value -//! that correspond to different parts of the slab. -//! -//! The least significant `MAX_PAGES + INITIAL_PAGE_SIZE.trailing_zeros() + 1` -//! bits store the address within a shard, starting at 0 for the first slot on -//! the first page. To index a slot within a shard, we first find the index of -//! the page that the address falls on, and then the offset of the slot within -//! that page. -//! -//! Since every page is twice as large as the previous page, and all page sizes -//! are powers of two, we can determine the page index that contains a given -//! address by shifting the address down by the smallest page size and looking -//! at how many twos places necessary to represent that number, telling us what -//! power of two page size it fits inside of. We can determine the number of -//! twos places by counting the number of leading zeros (unused twos places) in -//! the number's binary representation, and subtracting that count from the -//! total number of bits in a word. -//! -//! Once we know what page contains an address, we can subtract the size of all -//! previous pages from the address to determine the offset within the page. -//! -//! After the page address, the next `MAX_THREADS.trailing_zeros() + 1` least -//! 
significant bits are the thread ID. These are used to index the array of -//! shards to find which shard a slot belongs to. If an entry is being removed -//! and the thread ID of its index matches that of the current thread, we can -//! use the `remove_local` fast path; otherwise, we have to use the synchronized -//! `remove_remote` path. -//! -//! Finally, a generation value is packed into the index. The `RESERVED_BITS` -//! most significant bits are left unused, and the remaining bits between the -//! last bit of the thread ID and the first reserved bit are used to store the -//! generation. The generation is used as part of an atomic read-modify-write -//! loop every time a `ScheduledIo`'s readiness is modified, or when the -//! resource is removed, to guard against the ABA problem. -//! -//! Visualized: -//! -//! ```text -//! ┌──────────┬───────────────┬──────────────────┬──────────────────────────┐ -//! │ reserved │ generation │ thread ID │ address │ -//! └▲─────────┴▲──────────────┴▲─────────────────┴▲────────────────────────▲┘ -//! │ │ │ │ │ -//! bits(usize) │ bits(MAX_THREADS) │ 0 -//! │ │ -//! bits(usize) - RESERVED MAX_PAGES + bits(INITIAL_PAGE_SIZE) -//! ``` - -use crate::util::bit; -use crate::util::slab::{Generation, INITIAL_PAGE_SIZE, MAX_PAGES, MAX_THREADS}; - -use std::usize; - -/// References the location at which an entry is stored in a slab. -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub(crate) struct Address(usize); - -const PAGE_INDEX_SHIFT: u32 = INITIAL_PAGE_SIZE.trailing_zeros() + 1; - -/// Address in the shard -const SLOT: bit::Pack = bit::Pack::least_significant(MAX_PAGES as u32 + PAGE_INDEX_SHIFT); - -/// Masks the thread identifier -const THREAD: bit::Pack = SLOT.then(MAX_THREADS.trailing_zeros() + 1); - -/// Masks the generation -const GENERATION: bit::Pack = THREAD - .then(bit::pointer_width().wrapping_sub(RESERVED.width() + THREAD.width() + SLOT.width())); - -// Chosen arbitrarily -const RESERVED: bit::Pack = bit::Pack::most_significant(5); - -impl Address { - /// Represents no entry, picked to avoid collision with Mio's internals. - /// This value should not be passed to mio. - pub(crate) const NULL: usize = usize::MAX >> 1; - - /// Re-exported by `Generation`. - pub(super) const GENERATION_WIDTH: u32 = GENERATION.width(); - - pub(super) fn new(shard_index: usize, generation: Generation) -> Address { - let mut repr = 0; - - repr = SLOT.pack(shard_index, repr); - repr = GENERATION.pack(generation.to_usize(), repr); - - Address(repr) - } - - /// Convert from a `usize` representation. - pub(crate) fn from_usize(src: usize) -> Address { - assert_ne!(src, Self::NULL); - - Address(src) - } - - /// Convert to a `usize` representation - pub(crate) fn to_usize(self) -> usize { - self.0 - } - - pub(crate) fn generation(self) -> Generation { - Generation::new(GENERATION.unpack(self.0)) - } - - /// Returns the page index - pub(super) fn page(self) -> usize { - // Since every page is twice as large as the previous page, and all page - // sizes are powers of two, we can determine the page index that - // contains a given address by shifting the address down by the smallest - // page size and looking at how many twos places necessary to represent - // that number, telling us what power of two page size it fits inside - // of. We can determine the number of twos places by counting the number - // of leading zeros (unused twos places) in the number's binary - // representation, and subtracting that count from the total number of - // bits in a word. 
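        // Worked example (INITIAL_PAGE_SIZE = 32, PAGE_INDEX_SHIFT = 6):
        // slot 0 gives (0 + 32) >> 6 = 0, i.e. page 0; slot 32 gives
        // 64 >> 6 = 1, one significant bit, i.e. page 1; slot 96 gives
        // 128 >> 6 = 2, two significant bits, i.e. page 2.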
- let slot_shifted = (self.slot() + INITIAL_PAGE_SIZE) >> PAGE_INDEX_SHIFT; - (bit::pointer_width() - slot_shifted.leading_zeros()) as usize - } - - /// Returns the slot index - pub(super) fn slot(self) -> usize { - SLOT.unpack(self.0) - } -} - -#[cfg(test)] -cfg_not_loom! { - use proptest::proptest; - - #[test] - fn test_pack_format() { - assert_eq!(5, RESERVED.width()); - assert_eq!(0b11111, RESERVED.max_value()); - } - - proptest! { - #[test] - fn address_roundtrips( - slot in 0usize..SLOT.max_value(), - generation in 0usize..Generation::MAX, - ) { - let address = Address::new(slot, Generation::new(generation)); - // Round trip - let address = Address::from_usize(address.to_usize()); - - assert_eq!(address.slot(), slot); - assert_eq!(address.generation().to_usize(), generation); - } - } -} diff --git a/tokio/src/util/slab/entry.rs b/tokio/src/util/slab/entry.rs deleted file mode 100644 index 2e0b10b0fdf..00000000000 --- a/tokio/src/util/slab/entry.rs +++ /dev/null @@ -1,7 +0,0 @@ -use crate::util::slab::Generation; - -pub(crate) trait Entry: Default { - fn generation(&self) -> Generation; - - fn reset(&self, generation: Generation) -> bool; -} diff --git a/tokio/src/util/slab/generation.rs b/tokio/src/util/slab/generation.rs deleted file mode 100644 index 4b16b2caf65..00000000000 --- a/tokio/src/util/slab/generation.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::util::bit; -use crate::util::slab::Address; - -/// An mutation identifier for a slot in the slab. The generation helps prevent -/// accessing an entry with an outdated token. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] -pub(crate) struct Generation(usize); - -impl Generation { - pub(crate) const WIDTH: u32 = Address::GENERATION_WIDTH; - - pub(super) const MAX: usize = bit::mask_for(Address::GENERATION_WIDTH); - - /// Create a new generation - /// - /// # Panics - /// - /// Panics if `value` is greater than max generation. - pub(crate) fn new(value: usize) -> Generation { - assert!(value <= Self::MAX); - Generation(value) - } - - /// Returns the next generation value - pub(crate) fn next(self) -> Generation { - Generation((self.0 + 1) & Self::MAX) - } - - pub(crate) fn to_usize(self) -> usize { - self.0 - } -} diff --git a/tokio/src/util/slab/mod.rs b/tokio/src/util/slab/mod.rs deleted file mode 100644 index 5082970507e..00000000000 --- a/tokio/src/util/slab/mod.rs +++ /dev/null @@ -1,107 +0,0 @@ -//! A lock-free concurrent slab. - -mod addr; -pub(crate) use addr::Address; - -mod entry; -pub(crate) use entry::Entry; - -mod generation; -pub(crate) use generation::Generation; - -mod page; - -mod shard; -use shard::Shard; - -mod slot; -use slot::Slot; - -mod stack; -use stack::TransferStack; - -#[cfg(all(loom, test))] -mod tests; - -use crate::loom::sync::Mutex; -use crate::util::bit; - -use std::fmt; - -#[cfg(target_pointer_width = "64")] -const MAX_THREADS: usize = 4096; - -#[cfg(target_pointer_width = "32")] -const MAX_THREADS: usize = 2048; - -/// Max number of pages per slab -const MAX_PAGES: usize = bit::pointer_width() as usize / 4; - -cfg_not_loom! { - /// Size of first page - const INITIAL_PAGE_SIZE: usize = 32; -} - -cfg_loom! { - const INITIAL_PAGE_SIZE: usize = 2; -} - -/// A sharded slab. -pub(crate) struct Slab { - // Signal shard for now. Eventually there will be more. - shard: Shard, - local: Mutex<()>, -} - -unsafe impl Send for Slab {} -unsafe impl Sync for Slab {} - -impl Slab { - /// Returns a new slab with the default configuration parameters. 
- pub(crate) fn new() -> Slab { - Slab { - shard: Shard::new(), - local: Mutex::new(()), - } - } - - /// allocs a value into the slab, returning a key that can be used to - /// access it. - /// - /// If this function returns `None`, then the shard for the current thread - /// is full and no items can be added until some are removed, or the maximum - /// number of shards has been reached. - pub(crate) fn alloc(&self) -> Option
{ - // we must lock the slab to alloc an item. - let _local = self.local.lock().unwrap(); - self.shard.alloc() - } - - /// Removes the value associated with the given key from the slab. - pub(crate) fn remove(&self, idx: Address) { - // try to lock the slab so that we can use `remove_local`. - let lock = self.local.try_lock(); - - // if we were able to lock the slab, we are "local" and can use the fast - // path; otherwise, we will use `remove_remote`. - if lock.is_ok() { - self.shard.remove_local(idx) - } else { - self.shard.remove_remote(idx) - } - } - - /// Return a reference to the value associated with the given key. - /// - /// If the slab does not contain a value for the given key, `None` is - /// returned instead. - pub(crate) fn get(&self, token: Address) -> Option<&T> { - self.shard.get(token) - } -} - -impl fmt::Debug for Slab { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Slab").field("shard", &self.shard).finish() - } -} diff --git a/tokio/src/util/slab/page.rs b/tokio/src/util/slab/page.rs deleted file mode 100644 index 0000e934dea..00000000000 --- a/tokio/src/util/slab/page.rs +++ /dev/null @@ -1,187 +0,0 @@ -use crate::loom::cell::UnsafeCell; -use crate::util::slab::{Address, Entry, Slot, TransferStack, INITIAL_PAGE_SIZE}; - -use std::fmt; - -/// Data accessed only by the thread that owns the shard. -pub(crate) struct Local { - head: UnsafeCell, -} - -/// Data accessed by any thread. -pub(crate) struct Shared { - remote: TransferStack, - size: usize, - prev_sz: usize, - slab: UnsafeCell]>>>, -} - -/// Returns the size of the page at index `n` -pub(super) fn size(n: usize) -> usize { - INITIAL_PAGE_SIZE << n -} - -impl Local { - pub(crate) fn new() -> Self { - Self { - head: UnsafeCell::new(0), - } - } - - fn head(&self) -> usize { - self.head.with(|head| unsafe { *head }) - } - - fn set_head(&self, new_head: usize) { - self.head.with_mut(|head| unsafe { - *head = new_head; - }) - } -} - -impl Shared { - pub(crate) fn new(size: usize, prev_sz: usize) -> Shared { - Self { - prev_sz, - size, - remote: TransferStack::new(), - slab: UnsafeCell::new(None), - } - } - - /// Allocates storage for this page if it does not allready exist. - /// - /// This requires unique access to the page (e.g. it is called from the - /// thread that owns the page, or, in the case of `SingleShard`, while the - /// lock is held). In order to indicate this, a reference to the page's - /// `Local` data is taken by this function; the `Local` argument is not - /// actually used, but requiring it ensures that this is only called when - /// local access is held. - #[cold] - fn alloc_page(&self, _: &Local) { - debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() })); - - let mut slab = Vec::with_capacity(self.size); - slab.extend((1..self.size).map(Slot::new)); - slab.push(Slot::new(Address::NULL)); - - self.slab.with_mut(|s| { - // this mut access is safe — it only occurs to initially - // allocate the page, which only happens on this thread; if the - // page has not yet been allocated, other threads will not try - // to access it yet. - unsafe { - *s = Some(slab.into_boxed_slice()); - } - }); - } - - pub(crate) fn alloc(&self, local: &Local) -> Option
{ - let head = local.head(); - - // are there any items on the local free list? (fast path) - let head = if head < self.size { - head - } else { - // if the local free list is empty, pop all the items on the remote - // free list onto the local free list. - self.remote.pop_all()? - }; - - // if the head is still null, both the local and remote free lists are - // empty --- we can't fit any more items on this page. - if head == Address::NULL { - return None; - } - - // do we need to allocate storage for this page? - let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() }); - if page_needs_alloc { - self.alloc_page(local); - } - - let gen = self.slab.with(|slab| { - let slab = unsafe { &*(slab) } - .as_ref() - .expect("page must have been allocated to alloc!"); - - let slot = &slab[head]; - - local.set_head(slot.next()); - slot.generation() - }); - - let index = head + self.prev_sz; - - Some(Address::new(index, gen)) - } - - pub(crate) fn get(&self, addr: Address) -> Option<&T> { - let page_offset = addr.slot() - self.prev_sz; - - self.slab - .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset)) - .map(|slot| slot.get()) - } - - pub(crate) fn remove_local(&self, local: &Local, addr: Address) { - let offset = addr.slot() - self.prev_sz; - - self.slab.with(|slab| { - let slab = unsafe { &*slab }.as_ref(); - - let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { - slot - } else { - return; - }; - - if slot.reset(addr.generation()) { - slot.set_next(local.head()); - local.set_head(offset); - } - }) - } - - pub(crate) fn remove_remote(&self, addr: Address) { - let offset = addr.slot() - self.prev_sz; - - self.slab.with(|slab| { - let slab = unsafe { &*slab }.as_ref(); - - let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { - slot - } else { - return; - }; - - if !slot.reset(addr.generation()) { - return; - } - - self.remote.push(offset, |next| slot.set_next(next)); - }) - } -} - -impl fmt::Debug for Local { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.head.with(|head| { - let head = unsafe { *head }; - f.debug_struct("Local") - .field("head", &format_args!("{:#0x}", head)) - .finish() - }) - } -} - -impl fmt::Debug for Shared { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Shared") - .field("remote", &self.remote) - .field("prev_sz", &self.prev_sz) - .field("size", &self.size) - // .field("slab", &self.slab) - .finish() - } -} diff --git a/tokio/src/util/slab/shard.rs b/tokio/src/util/slab/shard.rs deleted file mode 100644 index eaca6f656a2..00000000000 --- a/tokio/src/util/slab/shard.rs +++ /dev/null @@ -1,105 +0,0 @@ -use crate::util::slab::{page, Address, Entry, MAX_PAGES}; - -use std::fmt; - -// ┌─────────────┐ ┌────────┐ -// │ page 1 │ │ │ -// ├─────────────┤ ┌───▶│ next──┼─┐ -// │ page 2 │ │ ├────────┤ │ -// │ │ │ │XXXXXXXX│ │ -// │ local_free──┼─┘ ├────────┤ │ -// │ global_free─┼─┐ │ │◀┘ -// ├─────────────┤ └───▶│ next──┼─┐ -// │ page 3 │ ├────────┤ │ -// └─────────────┘ │XXXXXXXX│ │ -// ... ├────────┤ │ -// ┌─────────────┐ │XXXXXXXX│ │ -// │ page n │ ├────────┤ │ -// └─────────────┘ │ │◀┘ -// │ next──┼───▶ -// ├────────┤ -// │XXXXXXXX│ -// └────────┘ -// ... -pub(super) struct Shard { - /// The local free list for each page. - /// - /// These are only ever accessed from this shard's thread, so they are - /// stored separately from the shared state for the page that can be - /// accessed concurrently, to minimize false sharing. 
- local: Box<[page::Local]>, - /// The shared state for each page in this shard. - /// - /// This consists of the page's metadata (size, previous size), remote free - /// list, and a pointer to the actual array backing that page. - shared: Box<[page::Shared]>, -} - -impl Shard { - pub(super) fn new() -> Shard { - let mut total_sz = 0; - let shared = (0..MAX_PAGES) - .map(|page_num| { - let sz = page::size(page_num); - let prev_sz = total_sz; - total_sz += sz; - page::Shared::new(sz, prev_sz) - }) - .collect(); - - let local = (0..MAX_PAGES).map(|_| page::Local::new()).collect(); - - Shard { local, shared } - } - - pub(super) fn alloc(&self) -> Option
{ - // Can we fit the value into an existing page? - for (page_idx, page) in self.shared.iter().enumerate() { - let local = self.local(page_idx); - - if let Some(page_offset) = page.alloc(local) { - return Some(page_offset); - } - } - - None - } - - pub(super) fn get(&self, addr: Address) -> Option<&T> { - let page_idx = addr.page(); - - if page_idx > self.shared.len() { - return None; - } - - self.shared[page_idx].get(addr) - } - - /// Remove an item on the shard's local thread. - pub(super) fn remove_local(&self, addr: Address) { - let page_idx = addr.page(); - - if let Some(page) = self.shared.get(page_idx) { - page.remove_local(self.local(page_idx), addr); - } - } - - /// Remove an item, while on a different thread from the shard's local thread. - pub(super) fn remove_remote(&self, addr: Address) { - if let Some(page) = self.shared.get(addr.page()) { - page.remove_remote(addr); - } - } - - fn local(&self, i: usize) -> &page::Local { - &self.local[i] - } -} - -impl fmt::Debug for Shard { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Shard") - .field("shared", &self.shared) - .finish() - } -} diff --git a/tokio/src/util/slab/slot.rs b/tokio/src/util/slab/slot.rs deleted file mode 100644 index 0608b261899..00000000000 --- a/tokio/src/util/slab/slot.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::loom::cell::UnsafeCell; -use crate::util::slab::{Entry, Generation}; - -/// Stores an entry in the slab. -pub(super) struct Slot { - next: UnsafeCell, - entry: T, -} - -impl Slot { - /// Initialize a new `Slot` linked to `next`. - /// - /// The entry is initialized to a default value. - pub(super) fn new(next: usize) -> Slot { - Slot { - next: UnsafeCell::new(next), - entry: T::default(), - } - } - - pub(super) fn get(&self) -> &T { - &self.entry - } - - pub(super) fn generation(&self) -> Generation { - self.entry.generation() - } - - pub(super) fn reset(&self, generation: Generation) -> bool { - self.entry.reset(generation) - } - - pub(super) fn next(&self) -> usize { - self.next.with(|next| unsafe { *next }) - } - - pub(super) fn set_next(&self, next: usize) { - self.next.with_mut(|n| unsafe { - (*n) = next; - }) - } -} diff --git a/tokio/src/util/slab/stack.rs b/tokio/src/util/slab/stack.rs deleted file mode 100644 index 0ae0d71006b..00000000000 --- a/tokio/src/util/slab/stack.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::loom::sync::atomic::AtomicUsize; -use crate::util::slab::Address; - -use std::fmt; -use std::sync::atomic::Ordering; -use std::usize; - -pub(super) struct TransferStack { - head: AtomicUsize, -} - -impl TransferStack { - pub(super) fn new() -> Self { - Self { - head: AtomicUsize::new(Address::NULL), - } - } - - pub(super) fn pop_all(&self) -> Option { - let val = self.head.swap(Address::NULL, Ordering::Acquire); - - if val == Address::NULL { - None - } else { - Some(val) - } - } - - pub(super) fn push(&self, value: usize, before: impl Fn(usize)) { - let mut next = self.head.load(Ordering::Relaxed); - - loop { - before(next); - - match self - .head - .compare_exchange(next, value, Ordering::AcqRel, Ordering::Acquire) - { - // lost the race! - Err(actual) => next = actual, - Ok(_) => return, - } - } - } -} - -impl fmt::Debug for TransferStack { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // Loom likes to dump all its internal state in `fmt::Debug` impls, so - // we override this to just print the current value in tests. 
- f.debug_struct("TransferStack") - .field( - "head", - &format_args!("{:#x}", self.head.load(Ordering::Relaxed)), - ) - .finish() - } -} diff --git a/tokio/src/util/slab/tests/loom_slab.rs b/tokio/src/util/slab/tests/loom_slab.rs deleted file mode 100644 index 48e94f00341..00000000000 --- a/tokio/src/util/slab/tests/loom_slab.rs +++ /dev/null @@ -1,327 +0,0 @@ -use crate::io::driver::ScheduledIo; -use crate::util::slab::{Address, Slab}; - -use loom::sync::{Arc, Condvar, Mutex}; -use loom::thread; - -#[test] -fn local_remove() { - loom::model(|| { - let slab = Arc::new(Slab::new()); - - let s = slab.clone(); - let t1 = thread::spawn(move || { - let idx = store_val(&s, 1); - assert_eq!(get_val(&s, idx), Some(1)); - s.remove(idx); - assert_eq!(get_val(&s, idx), None); - let idx = store_val(&s, 2); - assert_eq!(get_val(&s, idx), Some(2)); - s.remove(idx); - assert_eq!(get_val(&s, idx), None); - }); - - let s = slab.clone(); - let t2 = thread::spawn(move || { - let idx = store_val(&s, 3); - assert_eq!(get_val(&s, idx), Some(3)); - s.remove(idx); - assert_eq!(get_val(&s, idx), None); - let idx = store_val(&s, 4); - s.remove(idx); - assert_eq!(get_val(&s, idx), None); - }); - - let s = slab; - let idx1 = store_val(&s, 5); - assert_eq!(get_val(&s, idx1), Some(5)); - let idx2 = store_val(&s, 6); - assert_eq!(get_val(&s, idx2), Some(6)); - s.remove(idx1); - assert_eq!(get_val(&s, idx1), None); - assert_eq!(get_val(&s, idx2), Some(6)); - s.remove(idx2); - assert_eq!(get_val(&s, idx2), None); - - t1.join().expect("thread 1 should not panic"); - t2.join().expect("thread 2 should not panic"); - }); -} - -#[test] -fn remove_remote() { - loom::model(|| { - let slab = Arc::new(Slab::new()); - - let idx1 = store_val(&slab, 1); - assert_eq!(get_val(&slab, idx1), Some(1)); - - let idx2 = store_val(&slab, 2); - assert_eq!(get_val(&slab, idx2), Some(2)); - - let idx3 = store_val(&slab, 3); - assert_eq!(get_val(&slab, idx3), Some(3)); - - let s = slab.clone(); - let t1 = thread::spawn(move || { - assert_eq!(get_val(&s, idx2), Some(2)); - s.remove(idx2); - assert_eq!(get_val(&s, idx2), None); - }); - - let s = slab.clone(); - let t2 = thread::spawn(move || { - assert_eq!(get_val(&s, idx3), Some(3)); - s.remove(idx3); - assert_eq!(get_val(&s, idx3), None); - }); - - t1.join().expect("thread 1 should not panic"); - t2.join().expect("thread 2 should not panic"); - - assert_eq!(get_val(&slab, idx1), Some(1)); - assert_eq!(get_val(&slab, idx2), None); - assert_eq!(get_val(&slab, idx3), None); - }); -} - -#[test] -fn remove_remote_and_reuse() { - loom::model(|| { - let slab = Arc::new(Slab::new()); - - let idx1 = store_val(&slab, 1); - let idx2 = store_val(&slab, 2); - - assert_eq!(get_val(&slab, idx1), Some(1)); - assert_eq!(get_val(&slab, idx2), Some(2)); - - let s = slab.clone(); - let t1 = thread::spawn(move || { - s.remove(idx1); - let value = get_val(&s, idx1); - - // We may or may not see the new value yet, depending on when - // this occurs, but we must either see the new value or `None`; - // the old value has been removed! 
- assert!(value == None || value == Some(3)); - }); - - let idx3 = store_when_free(&slab, 3); - t1.join().expect("thread 1 should not panic"); - - assert_eq!(get_val(&slab, idx3), Some(3)); - assert_eq!(get_val(&slab, idx2), Some(2)); - }); -} - -#[test] -fn concurrent_alloc_remove() { - loom::model(|| { - let slab = Arc::new(Slab::new()); - let pair = Arc::new((Mutex::new(None), Condvar::new())); - - let slab2 = slab.clone(); - let pair2 = pair.clone(); - let remover = thread::spawn(move || { - let (lock, cvar) = &*pair2; - for _ in 0..2 { - let mut next = lock.lock().unwrap(); - while next.is_none() { - next = cvar.wait(next).unwrap(); - } - let key = next.take().unwrap(); - slab2.remove(key); - assert_eq!(get_val(&slab2, key), None); - cvar.notify_one(); - } - }); - - let (lock, cvar) = &*pair; - for i in 0..2 { - let key = store_val(&slab, i); - - let mut next = lock.lock().unwrap(); - *next = Some(key); - cvar.notify_one(); - - // Wait for the item to be removed. - while next.is_some() { - next = cvar.wait(next).unwrap(); - } - - assert_eq!(get_val(&slab, key), None); - } - - remover.join().unwrap(); - }) -} - -#[test] -fn concurrent_remove_remote_and_reuse() { - loom::model(|| { - let slab = Arc::new(Slab::new()); - - let idx1 = store_val(&slab, 1); - let idx2 = store_val(&slab, 2); - - assert_eq!(get_val(&slab, idx1), Some(1)); - assert_eq!(get_val(&slab, idx2), Some(2)); - - let s = slab.clone(); - let s2 = slab.clone(); - let t1 = thread::spawn(move || { - s.remove(idx1); - }); - - let t2 = thread::spawn(move || { - s2.remove(idx2); - }); - - let idx3 = store_when_free(&slab, 3); - t1.join().expect("thread 1 should not panic"); - t2.join().expect("thread 1 should not panic"); - - assert!(get_val(&slab, idx1).is_none()); - assert!(get_val(&slab, idx2).is_none()); - assert_eq!(get_val(&slab, idx3), Some(3)); - }); -} - -#[test] -fn alloc_remove_get() { - loom::model(|| { - let slab = Arc::new(Slab::new()); - let pair = Arc::new((Mutex::new(None), Condvar::new())); - - let slab2 = slab.clone(); - let pair2 = pair.clone(); - let t1 = thread::spawn(move || { - let slab = slab2; - let (lock, cvar) = &*pair2; - // allocate one entry just so that we have to use the final one for - // all future allocations. - let _key0 = store_val(&slab, 0); - let key = store_val(&slab, 1); - - let mut next = lock.lock().unwrap(); - *next = Some(key); - cvar.notify_one(); - // remove the second entry - slab.remove(key); - // store a new readiness at the same location (since the slab - // already has an entry in slot 0) - store_val(&slab, 2); - }); - - let (lock, cvar) = &*pair; - // wait for the second entry to be stored... - let mut next = lock.lock().unwrap(); - while next.is_none() { - next = cvar.wait(next).unwrap(); - } - let key = next.unwrap(); - - // our generation will be stale when the second store occurs at that - // index, we must not see the value of that store. - let val = get_val(&slab, key); - assert_ne!(val, Some(2), "generation must have advanced!"); - - t1.join().unwrap(); - }) -} - -#[test] -fn alloc_remove_set() { - loom::model(|| { - let slab = Arc::new(Slab::new()); - let pair = Arc::new((Mutex::new(None), Condvar::new())); - - let slab2 = slab.clone(); - let pair2 = pair.clone(); - let t1 = thread::spawn(move || { - let slab = slab2; - let (lock, cvar) = &*pair2; - // allocate one entry just so that we have to use the final one for - // all future allocations. 
- let _key0 = store_val(&slab, 0); - let key = store_val(&slab, 1); - - let mut next = lock.lock().unwrap(); - *next = Some(key); - cvar.notify_one(); - - slab.remove(key); - // remove the old entry and insert a new one, with a new generation. - let key2 = slab.alloc().expect("store key 2"); - // after the remove, we must not see the value written with the - // stale index. - assert_eq!( - get_val(&slab, key), - None, - "stale set must no longer be visible" - ); - assert_eq!(get_val(&slab, key2), Some(0)); - key2 - }); - - let (lock, cvar) = &*pair; - - // wait for the second entry to be stored. the index we get from the - // other thread may become stale after a write. - let mut next = lock.lock().unwrap(); - while next.is_none() { - next = cvar.wait(next).unwrap(); - } - let key = next.unwrap(); - - // try to write to the index with our generation - slab.get(key).map(|val| val.set_readiness(key, |_| 2)); - - let key2 = t1.join().unwrap(); - // after the remove, we must not see the value written with the - // stale index either. - assert_eq!( - get_val(&slab, key), - None, - "stale set must no longer be visible" - ); - assert_eq!(get_val(&slab, key2), Some(0)); - }); -} - -fn get_val(slab: &Arc>, address: Address) -> Option { - slab.get(address).and_then(|s| s.get_readiness(address)) -} - -fn store_val(slab: &Arc>, readiness: usize) -> Address { - let key = slab.alloc().expect("allocate slot"); - - if let Some(slot) = slab.get(key) { - slot.set_readiness(key, |_| readiness) - .expect("generation should still be valid!"); - } else { - panic!("slab did not contain a value for {:?}", key); - } - - key -} - -fn store_when_free(slab: &Arc>, readiness: usize) -> Address { - let key = loop { - if let Some(key) = slab.alloc() { - break key; - } - - thread::yield_now(); - }; - - if let Some(slot) = slab.get(key) { - slot.set_readiness(key, |_| readiness) - .expect("generation should still be valid!"); - } else { - panic!("slab did not contain a value for {:?}", key); - } - - key -} diff --git a/tokio/src/util/slab/tests/loom_stack.rs b/tokio/src/util/slab/tests/loom_stack.rs deleted file mode 100644 index 47ad46d3a1c..00000000000 --- a/tokio/src/util/slab/tests/loom_stack.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::util::slab::TransferStack; - -use loom::cell::UnsafeCell; -use loom::sync::Arc; -use loom::thread; - -#[test] -fn transfer_stack() { - loom::model(|| { - let causalities = [UnsafeCell::new(None), UnsafeCell::new(None)]; - let shared = Arc::new((causalities, TransferStack::new())); - let shared1 = shared.clone(); - let shared2 = shared.clone(); - - // Spawn two threads that both try to push to the stack. - let t1 = thread::spawn(move || { - let (causalities, stack) = &*shared1; - stack.push(0, |prev| { - causalities[0].with_mut(|c| unsafe { - *c = Some(prev); - }); - }); - }); - - let t2 = thread::spawn(move || { - let (causalities, stack) = &*shared2; - stack.push(1, |prev| { - causalities[1].with_mut(|c| unsafe { - *c = Some(prev); - }); - }); - }); - - let (causalities, stack) = &*shared; - - // Try to pop from the stack... - let mut idx = stack.pop_all(); - while idx == None { - idx = stack.pop_all(); - thread::yield_now(); - } - let idx = idx.unwrap(); - - let saw_both = causalities[idx].with(|val| { - let val = unsafe { *val }; - assert!( - val.is_some(), - "UnsafeCell write must happen-before index is pushed to the stack!", - ); - // were there two entries in the stack? if so, check that - // both saw a write. 
- if let Some(c) = causalities.get(val.unwrap()) { - c.with(|val| { - let val = unsafe { *val }; - assert!( - val.is_some(), - "UnsafeCell write must happen-before index is pushed to the stack!", - ); - }); - true - } else { - false - } - }); - - // We only saw one push. Ensure that the other push happens too. - if !saw_both { - // Try to pop from the stack... - let mut idx = stack.pop_all(); - while idx == None { - idx = stack.pop_all(); - thread::yield_now(); - } - let idx = idx.unwrap(); - - causalities[idx].with(|val| { - let val = unsafe { *val }; - assert!( - val.is_some(), - "UnsafeCell write must happen-before index is pushed to the stack!", - ); - }); - } - - t1.join().unwrap(); - t2.join().unwrap(); - }); -} diff --git a/tokio/src/util/slab/tests/mod.rs b/tokio/src/util/slab/tests/mod.rs deleted file mode 100644 index 7f793544667..00000000000 --- a/tokio/src/util/slab/tests/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod loom_slab; -mod loom_stack; From 8a9b88b64af68f4b09c3be3f7d7080be0b3ccf95 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 10 Aug 2020 16:10:23 -0700 Subject: [PATCH 02/15] fmt --- tokio/src/util/slab.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 4c40d27c0d4..df86e89c314 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -1,6 +1,6 @@ use crate::loom::cell::UnsafeCell; -use crate::loom::sync::{Arc, Mutex}; use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::{Arc, Mutex}; use crate::util::bit; use std::fmt; use std::mem; From 7d3063bc6ae6fbbe2f1b26c3aba58db4c3601f8c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 10 Aug 2020 21:02:07 -0700 Subject: [PATCH 03/15] clippy --- tokio/src/util/slab.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index df86e89c314..751a56aee0f 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -483,7 +483,7 @@ impl CachedPage { // until `compact()` is called. The `compact()` function also updates // `CachedPage`. unsafe { - let slot = self.slots.offset(idx as isize); + let slot = self.slots.add(idx); let value = slot as *const Value; &(*value).value From 7bb3ec3d992215e3a13f0e7994d917614b4077d6 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 10 Aug 2020 21:20:49 -0700 Subject: [PATCH 04/15] get compiling with loom --- tokio/src/io/driver/mod.rs | 97 ----------------------------- tokio/src/io/driver/scheduled_io.rs | 20 ------ tokio/src/util/slab.rs | 25 ++++---- 3 files changed, 14 insertions(+), 128 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 5d7b7bf46f4..d8c253abb33 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -333,100 +333,3 @@ impl Direction { } } } - -#[cfg(all(test, loom))] -mod tests { - use super::*; - use loom::thread; - - // No-op `Evented` impl just so we can have something to pass to `add_source`. 
- struct NotEvented; - - impl Evented for NotEvented { - fn register( - &self, - _: &mio::Poll, - _: mio::Token, - _: mio::Ready, - _: mio::PollOpt, - ) -> io::Result<()> { - Ok(()) - } - - fn reregister( - &self, - _: &mio::Poll, - _: mio::Token, - _: mio::Ready, - _: mio::PollOpt, - ) -> io::Result<()> { - Ok(()) - } - - fn deregister(&self, _: &mio::Poll) -> io::Result<()> { - Ok(()) - } - } - - #[test] - fn tokens_unique_when_dropped() { - loom::model(|| { - let reactor = Driver::new().unwrap(); - let inner = reactor.inner; - let inner2 = inner.clone(); - - let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); - let thread = thread::spawn(move || { - inner2.drop_source(token_1); - }); - - let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); - thread.join().unwrap(); - - assert!(token_1 != token_2); - }) - } - - #[test] - fn tokens_unique_when_dropped_on_full_page() { - loom::model(|| { - let reactor = Driver::new().unwrap(); - let inner = reactor.inner; - let inner2 = inner.clone(); - // add sources to fill up the first page so that the dropped index - // may be reused. - for _ in 0..31 { - inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); - } - - let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); - let thread = thread::spawn(move || { - inner2.drop_source(token_1); - }); - - let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); - thread.join().unwrap(); - - assert!(token_1 != token_2); - }) - } - - #[test] - fn tokens_unique_concurrent_add() { - loom::model(|| { - let reactor = Driver::new().unwrap(); - let inner = reactor.inner; - let inner2 = inner.clone(); - - let thread = thread::spawn(move || { - let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap(); - token_2 - }); - - let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); - let token_2 = thread.join().unwrap(); - - assert!(token_1 != token_2); - }) - } -} diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index a4f775c24c2..566f7daf0d1 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -43,26 +43,6 @@ impl ScheduledIo { super::GENERATION.unpack(self.readiness.load(Acquire)) } - #[cfg(all(test, loom))] - /// Returns the current readiness value of this `ScheduledIo`, if the - /// provided `token` is still a valid access. - /// - /// # Returns - /// - /// If the given token's generation no longer matches the `ScheduledIo`'s - /// generation, then the corresponding IO resource has been removed and - /// replaced with a new resource. In that case, this method returns `None`. - /// Otherwise, this returns the current readiness. - pub(crate) fn get_readiness(&self, address: Address) -> Option { - let ready = self.readiness.load(Acquire); - - if unpack_generation(ready) != address.generation() { - return None; - } - - Some(ready & !PACK.mask()) - } - /// Sets the readiness on this `ScheduledIo` by invoking the given closure on /// the current value, returning the previous readiness value. /// diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 751a56aee0f..d76176cc1a6 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -312,7 +312,7 @@ impl Allocator { pub(crate) fn allocate(&self) -> Option<(Address, Ref)> { // Find the first available slot. for page in &self.pages[..] 
{ - if let Some((addr, val)) = page.allocate() { + if let Some((addr, val)) = Page::allocate(page) { return Some((addr, val)); } } @@ -353,14 +353,17 @@ impl fmt::Debug for Ref { impl Page { // Allocates an object, returns the ref and address. - fn allocate(self: &Arc>) -> Option<(Address, Ref)> { + // + // `self: &Arc>` is avoided here as this would not work with the + // loom `Arc`. + fn allocate(me: &Arc>) -> Option<(Address, Ref)> { // Before acquiring the lock, use the `used` hint. - if self.used.load(Acquire) == self.len { + if me.used.load(Acquire) == me.len { return None; } // Allocating objects requires synchronization - let mut locked = self.slots.lock().unwrap(); + let mut locked = me.slots.lock().unwrap(); if locked.head < locked.slots.len() { // Re-use an already initialized slot. @@ -378,14 +381,14 @@ impl Page { // Increment the number of used slots locked.used += 1; - self.used.store(locked.used, Release); + me.used.store(locked.used, Release); // Reset the slot slot.value.with(|ptr| unsafe { (*ptr).value.reset() }); // Return a reference to the slot - Some((self.addr(idx), slot.gen_ref(self))) - } else if self.len == locked.slots.len() { + Some((me.addr(idx), slot.gen_ref(me))) + } else if me.len == locked.slots.len() { // The page is full None } else { @@ -396,14 +399,14 @@ impl Page { if idx == 0 { // The page has not yet been allocated. Allocate the storage for // all page slots. - locked.slots.reserve_exact(self.len); + locked.slots.reserve_exact(me.len); } // Initialize a new slot locked.slots.push(Slot { value: UnsafeCell::new(Value { value: Default::default(), - page: self.as_ref() as *const _, + page: &**me as *const _, }), next: 0, }); @@ -413,11 +416,11 @@ impl Page { // Increment the number of used slots locked.used += 1; - self.used.store(locked.used, Release); + me.used.store(locked.used, Release); debug_assert_eq!(locked.slots.len(), locked.head); - Some((self.addr(idx), locked.slots[idx].gen_ref(self))) + Some((me.addr(idx), locked.slots[idx].gen_ref(me))) } } } From 13a7fd5d3e22fc4e64dbed6ae6be0bf28bc94865 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 13:00:05 -0700 Subject: [PATCH 05/15] remove old, inaccurate safety comment --- tokio/src/util/slab.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index d76176cc1a6..a12bab44cf5 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -149,9 +149,6 @@ struct Slot { value: UnsafeCell>, /// Next entry in the free list. - /// - /// Safety: Only written when pushing the slot into the free list and read - /// when removing the slot from the free list. next: u32, } From 439262c8cd494f84843115cc2c5a3ad1fc53c82d Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 13:07:29 -0700 Subject: [PATCH 06/15] add note on `index_for --- tokio/src/util/slab.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index a12bab44cf5..5a6c749fecb 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -503,6 +503,9 @@ impl Default for CachedPage { impl Slots { /// Maps a slot pointer to an offset within the current page. /// + /// The pointer math removes the `usize` index from the `Ref` struct, + /// shrinking the struct to a single pointer size. + /// /// # Panics /// /// panics if the provided slot pointer is not contained by the page. 
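Editorial note: the doc comment added in PATCH 06 describes recovering a slot's index from the slot pointer itself, so a `Ref` only needs to carry a single pointer rather than a pointer plus a `usize` index. Below is a minimal standalone sketch of that pointer-math idea; `index_for` here is an illustration of the technique, not the `Slots::index_for` function touched by the patch, and it assumes a non-zero-sized element type.

    // Recover an element's index from a raw pointer into a contiguous
    // allocation, so a handle can store just the pointer instead of a
    // pointer plus a separate `usize` index.
    fn index_for<T>(slots: &[T], slot: *const T) -> usize {
        assert!(std::mem::size_of::<T>() != 0, "zero-sized types not supported");

        let base = slots.as_ptr() as usize;
        let addr = slot as usize;
        assert!(addr >= base, "pointer not contained by the slice");

        let idx = (addr - base) / std::mem::size_of::<T>();

        // The resulting index is bounds checked before it is trusted,
        // mirroring the comment added in the patch.
        assert!(idx < slots.len(), "pointer not contained by the slice");
        idx
    }

    fn main() {
        let values = vec![10u64, 20, 30, 40];
        let third: *const u64 = &values[2];
        assert_eq!(index_for(&values[..], third), 2);
    }

The payoff is a smaller handle type: the index is recomputed on demand from the pointer instead of being stored alongside it.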
From 6604eae94e5f604a7efd74bb51c9ebd3a0f91aed Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 13:32:53 -0700 Subject: [PATCH 07/15] disable slab unit tests when in loom mode --- tokio/src/util/slab.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 5a6c749fecb..0586186e0e9 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -574,7 +574,7 @@ impl Address { } } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod test { use super::*; use std::sync::atomic::AtomicUsize; From ed6c9db2073b54b67c8fb4607718c47102db8c88 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 13:48:11 -0700 Subject: [PATCH 08/15] Apply suggestions from code review Co-authored-by: Alice Ryhl --- tokio/src/util/slab.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 0586186e0e9..d1db6deaeda 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -51,7 +51,7 @@ use std::sync::atomic::Ordering::{Acquire, Release}; /// The `Slab` structure is able to provide (mostly) unsynchronized reads to /// values stored in the slab. Insertions and removals are synchronized. Reading /// objects via `Ref` is fully unsynchronized. Indexing objects uses amortized -/// synhronization. +/// synchronization. /// pub(crate) struct Slab { /// Array of pages. Each page is synchronized. @@ -281,7 +281,7 @@ impl Slab { let vec = mem::replace(&mut slots.slots, vec![]); slots.head = 0; - // Drop the vec outside of the lock + // Drop the lock so we can drop the vector outside the lock below. drop(slots); // Clear cache From 5ba90abfc3a1bdd4d38eae1c675cddde7e00d88d Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 13:53:23 -0700 Subject: [PATCH 09/15] expand on safety comment --- tokio/src/util/slab.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index d1db6deaeda..65a9701b3a4 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -229,6 +229,9 @@ impl Slab { // is bound to `&self`. The only way to invalidate the underlying memory // is to call `compact()`. The lifetimes prevent calling `compact()` // while references to values are outstanding. + // + // The referenced data is never mutated. Only `&self` references are + // used and the data is `Sync`. Some(self.cached[page_idx].get(slot_idx)) } From f93acbaa1fc17b467d41a899f7eccb112a5460e5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 14:02:22 -0700 Subject: [PATCH 10/15] expand another comment --- tokio/src/util/slab.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 65a9701b3a4..375861622cd 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -507,7 +507,9 @@ impl Slots { /// Maps a slot pointer to an offset within the current page. /// /// The pointer math removes the `usize` index from the `Ref` struct, - /// shrinking the struct to a single pointer size. + /// shrinking the struct to a single pointer size. The contents of the + /// function is safe, the resulting `usize` is bounds checked before being + /// used. 
/// /// # Panics /// From 1b3f51809ae79a2d9880789ff6cc3feba5c34466 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 14:33:58 -0700 Subject: [PATCH 11/15] avoid locking in compact if possible --- tokio/src/util/slab.rs | 55 +++++++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 375861622cd..23b30fb1452 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -1,12 +1,12 @@ use crate::loom::cell::UnsafeCell; -use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex}; use crate::util::bit; use std::fmt; use std::mem; use std::ops; use std::ptr; -use std::sync::atomic::Ordering::{Acquire, Release}; +use std::sync::atomic::Ordering::Relaxed; /// Amortized allocation for homogeneous data types. /// @@ -108,6 +108,9 @@ struct Page { // date and should only be used as a hint. used: AtomicUsize, + // Set to `true` when the page has been allocated. + allocated: AtomicBool, + // The number of slots the page can hold. len: usize, @@ -266,6 +269,12 @@ impl Slab { // Iterate each page except the very first one. The very first page is // never freed. for (idx, page) in (&self.pages[1..]).iter().enumerate() { + if page.used.load(Relaxed) != 0 || !page.allocated.load(Relaxed) { + // If the page has slots in use or the memory has not been + // allocated then it cannot be compacted. + continue; + } + let mut slots = match page.slots.try_lock() { Ok(slots) => slots, // If the lock cannot be acquired due to being held by another @@ -279,6 +288,8 @@ impl Slab { continue; } + page.allocated.store(false, Relaxed); + // Remove the slots vector from the page. This is done so that the // freeing process is done outside of the lock's critical section. let vec = mem::replace(&mut slots.slots, vec![]); @@ -358,7 +369,7 @@ impl Page { // loom `Arc`. fn allocate(me: &Arc>) -> Option<(Address, Ref)> { // Before acquiring the lock, use the `used` hint. - if me.used.load(Acquire) == me.len { + if me.used.load(Relaxed) == me.len { return None; } @@ -381,7 +392,7 @@ impl Page { // Increment the number of used slots locked.used += 1; - me.used.store(locked.used, Release); + me.used.store(locked.used, Relaxed); // Reset the slot slot.value.with(|ptr| unsafe { (*ptr).value.reset() }); @@ -416,7 +427,8 @@ impl Page { // Increment the number of used slots locked.used += 1; - me.used.store(locked.used, Release); + me.used.store(locked.used, Relaxed); + me.allocated.store(true, Relaxed); debug_assert_eq!(locked.slots.len(), locked.head); @@ -442,6 +454,7 @@ impl Default for Page { fn default() -> Page { Page { used: AtomicUsize::new(0), + allocated: AtomicBool::new(false), slots: Mutex::new(Slots { slots: Vec::new(), head: 0, @@ -463,7 +476,7 @@ impl Page { locked.head = idx; locked.used -= 1; - self.used.store(locked.used, Release); + self.used.store(locked.used, Relaxed); } } @@ -730,24 +743,28 @@ mod test { let alloc = slab.allocator(); let mut entries = vec![]; - for i in 0..10_000 { - let (addr, val) = alloc.allocate().unwrap(); - val.id.store(i, SeqCst); + for _ in 0..2 { + entries.clear(); - entries.push((addr, val)); - } + for i in 0..10_000 { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); - let mut addrs = vec![]; + entries.push((addr, val)); + } - for (addr, _) in entries { - addrs.push(addr); - } + let mut addrs = vec![]; - slab.compact(); + for (addr, _) in entries.drain(..) 
{ + addrs.push(addr); + } - // The first page is never freed - for addr in &addrs[PAGE_INITIAL_SIZE..] { - assert!(slab.get(*addr).is_none()); + slab.compact(); + + // The first page is never freed + for addr in &addrs[PAGE_INITIAL_SIZE..] { + assert!(slab.get(*addr).is_none()); + } } } } From c237fbc7be3d158d8ed560da2a4c0bba6a765452 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 15:03:02 -0700 Subject: [PATCH 12/15] expand on forget(Arc) comment --- tokio/src/util/slab.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 23b30fb1452..eb59f9b8c1d 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -547,9 +547,12 @@ impl Slots { } impl Slot { - // Generates a `Ref` for the slot. This involves bumping the page's ref count. + /// Generates a `Ref` for the slot. This involves bumping the page's ref count. fn gen_ref(&self, page: &Arc>) -> Ref { - // The ref holds a ref on the page. + // The ref holds a ref on the page. The `Arc` is forgotten here and is + // resurrected in `release` when the `Ref` is dropped. By avoiding to + // hold on to an explicit `Arc` value, the struct size of `Ref` is + // reduced. mem::forget(page.clone()); let slot = self as *const Slot; let value = slot as *const Value; From 6a9fd9b76b04c76c936b470eb766b5d77824e202 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 15:57:02 -0700 Subject: [PATCH 13/15] improve debug output --- tokio/src/util/slab.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index eb59f9b8c1d..cb7fd5e9a4b 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -309,7 +309,7 @@ impl Slab { impl fmt::Debug for Slab { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Slab").finish() + debug(fmt, "Slab", &self.pages[..]) } } @@ -334,7 +334,7 @@ impl Allocator { impl fmt::Debug for Allocator { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("slab::Allocator").finish() + debug(fmt, "slab::Allocator", &self.pages[..]) } } @@ -595,6 +595,23 @@ impl Address { } } +fn debug(fmt: &mut fmt::Formatter<'_>, name: &str, pages: &[Arc>]) -> fmt::Result { + let mut capacity = 0; + let mut len = 0; + + for page in pages { + if page.allocated.load(Relaxed) { + capacity += page.len; + len += page.used.load(Relaxed); + } + } + + fmt.debug_struct(name) + .field("len", &len) + .field("capacity", &capacity) + .finish() +} + #[cfg(all(test, not(loom)))] mod test { use super::*; From cfbfdfac4cef5598467a35474c2125ba0e05e5f9 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 21:29:07 -0700 Subject: [PATCH 14/15] document pack_lossy --- tokio/src/util/bit.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs index 07f9db3b338..ee756044a50 100644 --- a/tokio/src/util/bit.rs +++ b/tokio/src/util/bit.rs @@ -43,6 +43,10 @@ impl Pack { (base & !self.mask) | (value << self.shift) } + /// Packs the value with `base`, losing any bits of `value` that fit. + /// + /// If `value` is larger than the max value that can be represented by the + /// allotted width, the most significant bits are truncated. 
pub(crate) fn pack_lossy(&self, value: usize, base: usize) -> usize { self.pack(value & self.max_value(), base) } From 1f7bc005b98bd6eb0e9f82a33646ea679d001c6f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Aug 2020 21:37:43 -0700 Subject: [PATCH 15/15] change panic --- tokio/src/io/registration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index a597125562d..63aaff56d86 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -303,7 +303,7 @@ impl Registration { let curr_ready = self .shared .set_readiness(None, |curr| curr & (!mask_no_hup)) - .unwrap(); + .unwrap_or_else(|_| unreachable!()); let mut ready = mask & mio::Ready::from_usize(curr_ready);
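Editorial note: PATCH 14 documents `pack_lossy`, which truncates any bits of `value` above the field's width before packing it into `base`. Below is a minimal standalone sketch of that behavior; the `Pack` struct here and the 5-bit-field-at-bit-16 layout are illustrative assumptions, not the actual `bit::Pack` API or the layout used by the I/O driver.

    // A bit field occupying `width` bits starting at bit `shift`.
    #[derive(Clone, Copy)]
    struct Pack {
        mask: usize,
        shift: u32,
    }

    impl Pack {
        fn new(shift: u32, width: u32) -> Pack {
            Pack {
                mask: ((1usize << width) - 1) << shift,
                shift,
            }
        }

        fn max_value(&self) -> usize {
            self.mask >> self.shift
        }

        // Truncate `value` to the field's width, then splice it into `base`.
        fn pack_lossy(&self, value: usize, base: usize) -> usize {
            let value = value & self.max_value();
            (base & !self.mask) | (value << self.shift)
        }

        fn unpack(&self, packed: usize) -> usize {
            (packed & self.mask) >> self.shift
        }
    }

    fn main() {
        let field = Pack::new(16, 5);

        // 40 does not fit in 5 bits (max 31); the most significant bit is dropped.
        let packed = field.pack_lossy(40, 0);
        assert_eq!(field.unpack(packed), 40 & 31); // == 8
    }

Lossy packing is what lets a wrapping counter (such as a slot generation) be stored in a fixed-width field without an explicit modulo at each call site.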