
WARNING - Memory use is high but worker has no data to store to disk #4193

Open
Hoeze opened this issue Oct 27, 2020 · 17 comments · May be fixed by #4221

Hoeze commented Oct 27, 2020

I end up getting a lot of error messages like this:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 6.15 GB -- Worker memory limit: 8.45 GB

I'm fairly sure that this warning is accurate.
Also, the workers that hit this warning end up idling all the time.
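
(For reference, the thresholds behind this behaviour are the distributed.worker.memory settings; a minimal sketch of tuning them, shown with what I believe are the default fractions of the worker memory limit. They need to be set where, and before, the workers are created:)

import dask

dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling results to disk
    "distributed.worker.memory.spill": 0.70,      # spill with more urgency
    "distributed.worker.memory.pause": 0.80,      # pause task execution on the worker
    "distributed.worker.memory.terminate": 0.95,  # nanny terminates the worker
})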

Is there a way to automatically retire + restart the workers that hit this error message?

Anything else we need to know?:
I'm using Dask v2.30

Environment:

  • Dask version: v2.30
  • Python version: 3.7.8
  • Operating System: Centos 7
  • Install method (conda, pip, source): conda

Hoeze (Author) commented Oct 27, 2020

A related issue I hit when resizing workers a couple of times:
#3018

At some point, these high-memory workers are just stuck and do not do anything anymore.

quasiben (Member) commented:

I don't think there is a method to auto-restart. You can manually call client.restart(), which will restart all the workers. Do you have a concise reproducer?
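
A minimal sketch of that manual route (the scheduler address is a placeholder):

from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")  # placeholder scheduler address
client.restart()  # restarts every worker; all in-memory task results are lost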

Hoeze (Author) commented Oct 29, 2020

Thanks for your answer @quasiben.
Unfortunately, this does not help in my case.
I want to run my jobs unattended, but to finish the last task I often have to manually restart a hanging worker (await client.cluster.workers[<id>].restart()).
Also, client.restart() would restart the whole cluster and delete all the progress, so I would need to re-run all the computations.
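
A sketch of retiring just one worker instead (assuming a synchronous client that is already connected; whether a replacement worker is spawned afterwards depends on the nanny / cluster manager):

# Retire one specific worker by address so that its in-memory results are
# replicated to the remaining workers before it shuts down.
workers = list(client.scheduler_info()["workers"])  # all worker addresses
client.retire_workers(workers=[workers[0]])         # placeholder: pick the hanging worker here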

quasiben (Member) commented:

Hmm, could there in fact be a leak? Can you run with more memory? Can you describe the kind of operation(s) the worker is doing during this time?

Hoeze (Author) commented Oct 29, 2020

My function fetches data from a SQL server and reads some data from TileDB. Afterwards, it uses Pandas to do some aggregations and returns a NumPy array.
Then I use dask.array.from_delayed() to map this function across a range of 2500 inputs.
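
Roughly, the pattern looks like this (a sketch; fetch_and_aggregate is a hypothetical stand-in for my SQL/TileDB/Pandas function, and the output shape is made up):

import dask
import dask.array as da
import numpy as np

@dask.delayed
def fetch_and_aggregate(chunk_id):
    # stand-in for: query SQL, read from TileDB, aggregate with Pandas
    return np.zeros(100)

arrays = [
    da.from_delayed(fetch_and_aggregate(i), shape=(100,), dtype=float)
    for i in range(2500)
]
result = da.stack(arrays)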

I made sure that all connections are one-shot and therefore get closed after loading the data.
However, I cannot completely rule out that one of the libraries is leaking memory, but with memory-profiler I could not find any hint of what might be leaking.

More memory does help, but only by keeping the workers alive longer.
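
One crude way I can watch whether the worker processes themselves keep growing is to poll their RSS directly (a sketch using psutil, which distributed already depends on, and the client from my setup):

import psutil

def worker_rss():
    # resident set size of the calling worker process, in bytes
    return psutil.Process().memory_info().rss

rss = client.run(worker_rss)  # runs on every worker -> {worker_address: bytes}
print({addr: f"{val / 1e9:.2f} GB" for addr, val in rss.items()})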

mrocklin (Member) commented:

Is there a way to automatically retire + restart the workers that hit this error message?

I like this idea.

In practice, Dask is unable to control memory leaks. These often come from other libraries that weren't designed to run in a multi-threaded context (indeed, glibc malloc itself has some issues here). I think that smoothly retiring workers in this situation would be a good way for us to add some robustness.

One challenge to this is what to do with the memory that is currently stored in the worker. It would be nice to transfer it to other workers before restarting (this will happen naturally as part of the worker retire process), but it might be hard to bring it back from disk, given the memory leak that has occurred.

As a first step, we could call await self.close_gracefully() alongside issuing that warning. @Hoeze would you be comfortable testing this out in your situation and reporting back? You will probably also have to add a keyword argument to close_gracefully that gets passed into the nanny= keyword of close, maybe something like this.

diff --git a/distributed/worker.py b/distributed/worker.py
index 00d69329..06013718 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1178,12 +1178,15 @@ class Worker(ServerNode):
             setproctitle("dask-worker [closed]")
         return "OK"
 
-    async def close_gracefully(self):
+    async def close_gracefully(self, nanny=None):
         """ Gracefully shut down a worker
 
         This first informs the scheduler that we're shutting down, and asks it
         to move our data elsewhere.  Afterwards, we close as normal
         """
+        if nanny is None:
+            nanny = not self.lifetime_restart
+
         if self.status in (Status.closing, Status.closing_gracefully):
             await self.finished()
 
@@ -1193,7 +1196,7 @@ class Worker(ServerNode):
         logger.info("Closing worker gracefully: %s", self.address)
         self.status = Status.closing_gracefully
         await self.scheduler.retire_workers(workers=[self.address], remove=False)
-        await self.close(safe=True, nanny=not self.lifetime_restart)
+        await self.close(safe=True, nanny=nanny)
 
     async def terminate(self, comm=None, report=True, **kwargs):
         await self.close(report=report, **kwargs)
@@ -2683,6 +2686,7 @@ class Worker(ServerNode):
                         if self.memory_limit is not None
                         else "None",
                     )
+                    await self.close_gracefully(nanny=False)
                     break
                 k, v, weight = self.data.fast.evict()
                 del k, v

Hoeze (Author) commented Oct 31, 2020

Thanks a lot @mrocklin, I will test your patch next week.

mrocklin (Member) commented Oct 31, 2020 via email

Hoeze (Author) commented Oct 31, 2020

OK, sure. I'm happy to debug :)
Nonetheless, if you have tips on how to get Dask workers into debug mode (with PyCharm), I'd be even happier :D

mrocklin (Member) commented Oct 31, 2020 via email

Hoeze (Author) commented Nov 5, 2020

Hi @mrocklin, I implemented your solution in #4221.
It solves the problem of memory-leaking workers very effectively.

However, there are still workers that end up freezing.
I created a minimal example with a memory-leaking function that reproduces the frozen workers:

import time

import dask
import dask.distributed
import numpy as np

cluster = dask.distributed.LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="512M")
client = dask.distributed.Client(cluster)

x = {}  # module-level dict that keeps a reference to every result -> deliberate leak

def memory_leaking_fn(data, sleep=0):
    # leak ~12 MiB per call by keeping a random array in the global dict
    x[data] = np.random.randint(100, size=12 * 1024**2 // 8)

    time.sleep(sleep)
    return data

# variant 1: no sleep between tasks
# futures = client.map(memory_leaking_fn, range(1000))
# variant 2: 0.1 s sleep per task
futures = client.map(memory_leaking_fn, range(1000), np.repeat(0.1, 1000))

for f in futures:
    print(f.result())

mrocklin (Member) commented Nov 6, 2020

However, there are still workers that end up freezing.

I'm curious, what happens? Are there any logs or signs about what might be wrong?

Hoeze (Author) commented Nov 6, 2020

@mrocklin it's the last remaining worker that never gets restarted.

Basically, I can reproduce two scenarios:

  • Dask restarting all workers simultaneously, losing all progress and restarting from scratch.
    This is bad and should be avoided somehow.

  • Dask restarting all workers but one, resulting in one frozen worker.
    I think what happens here is the following:

    1. Workers A and B hit the memory limit.
    2. Worker A restarts gracefully and transfers its data to worker B.
    3. Since worker B is now the last worker available, it cannot restart without losing all stored data.
      => Something keeps worker B alive to hold the already computed futures.

    With sleep=0, after a number of restarts, Dask does not spawn a new worker by itself and therefore stays completely frozen. Rescaling by hand works until Dask again stops spawning new workers.
    Logs from this situation:

    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Restarting worker
    distributed.scheduler - ERROR - Couldn't gather keys {'memory_leaking_fn-e8b32c600313a2e6e07f8122bf2c2b83': ['tcp://127.0.0.1:34312']} state: ['processing'] workers: ['tcp://127.0.0.1:34312']
    NoneType: None
    distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:34312'], memory_leaking_fn-e8b32c600313a2e6e07f8122bf2c2b83
    NoneType: None
    distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'memory_leaking_fn-e8b32c600313a2e6e07f8122bf2c2b83': ('tcp://127.0.0.1:34312',)}
    distributed.utils - ERROR - Timed out trying to connect to tcp://127.0.0.1:37228 after 3 s
    ConnectionRefusedError: [Errno 111] Connection refused
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/core.py", line 288, in connect
        timeout=min(intermediate_cap, time_left()),
      File "/opt/modules/i12g/anaconda/envs/florian3/lib/python3.7/asyncio/tasks.py", line 442, in wait_for
        return fut.result()
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/tcp.py", line 362, in connect
        convert_stream_closed_error(self, e)
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
        ) from exc
    distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x2b868ba18760>: ConnectionRefusedError: [Errno 111] Connection refused
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/utils.py", line 655, in log_errors
        yield
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/scheduler.py", line 3533, in retire_workers
        lock=False,
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/scheduler.py", line 3297, in replicate
        for w, who_has in gathers.items()
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/utils_comm.py", line 390, in retry_operation
        operation=operation,
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/utils_comm.py", line 370, in retry
        return await coro()
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/core.py", line 880, in send_recv_from_rpc
        comm = await self.pool.connect(self.addr)
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/core.py", line 1035, in connect
        **self.connection_args,
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/core.py", line 310, in connect
        ) from active_exception
    OSError: Timed out trying to connect to tcp://127.0.0.1:37228 after 3 s
    distributed.core - ERROR - Timed out trying to connect to tcp://127.0.0.1:37228 after 3 s
    ConnectionRefusedError: [Errno 111] Connection refused
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/core.py", line 288, in connect
        timeout=min(intermediate_cap, time_left()),
      File "/opt/modules/i12g/anaconda/envs/florian3/lib/python3.7/asyncio/tasks.py", line 442, in wait_for
        return fut.result()
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/tcp.py", line 362, in connect
        convert_stream_closed_error(self, e)
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
        ) from exc
    distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x2b868ba18760>: ConnectionRefusedError: [Errno 111] Connection refused
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/core.py", line 528, in handle_comm
        result = await result
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/scheduler.py", line 3533, in retire_workers
        lock=False,
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/scheduler.py", line 3297, in replicate
        for w, who_has in gathers.items()
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/utils_comm.py", line 390, in retry_operation
        operation=operation,
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/utils_comm.py", line 370, in retry
        return await coro()
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/core.py", line 880, in send_recv_from_rpc
        comm = await self.pool.connect(self.addr)
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/core.py", line 1035, in connect
        **self.connection_args,
      File "/data/nasif12/home_if12/hoelzlwi/Projects/dask/distributed/distributed/comm/core.py", line 310, in connect
        ) from active_exception
    OSError: Timed out trying to connect to tcp://127.0.0.1:37228 after 3 s
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Restarting worker
    distributed.nanny - WARNING - Restarting worker
    

This happens somewhat randomly, depending on the time delay between the two workers' restarts.
One can watch this nicely on the dashboard, though.

(I edited the test script in my last post slightly: changed the sleep time and used only two workers.)

KrishanBhasin (Contributor) commented:

Hey @Hoeze, the errors in your traceback look a lot like the kinds of problems that 2.30.1 was supposed to address. Have you tried cherry-picking your commits over the top of that release to see if you still encounter those problems?

Hoeze (Author) commented Nov 10, 2020

@KrishanBhasin I am currently running with a fork of d7f532c (see #4221).
So, yes, I am already using the fixes from 2.30.1, but they do not seem to solve the issue.

The workers still keep freezing and I cannot get my calculations to finish.

Hoeze (Author) commented Dec 8, 2020

Hi all, is there any news on this topic?
I tried the current master, but it somehow only makes things worse.
There are:

I imagine all of these instabilities could be fixed by in-place worker restarts plus having the nanny hold all results, as this would make the results robust to worker failures.

Is there a way to push progress on this issue or get professional support on it?

yishairasowsky commented:

@mrocklin @Hoeze @quasiben @KrishanBhasin @jedbrown
I would love to know what I can do to solve the issue, because I am suffering from it as well.
