
Dropping threaded runtime with time and IO enabled results in memory leaks #2535

Closed
edigaryev opened this issue May 13, 2020 · 25 comments
Labels
A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug), M-runtime (Module: tokio/runtime)

Comments

@edigaryev

edigaryev commented May 13, 2020

Version

$ cargo tree | grep tokio
└── tokio v0.2.20
    └── tokio-macros v0.2.5

Platform

$ uname -srvmo
Linux 5.5.0-2-amd64 #1 SMP Debian 5.5.17-1 (2020-04-15) x86_64 GNU/Linux

Description

This code:

use tokio::time::delay_for;

fn main() {
    let rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .max_threads(1)
        .enable_time()
        .enable_io()
        .build()
        .unwrap();

    rt.spawn(my_loop());

    std::thread::sleep(std::time::Duration::from_secs(1));
}

async fn my_loop() {
    println!("I'm alive!");

    loop {
        delay_for(std::time::Duration::from_millis(100)).await;
    }
}

When run under Valgrind, it produces the following:

$ valgrind --leak-check=full target/debug/leaky-termination 
==107658== Memcheck, a memory error detector
==107658== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==107658== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==107658== Command: target/debug/leaky-termination
==107658== 
I'm alive!
==107658== 
==107658== HEAP SUMMARY:
==107658==     in use at exit: 20,176 bytes in 31 blocks
==107658==   total heap usage: 130 allocs, 99 frees, 47,133 bytes allocated
==107658== 
==107658== 320 (64 direct, 256 indirect) bytes in 1 blocks are definitely lost in loss record 25 of 31
==107658==    at 0x483677F: malloc (vg_replace_malloc.c:309)
==107658==    by 0x1B528B: alloc::alloc::alloc (alloc.rs:81)
==107658==    by 0x1B51FB: alloc::alloc::exchange_malloc (alloc.rs:203)
==107658==    by 0x1AFE37: new<mio::poll::ReadinessNode> (boxed.rs:174)
==107658==    by 0x1AFE37: mio::poll::ReadinessQueue::new (poll.rs:2100)
==107658==    by 0x1AD47B: mio::poll::Poll::new (poll.rs:658)
==107658==    by 0x15E88C: tokio::io::driver::Driver::new (mod.rs:70)
==107658==    by 0x140C93: tokio::runtime::io::variant::create_driver (io.rs:35)
==107658==    by 0x153AF7: tokio::runtime::builder::Builder::build_threaded_runtime (builder.rs:474)
==107658==    by 0x1522A8: tokio::runtime::builder::Builder::build (builder.rs:318)
==107658==    by 0x1210B7: leaky_termination::main (main.rs:4)
==107658==    by 0x127B1F: std::rt::lang_start::{{closure}} (rt.rs:67)
==107658==    by 0x1D0B62: {{closure}} (rt.rs:52)
==107658==    by 0x1D0B62: std::panicking::try::do_call (panicking.rs:305)
==107658== 
==107658== 19,856 (72 direct, 19,784 indirect) bytes in 1 blocks are definitely lost in loss record 31 of 31
==107658==    at 0x483677F: malloc (vg_replace_malloc.c:309)
==107658==    by 0x17B18B: alloc::alloc::alloc (alloc.rs:81)
==107658==    by 0x17B0FB: alloc::alloc::exchange_malloc (alloc.rs:203)
==107658==    by 0x16284D: alloc::sync::Arc<T>::new (sync.rs:302)
==107658==    by 0x146130: tokio::time::driver::Driver<T>::new (mod.rs:129)
==107658==    by 0x16FAC2: tokio::runtime::time::variant::create_driver (time.rs:29)
==107658==    by 0x153CB2: tokio::runtime::builder::Builder::build_threaded_runtime (builder.rs:475)
==107658==    by 0x1522A8: tokio::runtime::builder::Builder::build (builder.rs:318)
==107658==    by 0x1210B7: leaky_termination::main (main.rs:4)
==107658==    by 0x127B1F: std::rt::lang_start::{{closure}} (rt.rs:67)
==107658==    by 0x1D0B62: {{closure}} (rt.rs:52)
==107658==    by 0x1D0B62: std::panicking::try::do_call (panicking.rs:305)
==107658==    by 0x1D3566: __rust_maybe_catch_panic (lib.rs:86)
==107658== 
==107658== LEAK SUMMARY:
==107658==    definitely lost: 136 bytes in 2 blocks
==107658==    indirectly lost: 20,040 bytes in 29 blocks
==107658==      possibly lost: 0 bytes in 0 blocks
==107658==    still reachable: 0 bytes in 0 blocks
==107658==         suppressed: 0 bytes in 0 blocks
==107658== 
==107658== For lists of detected and suppressed errors, rerun with: -s
==107658== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 0 from 0)

From the description of Runtime.shutdown_timeout():

Usually, dropping a Runtime handle is sufficient as tasks are able to shutdown in a timely fashion.

However, dropping a Runtime will wait indefinitely for all tasks to terminate, and there are cases where a long blocking task has been spawned, which can block dropping Runtime.

I was expecting the runtime to either:

  • preempt my_loop() at the next await and drop it
  • wait indefinitely for my_loop() to be ready

...and then to terminate cleanly on Drop (contrary to what Runtime.shutdown_timeout() might do when the timeout expires).

Instead, the runtime wasn't terminated cleanly for some reason.
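
For comparison, a minimal sketch of bounding the shutdown explicitly with Runtime::shutdown_timeout() instead of relying on Drop (same tokio 0.2 API as the report above; illustrative only, not a claim that it avoids the leak):

use std::time::Duration;
use tokio::time::delay_for;

fn main() {
    let rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .max_threads(1)
        .enable_time()
        .enable_io()
        .build()
        .unwrap();

    rt.spawn(my_loop());

    std::thread::sleep(Duration::from_secs(1));

    // Bound the shutdown explicitly: wait up to one second for tasks and
    // worker threads to finish, then shut the runtime down regardless.
    rt.shutdown_timeout(Duration::from_secs(1));
}

async fn my_loop() {
    loop {
        delay_for(Duration::from_millis(100)).await;
    }
}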

@carllerche
Member

How reliably can you reproduce this? I have attempted to run the example but have not seen that error.

@carllerche
Member

One potential problem I do see: unless we collect all the join handles of spawned threads, there is potential for Valgrind to complain.

@edigaryev
Author

edigaryev commented May 13, 2020

How reliably can you reproduce this?

This is reliably reproducible for me with rustc 1.42.0 (b8cedc004 2020-03-09), even when building the latest Valgrind from Git (168dc791).

My colleague also reports the same results on his machine with rustc 1.40.0 (73528e339 2019-12-16), Linux 5.2.5 and Valgrind 3.14.0.

I have attempted to run the example but have not seen that error.

I wonder what setup you have in terms of software versions (assuming you're running x86_64)?

@carllerche
Member

It's going to be a racy repro.

I'm running it in a virtualbox VM.

@carllerche
Member

Out of curiosity, if you do the following, does the leak still happen?

fn main() {
    let rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .max_threads(1)
        .enable_time()
        .enable_io()
        .build()
        .unwrap();

    rt.spawn(my_loop());

    std::thread::sleep(std::time::Duration::from_secs(1));

    // New code here
    drop(rt);

    std::thread::sleep(std::time::Duration::from_secs(1));
}

@edigaryev
Author

Out of curiosity, if you do the following, does the leak still happen?

It is indeed racy. Now I only get a leak in 50% of the cases:

$ while true; do valgrind --leak-check=full target/debug/leaky-termination 2>&1 | grep -q LEAK; echo $?; done
0
1
0
0
1
0
1
0
1
1

@Darksonn added the A-tokio, C-bug, and M-runtime labels on May 14, 2020
@edigaryev
Author

Seems to be related to #1830.

@Darksonn
Contributor

I can reproduce this locally. I don't see how it is related to #1830; you don't even spawn tasks anywhere.

@edigaryev
Author

you don't even spawn tasks anywhere

rt.spawn(my_loop());

Eh?

@Darksonn
Contributor

Ah, I can't read. Anyway, it is not related. The runtime should drop running tasks when dropped.

@benesch
Contributor

benesch commented May 21, 2020

If folks are still having trouble, this reproduces really reliably for me with the rust-rdkafka test suite. The only caveat is that it depends on Docker/Docker Compose to get Kafka running.

git clone https://github.com/fede1024/rust-rdkafka.git
cd rust-rdkafka
docker-compose up -d 
cargo test --no-run
valgrind --error-exitcode=100 --leak-check=full target/debug/test_high_consumers-* --nocapture --test-threads=1

@carllerche
Member

It is unclear to me if this is an actual bug or an unfortunate race. One way to check would be to track all std::thread::JoinHandle values returned from here and join them all before completing shutdown.

If valgrind still complains after, then something else is going on.
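
Roughly the kind of bookkeeping being suggested, as a standalone sketch (hypothetical Pool type and method names, not the actual runtime internals):

use std::thread::{self, JoinHandle};

// Hypothetical pool wrapper: keep the handle of every spawned worker
// thread so that shutdown can join them before returning.
struct Pool {
    workers: Vec<JoinHandle<()>>,
}

impl Pool {
    fn spawn_worker(&mut self) {
        let handle = thread::spawn(|| {
            // worker loop would run here
        });
        self.workers.push(handle);
    }

    fn shutdown(self) {
        // Join every worker so no thread (and nothing it still owns)
        // outlives the runtime; otherwise valgrind may flag those
        // allocations as lost at process exit.
        for handle in self.workers {
            let _ = handle.join();
        }
    }
}

fn main() {
    let mut pool = Pool { workers: Vec::new() };
    pool.spawn_worker();
    pool.shutdown();
}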

@benesch
Contributor

benesch commented May 22, 2020

I tried your suggestion, @carllerche, and Valgrind still complained. And indeed, if it were just a shutdown race, I'd really expect the code sample you posted above

fn main() {
    let rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .max_threads(1)
        .enable_time()
        .enable_io()
        .build()
        .unwrap();

    rt.spawn(my_loop());

    std::thread::sleep(std::time::Duration::from_secs(1));

    // New code here
    drop(rt);

    std::thread::sleep(std::time::Duration::from_secs(1));
}

not to exhibit the race. And yet it does with quite a bit of regularity.

As best as I can tell, an Unparker is getting stashed away in a Worker that's never getting cleaned up, but I haven't managed to track it down beyond that (I'm not super confident in this investigation yet).

@benesch
Contributor

benesch commented May 23, 2020

It seems like there might be a reference counting cycle. The time::Driver gets a waker, which keeps a refcount to a task that is bound to a Worker... but the Worker also keeps a reference to the time::Driver. So neither ever gets dropped.

I'm not very familiar with this code, and there are more than a few layers of indirection, so it's hard for me to investigate much further. I wanted to try telling the runtime to drain the time::Driver on drop but couldn't figure out how to get the plumbing to work.
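
If that hypothesis is right, the shape of the problem is an ordinary reference-counting cycle. A reduced, standalone illustration (hypothetical Driver/Task/Worker stand-ins, not the actual tokio types):

use std::sync::{Arc, Mutex};

#[allow(dead_code)]
struct Driver {
    // The "driver" holds wakers, i.e. strong references to tasks.
    wakers: Mutex<Vec<Arc<Task>>>,
}

#[allow(dead_code)]
struct Task {
    // Each task is bound to a worker...
    worker: Arc<Worker>,
}

#[allow(dead_code)]
struct Worker {
    // ...and the worker holds the driver, closing the cycle.
    driver: Arc<Driver>,
}

fn main() {
    let driver = Arc::new(Driver { wakers: Mutex::new(Vec::new()) });
    let worker = Arc::new(Worker { driver: driver.clone() });
    let task = Arc::new(Task { worker });

    // Driver -> Task -> Worker -> Driver: every strong count stays at
    // least 1, so dropping the local handles never frees any of them.
    driver.wakers.lock().unwrap().push(task);
    drop(driver);

    // All three allocations are still alive here; valgrind reports them
    // as definitely/indirectly lost at exit.
}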

@carllerche
Member

Thanks for the investigation. I will try to dig in shortly and will report back what I find.

@benesch
Contributor

benesch commented May 23, 2020

Awesome, thanks @carllerche! One thing that occurs to me: when you were attempting to repro initially, were you running Valgrind with --leak-check=full? Possibly that's why you weren't seeing the error, since Valgrind doesn't report memory leaks by default.

@benesch
Contributor

benesch commented May 23, 2020

More clues! This patch "fixes" the leak.

diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs
index f4756c23..08d0fe83 100644
--- a/tokio/src/runtime/task/core.rs
+++ b/tokio/src/runtime/task/core.rs
@@ -257,11 +257,15 @@ impl<T: Future, S: Schedule> Core<T, S> {
 
         let task = ManuallyDrop::new(task);
 
-        self.scheduler.with(|ptr| {
+        self.scheduler.with_mut(|ptr| {
             // Safety: Can only be called after initial `poll`, which is the
             // only time the field is mutated.
             match unsafe { &*ptr } {
-                Some(scheduler) => scheduler.release(&*task),
+                Some(scheduler) => {
+                    let out = scheduler.release(&*task);
+                    unsafe { *ptr = None };
+                    out
+                }
                 // Task was never polled
                 None => None,
             }

This ensures that when the worker shuts down and releases all associated tasks, those tasks no longer hold a reference to the worker. That way, even if the time driver is holding a reference to one of those tasks, that task doesn't keep the worker alive.

I say "fixes" because there are very clearly complicated safety requirements around mutating the scheduler pointer, and I'm just mutating it blindly here. But I hope this is helpful nonetheless.

@carllerche
Member

Sorry for the delay, I was setting up a new Linux box and now I can repro 💯

@carllerche
Member

I can also see that your patch fixes the memory leak. Unfortunately, it is not thread safe. The scheduler Arc should be dropped when the task is dropped, but it seems like this is not happening. I will try to dig into why the leak is happening.

@benesch
Contributor

benesch commented May 31, 2020 via email

@carllerche
Member

The problem is the time driver holds wakers for the task but we currently don't force the time driver to purge on drop.

@carllerche
Member

More specifically, the time driver handles this problem by using a weak reference in the unpark handle. However, the threaded runtime uses a custom Parker which does not. Fixing this will probably require a bit of a cleanup of this subsystem.
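
The shape of that fix, as a standalone sketch (hypothetical Parker/Unparker/Inner stand-ins rather than the actual tokio code): the parker keeps the only strong reference to the shared state, while the unpark handle that ends up stored in wakers only holds a Weak, so it cannot keep the worker (and, through it, the drivers) alive.

use std::sync::{Arc, Weak};

struct Inner; // condvar, driver handle, etc. would live here

struct Parker {
    inner: Arc<Inner>,
}

struct Unparker {
    // Weak instead of Arc: an outstanding Unparker no longer extends
    // the lifetime of the worker's shared state.
    inner: Weak<Inner>,
}

impl Parker {
    fn unparker(&self) -> Unparker {
        Unparker { inner: Arc::downgrade(&self.inner) }
    }
}

impl Unparker {
    fn unpark(&self) {
        // If the runtime has already shut down, the upgrade fails and
        // the unpark is simply a no-op.
        if let Some(inner) = self.inner.upgrade() {
            let _ = inner; // wake the worker here
        }
    }
}

fn main() {
    let parker = Parker { inner: Arc::new(Inner) };
    let unparker = parker.unparker();
    drop(parker);      // Inner is freed even though an Unparker still exists
    unparker.unpark(); // no-op: the Weak can no longer be upgraded
}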

emgre added a commit to emgre/tokio that referenced this issue Jul 8, 2020
In threaded runtime, the unparker now owns a weak reference to the inner data. This breaks the cycle of Arc and properly releases the io driver and its worker threads.
emgre added a commit to emgre/tokio that referenced this issue Jul 23, 2020
carllerche pushed a commit that referenced this issue Jul 29, 2020
JoinHandle of threads created by the pool are now tracked and properly joined at
shutdown. If the thread does not return within the timeout, then it's not joined and
left to the OS for cleanup.

Also, break a cycle between wakers held by the timer and the runtime.

Fixes #2641, #2535
@koivunej
Contributor

koivunej commented Jan 5, 2021

Looking at this with tokio 1.0.1 and an updated reproducer, there are no more definitely lost or indirectly lost blocks with valgrind 3.17.0.GIT (f4d98ff79). On repeated runs, the numbers for possibly lost and still reachable appear to be constant.

Updated reproducer
use tokio::time::sleep;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .enable_time()
        .enable_io()
        .build()
        .unwrap();

    rt.spawn(my_loop());

    std::thread::sleep(std::time::Duration::from_secs(1));

    /*
    drop(rt);

    std::thread::sleep(std::time::Duration::from_secs(1));
    */
}

async fn my_loop() {
    println!("I'm alive!");
    loop {
        sleep(std::time::Duration::from_millis(100)).await;
    }
}

Here is the valgrind --leak-check=full --show-leak-kinds=all --num-callers=500 report for a debug build. It does look like most of what remains is related to signal handling.

If .enable_io() is left out, there are only two blocks left (valgrind report without enable_io).

I did not look into the implementation(s); I just stumbled upon the issue and wanted to see if this was accidentally left open.

@benesch
Contributor

benesch commented Jan 6, 2021

Agreed, this issue is also confirmed resolved for me in rust-rdkafka with Tokio v1.0.1.

@carllerche
Member

Great, closing 👍
