From e2589a0e401e27457618920e66caa0f14e9a69ad Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 7 Jul 2021 10:53:57 +0200 Subject: [PATCH] runtime: add OwnedTasks (#3909) --- tokio/src/runtime/basic_scheduler.rs | 98 +++----------- tokio/src/runtime/task/core.rs | 11 -- tokio/src/runtime/task/list.rs | 33 +++++ tokio/src/runtime/task/mod.rs | 15 +-- tokio/src/runtime/task/stack.rs | 83 ------------ tokio/src/runtime/tests/task.rs | 39 +----- tokio/src/runtime/thread_pool/worker.rs | 164 ++++-------------------- tokio/src/util/linked_list.rs | 49 ------- 8 files changed, 87 insertions(+), 405 deletions(-) create mode 100644 tokio/src/runtime/task/list.rs delete mode 100644 tokio/src/runtime/task/stack.rs diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 13dfb69739f..9efe3844a39 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -2,16 +2,14 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; -use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::sync::notify::Notify; -use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; @@ -57,9 +55,6 @@ pub(crate) struct Spawner { } struct Tasks { - /// Collection of all active tasks spawned onto this executor. - owned: LinkedList>, > as Link>::Target>, - /// Local run queue. /// /// Tasks notified from the current thread are pushed into this queue. @@ -69,23 +64,23 @@ struct Tasks { /// A remote scheduler entry. /// /// These are filled in by remote threads sending instructions to the scheduler. -enum Entry { +enum RemoteMsg { /// A remote thread wants to spawn a task. Schedule(task::Notified>), - /// A remote thread wants a task to be released by the scheduler. We only - /// have access to its header. - Release(NonNull), } // Safety: Used correctly, the task header is "thread safe". Ultimately the task // is owned by the current thread executor, for which this instruction is being // sent. -unsafe impl Send for Entry {} +unsafe impl Send for RemoteMsg {} /// Scheduler state shared between threads. struct Shared { /// Remote run queue. None if the `Runtime` has been dropped. - queue: Mutex>>, + queue: Mutex>>, + + /// Collection of all active tasks spawned onto this executor. + owned: OwnedTasks>, /// Unpark the blocked thread. unpark: Box, @@ -125,6 +120,7 @@ impl BasicScheduler

{ let spawner = Spawner { shared: Arc::new(Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), + owned: OwnedTasks::new(), unpark: unpark as Box, woken: AtomicBool::new(false), }), @@ -132,7 +128,6 @@ impl BasicScheduler

{ let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { - owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), spawner: spawner.clone(), @@ -227,7 +222,7 @@ impl Inner

{ .borrow_mut() .queue .pop_front() - .map(Entry::Schedule) + .map(RemoteMsg::Schedule) }) } else { context @@ -235,7 +230,7 @@ impl Inner

{ .borrow_mut() .queue .pop_front() - .map(Entry::Schedule) + .map(RemoteMsg::Schedule) .or_else(|| scheduler.spawner.pop()) }; @@ -251,26 +246,7 @@ impl Inner

{ }; match entry { - Entry::Schedule(task) => crate::coop::budget(|| task.run()), - Entry::Release(ptr) => { - // Safety: the task header is only legally provided - // internally in the header, so we know that it is a - // valid (or in particular *allocated*) header that - // is part of the linked list. - unsafe { - let removed = context.tasks.borrow_mut().owned.remove(ptr); - - // TODO: This seems like it should hold, because - // there doesn't seem to be an avenue for anyone - // else to fiddle with the owned tasks - // collection *after* a remote thread has marked - // it as released, and at that point, the only - // location at which it can be removed is here - // or in the Drop implementation of the - // scheduler. - debug_assert!(removed.is_some()); - } - } + RemoteMsg::Schedule(task) => crate::coop::budget(|| task.run()), } } @@ -335,14 +311,7 @@ impl Drop for BasicScheduler

{ }; enter(&mut inner, |scheduler, context| { - // Loop required here to ensure borrow is dropped between iterations - #[allow(clippy::while_let_loop)] - loop { - let task = match context.tasks.borrow_mut().owned.pop_back() { - Some(task) => task, - None => break, - }; - + while let Some(task) = context.shared.owned.pop_back() { task.shutdown(); } @@ -358,13 +327,9 @@ impl Drop for BasicScheduler

{ if let Some(remote_queue) = remote_queue.take() { for entry in remote_queue { match entry { - Entry::Schedule(task) => { + RemoteMsg::Schedule(task) => { task.shutdown(); } - Entry::Release(..) => { - // Do nothing, each entry in the linked list was *just* - // dropped by the scheduler above. - } } } } @@ -375,7 +340,7 @@ impl Drop for BasicScheduler

{ // The assert below is unrelated to this mutex. drop(remote_queue); - assert!(context.tasks.borrow().owned.is_empty()); + assert!(context.shared.owned.is_empty()); }); } } @@ -400,7 +365,7 @@ impl Spawner { handle } - fn pop(&self) -> Option { + fn pop(&self) -> Option { match self.shared.queue.lock().as_mut() { Some(queue) => queue.pop_front(), None => None, @@ -430,39 +395,14 @@ impl Schedule for Arc { fn bind(task: Task) -> Arc { CURRENT.with(|maybe_cx| { let cx = maybe_cx.expect("scheduler context missing"); - cx.tasks.borrow_mut().owned.push_front(task); + cx.shared.owned.push_front(task); cx.shared.clone() }) } fn release(&self, task: &Task) -> Option> { - CURRENT.with(|maybe_cx| { - let ptr = NonNull::from(task.header()); - - if let Some(cx) = maybe_cx { - // safety: the task is inserted in the list in `bind`. - unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } - } else { - // By sending an `Entry::Release` to the runtime, we ask the - // runtime to remove this task from the linked list in - // `Tasks::owned`. - // - // If the queue is `None`, then the task was already removed - // from that list in the destructor of `BasicScheduler`. We do - // not do anything in this case for the same reason that - // `Entry::Release` messages are ignored in the remote queue - // drain loop of `BasicScheduler`'s destructor. - if let Some(queue) = self.queue.lock().as_mut() { - queue.push_back(Entry::Release(ptr)); - } - - self.unpark.unpark(); - // Returning `None` here prevents the task plumbing from being - // freed. It is then up to the scheduler through the queue we - // just added to, or its Drop impl to free the task. - None - } - }) + // SAFETY: Inserted into the list in bind above. + unsafe { self.owned.remove(task) } } fn schedule(&self, task: task::Notified) { @@ -473,7 +413,7 @@ impl Schedule for Arc { _ => { let mut guard = self.queue.lock(); if let Some(queue) = guard.as_mut() { - queue.push_back(Entry::Schedule(task)); + queue.push_back(RemoteMsg::Schedule(task)); drop(guard); self.unpark.unpark(); } else { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 428c921fe0d..e4624c7b709 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -66,9 +66,6 @@ pub(crate) struct Header { /// Pointer to next task, used with the injection queue pub(crate) queue_next: UnsafeCell>>, - /// Pointer to the next task in the transfer stack - pub(super) stack_next: UnsafeCell>>, - /// Table of function pointers for executing actions on the task. pub(super) vtable: &'static Vtable, @@ -104,7 +101,6 @@ impl Cell { state, owned: UnsafeCell::new(linked_list::Pointers::new()), queue_next: UnsafeCell::new(None), - stack_next: UnsafeCell::new(None), vtable: raw::vtable::(), #[cfg(all(tokio_unstable, feature = "tracing"))] id, @@ -299,13 +295,6 @@ impl CoreStage { cfg_rt_multi_thread! { impl Header { - pub(crate) fn shutdown(&self) { - use crate::runtime::task::RawTask; - - let task = unsafe { RawTask::from_raw(self.into()) }; - task.shutdown(); - } - pub(crate) unsafe fn set_next(&self, next: Option>) { self.queue_next.with_mut(|ptr| *ptr = next); } diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs new file mode 100644 index 00000000000..45e22a72af2 --- /dev/null +++ b/tokio/src/runtime/task/list.rs @@ -0,0 +1,33 @@ +use crate::loom::sync::Mutex; +use crate::runtime::task::Task; +use crate::util::linked_list::{Link, LinkedList}; + +pub(crate) struct OwnedTasks { + list: Mutex, as Link>::Target>>, +} + +impl OwnedTasks { + pub(crate) fn new() -> Self { + Self { + list: Mutex::new(LinkedList::new()), + } + } + + pub(crate) fn push_front(&self, task: Task) { + self.list.lock().push_front(task); + } + + pub(crate) fn pop_back(&self) -> Option> { + self.list.lock().pop_back() + } + + /// The caller must ensure that if the provided task is stored in a + /// linked list, then it is in this linked list. + pub(crate) unsafe fn remove(&self, task: &Task) -> Option> { + self.list.lock().remove(task.header().into()) + } + + pub(crate) fn is_empty(&self) -> bool { + self.list.lock().is_empty() + } +} diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 58b8c2a15e8..6b1b8c63886 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -13,6 +13,9 @@ mod join; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::join::JoinHandle; +mod list; +pub(super) use self::list::OwnedTasks; + mod raw; use self::raw::RawTask; @@ -21,11 +24,6 @@ use self::state::State; mod waker; -cfg_rt_multi_thread! { - mod stack; - pub(crate) use self::stack::TransferStack; -} - use crate::future::Future; use crate::util::linked_list; @@ -62,11 +60,10 @@ pub(crate) trait Schedule: Sync + Sized + 'static { fn bind(task: Task) -> Self; /// The task has completed work and is ready to be released. The scheduler - /// is free to drop it whenever. + /// should release it immediately and return it. The task module will batch + /// the ref-dec with setting other options. /// - /// If the scheduler can immediately release the task, it should return - /// it as part of the function. This enables the task module to batch - /// the ref-dec with other options. + /// If the scheduler has already released the task, then None is returned. fn release(&self, task: &Task) -> Option>; /// Schedule the task diff --git a/tokio/src/runtime/task/stack.rs b/tokio/src/runtime/task/stack.rs deleted file mode 100644 index 9dd8d3f43f9..00000000000 --- a/tokio/src/runtime/task/stack.rs +++ /dev/null @@ -1,83 +0,0 @@ -use crate::loom::sync::atomic::AtomicPtr; -use crate::runtime::task::{Header, Task}; - -use std::marker::PhantomData; -use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; - -/// Concurrent stack of tasks, used to pass ownership of a task from one worker -/// to another. -pub(crate) struct TransferStack { - head: AtomicPtr

, - _p: PhantomData, -} - -impl TransferStack { - pub(crate) fn new() -> TransferStack { - TransferStack { - head: AtomicPtr::new(ptr::null_mut()), - _p: PhantomData, - } - } - - pub(crate) fn push(&self, task: Task) { - let task = task.into_raw(); - - // We don't care about any memory associated w/ setting the `head` - // field, just the current value. - // - // The compare-exchange creates a release sequence. - let mut curr = self.head.load(Relaxed); - - loop { - unsafe { - task.as_ref() - .stack_next - .with_mut(|ptr| *ptr = NonNull::new(curr)) - }; - - let res = self - .head - .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed); - - match res { - Ok(_) => return, - Err(actual) => { - curr = actual; - } - } - } - } - - pub(crate) fn drain(&self) -> impl Iterator> { - struct Iter(Option>, PhantomData); - - impl Iterator for Iter { - type Item = Task; - - fn next(&mut self) -> Option> { - let task = self.0?; - - // Move the cursor forward - self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) }; - - // Return the task - unsafe { Some(Task::from_raw(task)) } - } - } - - impl Drop for Iter { - fn drop(&mut self) { - use std::process; - - if self.0.is_some() { - // we have bugs - process::abort(); - } - } - } - - let ptr = self.head.swap(ptr::null_mut(), Acquire); - Iter(NonNull::new(ptr), PhantomData) - } -} diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 7c2012523cb..1f3e89d7661 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,5 +1,4 @@ -use crate::runtime::task::{self, Schedule, Task}; -use crate::util::linked_list::{Link, LinkedList}; +use crate::runtime::task::{self, OwnedTasks, Schedule, Task}; use crate::util::TryLock; use std::collections::VecDeque; @@ -51,10 +50,9 @@ fn with(f: impl FnOnce(Runtime)) { let _reset = Reset; let rt = Runtime(Arc::new(Inner { - released: task::TransferStack::new(), + owned: OwnedTasks::new(), core: TryLock::new(Core { queue: VecDeque::new(), - tasks: LinkedList::new(), }), })); @@ -66,13 +64,12 @@ fn with(f: impl FnOnce(Runtime)) { struct Runtime(Arc); struct Inner { - released: task::TransferStack, core: TryLock, + owned: OwnedTasks, } struct Core { queue: VecDeque>, - tasks: LinkedList, as Link>::Target>, } static CURRENT: TryLock> = TryLock::new(None); @@ -91,8 +88,6 @@ impl Runtime { task.run(); } - self.0.maintenance(); - n } @@ -107,7 +102,7 @@ impl Runtime { fn shutdown(&self) { let mut core = self.0.core.try_lock().unwrap(); - for task in core.tasks.iter() { + while let Some(task) = self.0.owned.pop_back() { task.shutdown(); } @@ -117,40 +112,20 @@ impl Runtime { drop(core); - while !self.0.core.try_lock().unwrap().tasks.is_empty() { - self.0.maintenance(); - } - } -} - -impl Inner { - fn maintenance(&self) { - use std::mem::ManuallyDrop; - - for task in self.released.drain() { - let task = ManuallyDrop::new(task); - - // safety: see worker.rs - unsafe { - let ptr = task.header().into(); - self.core.try_lock().unwrap().tasks.remove(ptr); - } - } + assert!(self.0.owned.is_empty()); } } impl Schedule for Runtime { fn bind(task: Task) -> Runtime { let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone(); - rt.0.core.try_lock().unwrap().tasks.push_front(task); + rt.0.owned.push_front(task); rt } fn release(&self, task: &Task) -> Option> { // safety: copying worker.rs - let task = unsafe { Task::from_raw(task.header().into()) }; - self.0.released.push(task); - None + unsafe { self.0.owned.remove(task) } } fn schedule(&self, task: task::Notified) { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 70cbddbd05e..4ae0f5a2592 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -11,9 +11,9 @@ use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; +use crate::runtime::task::OwnedTasks; use crate::runtime::thread_pool::{AtomicCell, Idle}; use crate::runtime::{queue, task}; -use crate::util::linked_list::{Link, LinkedList}; use crate::util::FastRand; use std::cell::RefCell; @@ -53,9 +53,6 @@ struct Core { /// True if the scheduler is being shutdown is_shutdown: bool, - /// Tasks owned by the core - tasks: LinkedList::Target>, - /// Parker /// /// Stored in an `Option` as the parker is added / removed to make the @@ -78,6 +75,9 @@ pub(super) struct Shared { /// Coordinates idle workers idle: Idle, + /// Collection of all active tasks spawned onto this executor. + owned: OwnedTasks>, + /// Cores that have observed the shutdown signal /// /// The core is **not** placed back in the worker to avoid it from being @@ -91,10 +91,6 @@ struct Remote { /// Steal tasks from this worker. steal: queue::Steal>, - /// Transfers tasks to be released. Any worker pushes tasks, only the owning - /// worker pops. - pending_drop: task::TransferStack>, - /// Unparks the associated worker thread unpark: Unparker, } @@ -142,22 +138,18 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { run_queue, is_searching: false, is_shutdown: false, - tasks: LinkedList::new(), park: Some(park), rand: FastRand::new(seed()), })); - remotes.push(Remote { - steal, - pending_drop: task::TransferStack::new(), - unpark, - }); + remotes.push(Remote { steal, unpark }); } let shared = Arc::new(Shared { remotes: remotes.into_boxed_slice(), inject: queue::Inject::new(), idle: Idle::new(size), + owned: OwnedTasks::new(), shutdown_cores: Mutex::new(vec![]), }); @@ -203,18 +195,20 @@ where CURRENT.with(|maybe_cx| { match (crate::runtime::enter::context(), maybe_cx.is_some()) { (EnterContext::Entered { .. }, true) => { - // We are on a thread pool runtime thread, so we just need to set up blocking. + // We are on a thread pool runtime thread, so we just need to + // set up blocking. had_entered = true; } (EnterContext::Entered { allow_blocking }, false) => { - // We are on an executor, but _not_ on the thread pool. - // That is _only_ okay if we are in a thread pool runtime's block_on method: + // We are on an executor, but _not_ on the thread pool. That is + // _only_ okay if we are in a thread pool runtime's block_on + // method: if allow_blocking { had_entered = true; return; } else { - // This probably means we are on the basic_scheduler or in a LocalSet, - // where it is _not_ okay to block. + // This probably means we are on the basic_scheduler or in a + // LocalSet, where it is _not_ okay to block. panic!("can call blocking only when running on the multi-threaded runtime"); } } @@ -538,42 +532,25 @@ impl Core { true } - /// Runs maintenance work such as free pending tasks and check the pool's - /// state. + /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { - self.drain_pending_drop(worker); - if !self.is_shutdown { // Check if the scheduler has been shutdown self.is_shutdown = worker.inject().is_closed(); } } - // Signals all tasks to shut down, and waits for them to complete. Must run - // before we enter the single-threaded phase of shutdown processing. + /// Signals all tasks to shut down, and waits for them to complete. Must run + /// before we enter the single-threaded phase of shutdown processing. fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. - for header in self.tasks.iter() { + while let Some(header) = worker.shared.owned.pop_back() { header.shutdown(); } - - loop { - self.drain_pending_drop(worker); - - if self.tasks.is_empty() { - break; - } - - // Wait until signalled - let park = self.park.as_mut().expect("park missing"); - park.park().expect("park failed"); - } } // Shutdown the core fn shutdown(&mut self) { - assert!(self.tasks.is_empty()); - // Take the core let mut park = self.park.take().expect("park missing"); @@ -582,24 +559,6 @@ impl Core { park.shutdown(); } - - fn drain_pending_drop(&mut self, worker: &Worker) { - use std::mem::ManuallyDrop; - - for task in worker.remote().pending_drop.drain() { - let task = ManuallyDrop::new(task); - - // safety: tasks are only pushed into the `pending_drop` stacks that - // are associated with the list they are inserted into. When a task - // is pushed into `pending_drop`, the ref-inc is skipped, so we must - // not ref-dec here. - // - // See `bind` and `release` implementations. - unsafe { - self.tasks.remove(task.header().into()); - } - } - } } impl Worker { @@ -607,15 +566,6 @@ impl Worker { fn inject(&self) -> &queue::Inject> { &self.shared.inject } - - /// Return a reference to this worker's remote data - fn remote(&self) -> &Remote { - &self.shared.remotes[self.index] - } - - fn eq(&self, other: &Worker) -> bool { - self.shared.ptr_eq(&other.shared) && self.index == other.index - } } impl task::Schedule for Arc { @@ -624,12 +574,7 @@ impl task::Schedule for Arc { let cx = maybe_cx.expect("scheduler context missing"); // Track the task - cx.core - .borrow_mut() - .as_mut() - .expect("scheduler core missing") - .tasks - .push_front(task); + cx.worker.shared.owned.push_front(task); // Return a clone of the worker cx.worker.clone() @@ -637,75 +582,8 @@ impl task::Schedule for Arc { } fn release(&self, task: &Task) -> Option { - use std::ptr::NonNull; - - enum Immediate { - // Task has been synchronously removed from the Core owned by the - // current thread - Removed(Option), - // Task is owned by another thread, so we need to notify it to clean - // up the task later. - MaybeRemote, - } - - let immediate = CURRENT.with(|maybe_cx| { - let cx = match maybe_cx { - Some(cx) => cx, - None => return Immediate::MaybeRemote, - }; - - if !self.eq(&cx.worker) { - // Task owned by another core, so we need to notify it. - return Immediate::MaybeRemote; - } - - let mut maybe_core = cx.core.borrow_mut(); - - if let Some(core) = &mut *maybe_core { - // Directly remove the task - // - // safety: the task is inserted in the list in `bind`. - unsafe { - let ptr = NonNull::from(task.header()); - return Immediate::Removed(core.tasks.remove(ptr)); - } - } - - Immediate::MaybeRemote - }); - - // Checks if we were called from within a worker, allowing for immediate - // removal of a scheduled task. Else we have to go through the slower - // process below where we remotely mark a task as dropped. - match immediate { - Immediate::Removed(task) => return task, - Immediate::MaybeRemote => (), - }; - - // Track the task to be released by the worker that owns it - // - // Safety: We get a new handle without incrementing the ref-count. - // A ref-count is held by the "owned" linked list and it is only - // ever removed from that list as part of the release process: this - // method or popping the task from `pending_drop`. Thus, we can rely - // on the ref-count held by the linked-list to keep the memory - // alive. - // - // When the task is removed from the stack, it is forgotten instead - // of dropped. - let task = unsafe { Task::from_raw(task.header().into()) }; - - self.remote().pending_drop.push(task); - - // The worker core has been handed off to another thread. In the - // event that the scheduler is currently shutting down, the thread - // that owns the task may be waiting on the release to complete - // shutdown. - if self.inject().is_closed() { - self.remote().unpark.unpark(); - } - - None + // SAFETY: Inserted into owned in bind. + unsafe { self.shared.owned.remove(task) } } fn schedule(&self, task: Notified) { @@ -825,6 +703,8 @@ impl Shared { return; } + debug_assert!(self.owned.is_empty()); + for mut core in cores.drain(..) { core.shutdown(); } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index a74f56215d9..1eab81c317c 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -236,37 +236,6 @@ impl Default for LinkedList { } } -// ===== impl Iter ===== - -cfg_rt_multi_thread! { - pub(crate) struct Iter<'a, T: Link> { - curr: Option>, - _p: core::marker::PhantomData<&'a T>, - } - - impl LinkedList { - pub(crate) fn iter(&self) -> Iter<'_, L> { - Iter { - curr: self.head, - _p: core::marker::PhantomData, - } - } - } - - impl<'a, T: Link> Iterator for Iter<'a, T> { - type Item = &'a T::Target; - - fn next(&mut self) -> Option<&'a T::Target> { - let curr = self.curr?; - // safety: the pointer references data contained by the list - self.curr = unsafe { T::pointers(curr).as_ref() }.get_next(); - - // safety: the value is still owned by the linked list. - Some(unsafe { &*curr.as_ptr() }) - } - } -} - // ===== impl DrainFilter ===== cfg_io_readiness! { @@ -645,24 +614,6 @@ mod tests { } } - #[test] - fn iter() { - let a = entry(5); - let b = entry(7); - - let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); - - assert_eq!(0, list.iter().count()); - - list.push_front(a.as_ref()); - list.push_front(b.as_ref()); - - let mut i = list.iter(); - assert_eq!(7, i.next().unwrap().val); - assert_eq!(5, i.next().unwrap().val); - assert!(i.next().is_none()); - } - proptest::proptest! { #[test] fn fuzz_linked_list(ops: Vec) {