diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 6918a26b91..4a05d8823e 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -558,7 +558,7 @@ impl FuturesUnordered { pub fn clear(&mut self) { self.clear_head_all(); - // SAFETY: we just cleared all the tasks and we have &mut self + // we just cleared all the tasks, and we have &mut self, so this is safe. unsafe { self.ready_to_run_queue.clear() }; self.is_terminated.store(false, Relaxed); @@ -575,9 +575,24 @@ impl FuturesUnordered { impl Drop for FuturesUnordered { fn drop(&mut self) { + // When a `FuturesUnordered` is dropped we want to drop all futures + // associated with it. At the same time though there may be tons of + // wakers flying around which contain `Task` references + // inside them. We'll let those naturally get deallocated. self.clear_head_all(); - // SAFETY: we just cleared all the tasks and we have &mut self - unsafe { self.ready_to_run_queue.clear() }; + + // Note that at this point we could still have a bunch of tasks in the + // ready to run queue. None of those tasks, however, have futures + // associated with them so they're safe to destroy on any thread. At + // this point the `FuturesUnordered` struct, the owner of the one strong + // reference to the ready to run queue will drop the strong reference. + // At that point whichever thread releases the strong refcount last (be + // it this thread or some other thread as part of an `upgrade`) will + // clear out the ready to run queue and free all remaining tasks. + // + // While that freeing operation isn't guaranteed to happen here, it's + // guaranteed to happen "promptly" as no more "blocking work" will + // happen while there's a strong refcount held. } } diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 2bc208682a..5ef6cde83d 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -94,7 +94,7 @@ impl ReadyToRunQueue { // // # Safety // - // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear_head_all) + // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) // - The caller **must** guarantee unique access to `self` pub(crate) unsafe fn clear(&self) { loop { @@ -107,3 +107,16 @@ impl ReadyToRunQueue { } } } + +impl Drop for ReadyToRunQueue { + fn drop(&mut self) { + // Once we're in the destructor for `Inner` we need to clear out + // the ready to run queue of tasks if there's anything left in there. + + // All tasks have had their futures dropped already by the `FuturesUnordered` + // destructor above, and we have &mut self, so this is safe. + unsafe { + self.clear(); + } + } +}