Skip to content

Commit

Permalink
runtime: fix memory leak/growth when creating many runtimes (#3564)
Browse files Browse the repository at this point in the history
  • Loading branch information
ipetkov committed Mar 16, 2021
1 parent e6103d6 commit e4f7668
Show file tree
Hide file tree
Showing 18 changed files with 437 additions and 285 deletions.
13 changes: 11 additions & 2 deletions .github/workflows/ci.yml
Expand Up @@ -82,14 +82,23 @@ jobs:
sudo apt-get install -y valgrind
# Compile tests
- name: cargo build
- name: cargo build test-mem
run: cargo build --features rt-net --bin test-mem
working-directory: tests-integration

# Run with valgrind
- name: Run valgrind
- name: Run valgrind test-mem
run: valgrind --leak-check=full --show-leak-kinds=all ./target/debug/test-mem

# Compile tests
- name: cargo build test-process-signal
run: cargo build --features rt-process-signal --bin test-process-signal
working-directory: tests-integration

# Run with valgrind
- name: Run valgrind test-process-signal
run: valgrind --leak-check=full --show-leak-kinds=all ./target/debug/test-process-signal

test-unstable:
name: test tokio full --unstable
runs-on: ${{ matrix.os }}
Expand Down
6 changes: 6 additions & 0 deletions tests-integration/Cargo.toml
Expand Up @@ -12,9 +12,15 @@ name = "test-cat"
name = "test-mem"
required-features = ["rt-net"]

[[bin]]
name = "test-process-signal"
required-features = ["rt-process-signal"]

[features]
# For mem check
rt-net = ["tokio/rt", "tokio/rt-multi-thread", "tokio/net"]
# For test-process-signal
rt-process-signal = ["rt", "tokio/process", "tokio/signal"]

full = [
"macros",
Expand Down
11 changes: 11 additions & 0 deletions tests-integration/src/bin/test-process-signal.rs
@@ -0,0 +1,11 @@
// https://github.com/tokio-rs/tokio/issues/3550
fn main() {
for _ in 0..1000 {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

drop(rt);
}
}
2 changes: 1 addition & 1 deletion tokio/src/io/driver/interest.rs
@@ -1,4 +1,4 @@
#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
#![cfg_attr(not(feature = "net"), allow(dead_code, unreachable_pub))]

use crate::io::driver::Ready;

Expand Down
2 changes: 2 additions & 0 deletions tokio/src/io/driver/registration.rs
@@ -1,3 +1,5 @@
#![cfg_attr(not(feature = "net"), allow(dead_code))]

use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo};
use crate::util::slab;

Expand Down
5 changes: 5 additions & 0 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -121,6 +121,11 @@ impl<E: Source> PollEvented<E> {
}

/// Returns a reference to the registration
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
pub(crate) fn registration(&self) -> &Registration {
&self.registration
}
Expand Down
60 changes: 18 additions & 42 deletions tokio/src/process/unix/driver.rs
Expand Up @@ -6,8 +6,8 @@ use crate::park::Park;
use crate::process::unix::orphan::ReapOrphanQueue;
use crate::process::unix::GlobalOrphanQueue;
use crate::signal::unix::driver::Driver as SignalDriver;
use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind};
use crate::sync::mpsc::error::TryRecvError;
use crate::signal::unix::{signal_with_handle, SignalKind};
use crate::sync::watch;

use std::io;
use std::time::Duration;
Expand All @@ -16,7 +16,7 @@ use std::time::Duration;
#[derive(Debug)]
pub(crate) struct Driver {
park: SignalDriver,
inner: CoreDriver<Signal, GlobalOrphanQueue>,
inner: CoreDriver<watch::Receiver<()>, GlobalOrphanQueue>,
}

#[derive(Debug)]
Expand All @@ -25,27 +25,25 @@ struct CoreDriver<S, Q> {
orphan_queue: Q,
}

trait HasChanged {
fn has_changed(&mut self) -> bool;
}

impl<T> HasChanged for watch::Receiver<T> {
fn has_changed(&mut self) -> bool {
self.try_has_changed().and_then(Result::ok).is_some()
}
}

// ===== impl CoreDriver =====

impl<S, Q> CoreDriver<S, Q>
where
S: InternalStream,
S: HasChanged,
Q: ReapOrphanQueue,
{
fn got_signal(&mut self) -> bool {
match self.sigchild.try_recv() {
Ok(()) => true,
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Closed) => panic!("signal was deregistered"),
}
}

fn process(&mut self) {
if self.got_signal() {
// Drain all notifications which may have been buffered
// so we can try to reap all orphans in one batch
while self.got_signal() {}

if self.sigchild.has_changed() {
self.orphan_queue.reap_orphans();
}
}
Expand Down Expand Up @@ -97,8 +95,6 @@ impl Park for Driver {
mod test {
use super::*;
use crate::process::unix::orphan::test::MockQueue;
use crate::sync::mpsc::error::TryRecvError;
use std::task::{Context, Poll};

struct MockStream {
total_try_recv: usize,
Expand All @@ -114,17 +110,10 @@ mod test {
}
}

impl InternalStream for MockStream {
fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
unimplemented!();
}

fn try_recv(&mut self) -> Result<(), TryRecvError> {
impl HasChanged for MockStream {
fn has_changed(&mut self) -> bool {
self.total_try_recv += 1;
match self.values.remove(0) {
Some(()) => Ok(()),
None => Err(TryRecvError::Empty),
}
self.values.remove(0).is_some()
}
}

Expand All @@ -140,17 +129,4 @@ mod test {
assert_eq!(1, driver.sigchild.total_try_recv);
assert_eq!(0, driver.orphan_queue.total_reaps.get());
}

#[test]
fn coalesce_signals_before_reaping() {
let mut driver = CoreDriver {
sigchild: MockStream::new(vec![Some(()), Some(()), None]),
orphan_queue: MockQueue::<()>::new(),
};

driver.process();

assert_eq!(3, driver.sigchild.total_try_recv);
assert_eq!(1, driver.orphan_queue.total_reaps.get());
}
}
17 changes: 6 additions & 11 deletions tokio/src/process/unix/reap.rs
Expand Up @@ -15,7 +15,7 @@ use std::task::Poll;
#[derive(Debug)]
pub(crate) struct Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
inner: Option<W>,
Expand All @@ -25,7 +25,7 @@ where

impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
type Target = W;
Expand All @@ -37,7 +37,7 @@ where

impl<W, Q, S> Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
Expand All @@ -61,7 +61,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
S: InternalStream,
S: InternalStream + Unpin,
{
type Output = io::Result<ExitStatus>;

Expand Down Expand Up @@ -106,7 +106,7 @@ where

impl<W, Q, S> Kill for Reaper<W, Q, S>
where
W: Kill + Wait + Unpin,
W: Kill + Wait,
Q: OrphanQueue<W>,
{
fn kill(&mut self) -> io::Result<()> {
Expand All @@ -116,7 +116,7 @@ where

impl<W, Q, S> Drop for Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
fn drop(&mut self) {
Expand All @@ -134,7 +134,6 @@ mod test {
use super::*;

use crate::process::unix::orphan::test::MockQueue;
use crate::sync::mpsc::error::TryRecvError;
use futures::future::FutureExt;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
Expand Down Expand Up @@ -206,10 +205,6 @@ mod test {
None => Poll::Pending,
}
}

fn try_recv(&mut self) -> Result<(), TryRecvError> {
unimplemented!();
}
}

#[test]
Expand Down
40 changes: 40 additions & 0 deletions tokio/src/signal/mod.rs
Expand Up @@ -42,6 +42,8 @@
//! }
//! # }
//! ```
use crate::sync::watch::Receiver;
use std::task::{Context, Poll};

mod ctrl_c;
pub use ctrl_c::ctrl_c;
Expand All @@ -58,3 +60,41 @@ mod os {

pub mod unix;
pub mod windows;

mod reusable_box;
use self::reusable_box::ReusableBoxFuture;

#[derive(Debug)]
struct RxFuture {
inner: ReusableBoxFuture<Receiver<()>>,
}

async fn make_future(mut rx: Receiver<()>) -> Receiver<()> {
match rx.changed().await {
Ok(()) => rx,
Err(_) => panic!("signal sender went away"),
}
}

impl RxFuture {
fn new(rx: Receiver<()>) -> Self {
Self {
inner: ReusableBoxFuture::new(make_future(rx)),
}
}

async fn recv(&mut self) -> Option<()> {
use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}

fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
match self.inner.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(rx) => {
self.inner.set(make_future(rx));
Poll::Ready(Some(()))
}
}
}
}

0 comments on commit e4f7668

Please sign in to comment.