[Bug] Worker stuck in "closing-gracefully" state #3018

Open
AnesBenmerzoug opened this issue Sep 2, 2019 · 15 comments
@AnesBenmerzoug

I have a cluster of 8 workers distributed across k8s pods in AWS.
I noticed that sometimes one of the workers gets stuck in the "closing-gracefully" state because of an assertion error in the worker's close_gracefully() method.

For some reason the scheduler still tries to send tasks to this worker, which just fail after some time because the worker does not actually execute them.

Manually closing the worker using the client's retire_workers() method works, and I'm currently using it as a workaround.

After digging around the code base a bit, I found that the part of the code responsible for this behaviour is in the scheduler's replicate() method. The failing assertion, which I did not completely understand, is not handled properly, and so the worker never closes properly.

From analyzing the expression I could conclude the following:

  • n_missing is greater than 0, otherwise the method would have returned
  • branching_factor's default value of 2 is used

From those two points it seems that len(ts.who_has) is 0 (a paraphrased sketch of the expression follows below).
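
For readers following along, the shape of the failing expression can be paraphrased like this. This is a sketch based on the reasoning above, not a verbatim copy of the scheduler source, so the exact form in distributed may differ:

# Illustrative paraphrase of the expression guarded by `assert count > 0`
# in Scheduler.replicate(); not the actual scheduler source.
def replicate_count(n_missing: int, n_who_has: int, branching_factor: int = 2) -> int:
    count = min(n_missing, branching_factor * n_who_has)
    assert count > 0
    return count

replicate_count(n_missing=3, n_who_has=1)  # fine: count == 2
try:
    # n_missing > 0 but no worker holds the key anymore
    replicate_count(n_missing=3, n_who_has=0)
except AssertionError:
    print("count == 0 -> the AssertionError seen in the worker log below")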

Unfortunately I have not yet found a minimal example to reproduce this behaviour.

Stack Trace
2019-09-02 at 01:40:09 | LOCALDEV | DASK     | INFO     | distributed.worker:close_gracefully:1116 - Closing worker gracefully: tcp://10.165.119.248:37069
2019-09-02 at 01:40:10 | LOCALDEV | DASK     | INFO     | distributed.worker:transition_executing_done:1646 - Comm closed
2019-09-02 at 01:40:10 | LOCALDEV | DASK     | INFO     | distributed.worker:transition_executing_done:1646 - Comm closed
2019-09-02 at 01:40:11 | LOCALDEV | DASK     | WARNING  | distributed.utils_perf:_gc_callback:204 - full garbage collections took 30% CPU time recently (threshold: 10%)
2019-09-02 at 01:40:13 | LOCALDEV | DASK     | WARNING  | distributed.utils_perf:_gc_callback:204 - full garbage collections took 30% CPU time recently (threshold: 10%)
2019-09-02 at 01:40:13 | LOCALDEV | DASK     | INFO     | agena_data_acquisition.tools.flow.utilities:set_container_components_on_hash_cache:152 - Setting categories and timestamps on hash cache
2019-09-02 at 01:40:13 | LOCALDEV | DASK     | INFO     | distributed.worker:transition_executing_done:1646 - Comm closed
2019-09-02 at 01:40:13 | LOCALDEV | DASK     | INFO     | distributed.worker:transition_executing_done:1646 - Comm closed
2019-09-02 at 01:40:15 | LOCALDEV | DASK     | ERROR    | tornado.ioloop:_run_callback:763 - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f625d4db400>>, <Task finished coro=<Worker.close_gracefully() done, defined at /opt/venv/lib/python3.6/site-packages/distributed/worker.py:1104> exception=AssertionError()>)
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.6/multiprocessing/forkserver.py", line 196, in main
    _serve_one(s, listener, alive_r, old_handlers)
    │          │  │         │        └ {<Signals.SIGCHLD: 17>: <Handlers.SIG_DFL: 0>, <Signals.SIGINT: 2>: <built-in function default_int_handler>}
    │          │  │         └ 17
    │          │  └ <socket.socket [closed] fd=-1, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0>
    │          └ <socket.socket [closed] fd=-1, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0><function _serve_one at 0x7f625c10c730>
  File "/usr/lib/python3.6/multiprocessing/forkserver.py", line 231, in _serve_one
    code = spawn._main(child_r)
           │     │     └ 11
           │     └ <function _main at 0x7f625c111d08><module 'multiprocessing.spawn' from '/usr/lib/python3.6/multiprocessing/spawn.py'>
  File "/usr/lib/python3.6/multiprocessing/spawn.py", line 118, in _main
    return self._bootstrap()
           │    └ <function BaseProcess._bootstrap at 0x7f625d3ddd08><ForkServerProcess(Dask Worker process (from Nanny), started daemon)>
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7f625d3dd510><ForkServerProcess(Dask Worker process (from Nanny), started daemon)>
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess(Dask Worker process (from Nanny), started daemon)>
    │    │        │    └ (<bound method WorkerProcess._run of <class 'distributed.nanny.WorkerProcess'>>, (), {'worker_kwargs': {'scheduler_ip': 'tcp:...
    │    │        └ <ForkServerProcess(Dask Worker process (from Nanny), started daemon)>
    │    └ <bound method AsyncProcess._run of <class 'distributed.process.AsyncProcess'>><ForkServerProcess(Dask Worker process (from Nanny), started daemon)>
  File "/opt/venv/lib/python3.6/site-packages/distributed/process.py", line 179, in _run
    target(*args, **kwargs)
    │       │       └ {'worker_kwargs': {'scheduler_ip': 'tcp://dask-scheduler:8786', 'nthreads': 4, 'local_directory': '/tmp', 'services': None, '...
    │       └ ()
    └ <bound method WorkerProcess._run of <class 'distributed.nanny.WorkerProcess'>>
  File "/opt/venv/lib/python3.6/site-packages/distributed/nanny.py", line 697, in _run
    loop.run_sync(run)
    │    │        └ <function WorkerProcess._run.<locals>.run at 0x7f623d58c8c8>
    │    └ <function IOLoop.run_sync at 0x7f625a0e0378><tornado.platform.asyncio.AsyncIOLoop object at 0x7f625d4db400>
  File "/opt/venv/lib/python3.6/site-packages/tornado/ioloop.py", line 526, in run_sync
    self.start()
    │    └ <function BaseAsyncIOLoop.start at 0x7f625993b6a8><tornado.platform.asyncio.AsyncIOLoop object at 0x7f625d4db400>
  File "/opt/venv/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 148, in start
    self.asyncio_loop.run_forever()
    │    │            └ <function BaseEventLoop.run_forever at 0x7f625acee158>
    │    └ <_UnixSelectorEventLoop running=True closed=False debug=False><tornado.platform.asyncio.AsyncIOLoop object at 0x7f625d4db400>
  File "/usr/lib/python3.6/asyncio/base_events.py", line 438, in run_forever
    self._run_once()
    │    └ <function BaseEventLoop._run_once at 0x7f625acf9620><_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/lib/python3.6/asyncio/base_events.py", line 1451, in _run_once
    handle._run()
    │      └ <function Handle._run at 0x7f625acdb2f0><Handle IOLoop.add_future.<locals>.<lambda>(<Task finishe...ertionError()>) at /opt/venv/lib/python3.6/site-packages/tornado/...
  File "/usr/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
    │    │          │    └ <member '_args' of 'Handle' objects>
    │    │          └ <Handle IOLoop.add_future.<locals>.<lambda>(<Task finishe...ertionError()>) at /opt/venv/lib/python3.6/site-packages/tornado/...
    │    └ <member '_callback' of 'Handle' objects><Handle IOLoop.add_future.<locals>.<lambda>(<Task finishe...ertionError()>) at /opt/venv/lib/python3.6/site-packages/tornado/...
  File "/opt/venv/lib/python3.6/site-packages/tornado/ioloop.py", line 690, in <lambda>
    lambda f: self._run_callback(functools.partial(callback, future))
           │  │    │             │         │       │         └ <Task finished coro=<Worker.close_gracefully() done, defined at /opt/venv/lib/python3.6/site-packages/distributed/worker.py:1...
           │  │    │             │         │       └ <bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f625d4db400>>
           │  │    │             │         └ <class 'functools.partial'>
           │  │    │             └ <module 'functools' from '/usr/lib/python3.6/functools.py'>
           │  │    └ <function IOLoop._run_callback at 0x7f625a0e0b70>
           │  └ <tornado.platform.asyncio.AsyncIOLoop object at 0x7f625d4db400><Task finished coro=<Worker.close_gracefully() done, defined at /opt/venv/lib/python3.6/site-packages/distributed/worker.py:1...
> File "/opt/venv/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
          └ functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f625d4db4...
  File "/opt/venv/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
    │      └ <method 'result' of '_asyncio.Task' objects><Task finished coro=<Worker.close_gracefully() done, defined at /opt/venv/lib/python3.6/site-packages/distributed/worker.py:1...
  File "/opt/venv/lib/python3.6/site-packages/distributed/worker.py", line 1118, in close_gracefully
    await self.scheduler.retire_workers(workers=[self.address], remove=False)
          │    │                                 │    └ <property object at 0x7f62590b5098>
          │    │                                 └ <Worker: tcp://10.165.119.248:37069, closing-gracefully, stored: 37, running: 2/4, ready: 0, comm: 0, waiting: 0>
          │    └ <pooled rpc to 'tcp://dask-scheduler:8786'><Worker: tcp://10.165.119.248:37069, closing-gracefully, stored: 37, running: 2/4, ready: 0, comm: 0, waiting: 0>
  File "/opt/venv/lib/python3.6/site-packages/distributed/core.py", line 750, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
                   │         │    │        │      └ {'workers': ['tcp://10.165.119.248:37069'], 'remove': False}
                   │         │    │        └ 'retire_workers'
                   │         │    └ <TCP ConnectionPool local=tcp://10.165.119.248:58730 remote=tcp://dask-scheduler:8786>
                   │         └ <TCP ConnectionPool local=tcp://10.165.119.248:58730 remote=tcp://dask-scheduler:8786><function send_recv at 0x7f625912dc80>
  File "/opt/venv/lib/python3.6/site-packages/distributed/core.py", line 559, in send_recv
    six.reraise(*clean_exception(**response))
    │   │        │                 └ {'status': 'uncaught-error', 'text': '', 'exception': AssertionError(), 'traceback': <traceback object at 0x7f60f8ba7e48>}
    │   │        └ <function clean_exception at 0x7f62590b6e18>
    │   └ <function reraise at 0x7f625a0b17b8><module 'six' from '/opt/venv/lib/python3.6/site-packages/six.py'>
  File "/opt/venv/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
          │                    └ NoneNone
  File "/opt/venv/lib/python3.6/site-packages/distributed/core.py", line 416, in handle_comm
    result = await result
  File "/opt/venv/lib/python3.6/site-packages/distributed/scheduler.py", line 3117, in retire_workers
    delete=False,
  File "/opt/venv/lib/python3.6/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/opt/venv/lib/python3.6/site-packages/distributed/scheduler.py", line 2881, in replicate
    assert count > 0

AssertionError: assert count > 0
2019-09-02 at 01:55:24 | LOCALDEV | DASK     | INFO     | distributed.worker:run:3377 - Run out-of-band function 'collect'
2019-09-02 at 01:55:25 | LOCALDEV | DASK     | WARNING  | distributed.utils_perf:_gc_callback:204 - full garbage collections took 30% CPU time recently (threshold: 10%)
2019-09-02 at 02:11:15 | LOCALDEV | DASK     | INFO     | distributed.worker:run:3377 - Run out-of-band function 'collect'
2019-09-02 at 02:11:15 | LOCALDEV | DASK     | WARNING  | distributed.utils_perf:_gc_callback:204 - full garbage collections took 30% CPU time recently (threshold: 10%)
2019-09-02 at 02:27:08 | LOCALDEV | DASK     | INFO     | distributed.worker:run:3377 - Run out-of-band function 'collect'
Workers status
>>> pprint(client.run(lambda dask_worker: dask_worker.status))
{'tcp://10.165.118.150:37697': 'running',
 'tcp://10.165.118.190:40097': 'running',
 'tcp://10.165.119.194:43069': 'closing-gracefully',
 'tcp://10.165.134.5:37665': 'running',
 'tcp://10.165.134.98:37897': 'running',
 'tcp://10.165.135.151:41777': 'running',
 'tcp://10.165.135.24:39843': 'running',
 'tcp://10.165.135.75:41797': 'running'}
Manually Closing the Worker
>>> client.retire_workers(['tcp://10.165.119.194:43069'])
{'tcp://10.165.119.194:43069': {'type': 'Worker', 'id': 'tcp://10.165.119.194:43069', 'host': '10.165.119.194', 'resources': {}, 'local_directory': '/tmp/worker-insrgwz0', 'name': 'tcp://10.165.119.194:43069', 'nthreads': 4, 'memory_limit': 6000000000, 'last_seen': 1567423741.9343932, 'services': {'dashboard': 40601}, 'metrics': {'cpu': 4.0, 'memory': 1920286720, 'time': 1567423741.4320846, 'read_bytes': 285.9653648363741, 'write_bytes': 773.9062670746627, 'num_fds': 1284, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'bandwidth': 100000000}, 'nanny': 'tcp://10.165.119.194:32957'}}

>>> pprint(client.run(lambda dask_worker: dask_worker.status))
{'tcp://10.165.118.150:41061': 'running',
 'tcp://10.165.118.190:40097': 'running',
 'tcp://10.165.119.194:39373': 'running',
 'tcp://10.165.134.5:37665': 'running',
 'tcp://10.165.134.98:37897': 'running',
 'tcp://10.165.135.151:41777': 'running',
 'tcp://10.165.135.24:39843': 'running',
 'tcp://10.165.135.75:41797': 'running'}
Scheduler Info
>>> pprint(client.scheduler_info())
{'address': 'tcp://10.165.119.247:8786',
 'id': 'Scheduler-ae134b55-02b0-4644-9053-1a4d27cd6253',
 'services': {'dashboard': 8787},
 'type': 'Scheduler',
 'workers': {'tcp://10.165.118.150:40363': {'host': '10.165.118.150',
                                            'id': 'tcp://10.165.118.150:40363',
                                            'last_seen': 1567418324.9379842,
                                            'local_directory': '/tmp/worker-yn1xir95',
                                            'memory_limit': 6000000000,
                                            'metrics': {'bandwidth': 100000000,
                                                        'cpu': 0.0,
                                                        'executing': 0,
                                                        'in_flight': 0,
                                                        'in_memory': 0,
                                                        'memory': 106196992,
                                                        'num_fds': 27,
                                                        'read_bytes': 285.2919233032259,
                                                        'ready': 0,
                                                        'time': 1567418324.9362829,
                                                        'write_bytes': 768.0936396625311},
                                            'name': 'tcp://10.165.118.150:40363',
                                            'nanny': 'tcp://10.165.118.150:41743',
                                            'nthreads': 4,
                                            'resources': {},
                                            'services': {'dashboard': 44259},
                                            'type': 'Worker'},
             'tcp://10.165.118.190:39499': {'host': '10.165.118.190',
                                            'id': 'tcp://10.165.118.190:39499',
                                            'last_seen': 1567418325.1259258,
                                            'local_directory': '/tmp/worker-n67kzmw4',
                                            'memory_limit': 6000000000,
                                            'metrics': {'bandwidth': 100000000,
                                                        'cpu': 0.0,
                                                        'executing': 0,
                                                        'in_flight': 0,
                                                        'in_memory': 0,
                                                        'memory': 107712512,
                                                        'num_fds': 27,
                                                        'read_bytes': 285.8943480874215,
                                                        'ready': 0,
                                                        'time': 1567418325.1242785,
                                                        'write_bytes': 769.7155525430579},
                                            'name': 'tcp://10.165.118.190:39499',
                                            'nanny': 'tcp://10.165.118.190:37897',
                                            'nthreads': 4,
                                            'resources': {},
                                            'services': {'dashboard': 36281},
                                            'type': 'Worker'},
             'tcp://10.165.119.194:43069': {'host': '10.165.119.194',
                                            'id': 'tcp://10.165.119.194:43069',
                                            'last_seen': 1567418324.9348905,
                                            'local_directory': '/tmp/worker-insrgwz0',
                                            'memory_limit': 6000000000,
                                            'metrics': {'bandwidth': 100000000,
                                                        'cpu': 2.0,
                                                        'executing': 0,
                                                        'in_flight': 0,
                                                        'in_memory': 0,
                                                        'memory': 1865625600,
                                                        'num_fds': 1284,
                                                        'read_bytes': 285.88140257823795,
                                                        'ready': 0,
                                                        'time': 1567418324.9321342,
                                                        'write_bytes': 773.6790405439028},
                                            'name': 'tcp://10.165.119.194:43069',
                                            'nanny': 'tcp://10.165.119.194:32957',
                                            'nthreads': 4,
                                            'resources': {},
                                            'services': {'dashboard': 40601},
                                            'type': 'Worker'},
             'tcp://10.165.134.5:45259': {'host': '10.165.134.5',
                                          'id': 'tcp://10.165.134.5:45259',
                                          'last_seen': 1567418325.0068722,
                                          'local_directory': '/tmp/worker-c9_tdwxs',
                                          'memory_limit': 6000000000,
                                          'metrics': {'bandwidth': 100000000,
                                                      'cpu': 0.0,
                                                      'executing': 0,
                                                      'in_flight': 0,
                                                      'in_memory': 0,
                                                      'memory': 114638848,
                                                      'num_fds': 27,
                                                      'read_bytes': 285.86423455352053,
                                                      'ready': 0,
                                                      'time': 1567418325.004854,
                                                      'write_bytes': 765.636376461527},
                                          'name': 'tcp://10.165.134.5:45259',
                                          'nanny': 'tcp://10.165.134.5:42299',
                                          'nthreads': 4,
                                          'resources': {},
                                          'services': {'dashboard': 42483},
                                          'type': 'Worker'},
             'tcp://10.165.134.98:42651': {'host': '10.165.134.98',
                                           'id': 'tcp://10.165.134.98:42651',
                                           'last_seen': 1567418325.138247,
                                           'local_directory': '/tmp/worker-gmprr26a',
                                           'memory_limit': 6000000000,
                                           'metrics': {'bandwidth': 100000000,
                                                       'cpu': 2.0,
                                                       'executing': 0,
                                                       'in_flight': 0,
                                                       'in_memory': 0,
                                                       'memory': 114290688,
                                                       'num_fds': 27,
                                                       'read_bytes': 286.39261759547566,
                                                       'ready': 0,
                                                       'time': 1567418324.6365747,
                                                       'write_bytes': 769.0543017948438},
                                           'name': 'tcp://10.165.134.98:42651',
                                           'nanny': 'tcp://10.165.134.98:45353',
                                           'nthreads': 4,
                                           'resources': {},
                                           'services': {'dashboard': 37307},
                                           'type': 'Worker'},
             'tcp://10.165.135.151:46743': {'host': '10.165.135.151',
                                            'id': 'tcp://10.165.135.151:46743',
                                            'last_seen': 1567418325.299708,
                                            'local_directory': '/tmp/worker-ytdfgks_',
                                            'memory_limit': 6000000000,
                                            'metrics': {'bandwidth': 100000000,
                                                        'cpu': 4.0,
                                                        'executing': 0,
                                                        'in_flight': 0,
                                                        'in_memory': 0,
                                                        'memory': 114499584,
                                                        'num_fds': 27,
                                                        'read_bytes': 286.0085919100238,
                                                        'ready': 0,
                                                        'time': 1567418325.29762,
                                                        'write_bytes': 770.0231320654489},
                                            'name': 'tcp://10.165.135.151:46743',
                                            'nanny': 'tcp://10.165.135.151:35201',
                                            'nthreads': 4,
                                            'resources': {},
                                            'services': {'dashboard': 34289},
                                            'type': 'Worker'},
             'tcp://10.165.135.24:39503': {'host': '10.165.135.24',
                                           'id': 'tcp://10.165.135.24:39503',
                                           'last_seen': 1567418325.1116345,
                                           'local_directory': '/tmp/worker-96w5bp0w',
                                           'memory_limit': 6000000000,
                                           'metrics': {'bandwidth': 100000000,
                                                       'cpu': 2.0,
                                                       'executing': 0,
                                                       'in_flight': 0,
                                                       'in_memory': 0,
                                                       'memory': 113909760,
                                                       'num_fds': 27,
                                                       'read_bytes': 285.5692941296901,
                                                       'ready': 0,
                                                       'time': 1567418325.1096437,
                                                       'write_bytes': 766.8434192014055},
                                           'name': 'tcp://10.165.135.24:39503',
                                           'nanny': 'tcp://10.165.135.24:37487',
                                           'nthreads': 4,
                                           'resources': {},
                                           'services': {'dashboard': 44285},
                                           'type': 'Worker'},
             'tcp://10.165.135.75:42995': {'host': '10.165.135.75',
                                           'id': 'tcp://10.165.135.75:42995',
                                           'last_seen': 1567418325.064751,
                                           'local_directory': '/tmp/worker-_zkylo2z',
                                           'memory_limit': 6000000000,
                                           'metrics': {'bandwidth': 100000000,
                                                       'cpu': 0.0,
                                                       'executing': 0,
                                                       'in_flight': 0,
                                                       'in_memory': 0,
                                                       'memory': 112508928,
                                                       'num_fds': 27,
                                                       'read_bytes': 285.7694387958324,
                                                       'ready': 0,
                                                       'time': 1567418325.062967,
                                                       'write_bytes': 767.3808706125849},
                                           'name': 'tcp://10.165.135.75:42995',
                                           'nanny': 'tcp://10.165.135.75:34145',
                                           'nthreads': 4,
                                           'resources': {},
                                           'services': {'dashboard': 35467},
                                           'type': 'Worker'}}}
@mrocklin
Member

mrocklin commented Sep 2, 2019

Thank you for the detailed bug report and analysis @AnesBenmerzoug. This looks like it might be the same as #1930?

The replicate code is not great, but has been useful enough to find its way into various code paths. I'm a little surprised actually that issues like yours are as rare as they are.

I think that there are two paths forward here:

  1. Short: provide a failing example, and then maybe someone can figure out why we were scaling down in an unsafe situation
  2. Long: someone can redo replicate to be more robust

@AnesBenmerzoug
Author

@mrocklin It took some time, but I managed to recreate the issue locally.
For me this raises the assertion error pretty consistently.

I noticed that it happens when a worker that is being closed holds many keys in memory.
It could be that the transfer of those keys to other workers is abruptly stopped before the transfer is finished or something like that.

Failing example
import logging
import time
from random import randint

import dask.bag as db
import distributed
from loguru import logger


class InterceptHandler(logging.Handler):
    def emit(self, record):
        # Retrieve context where the logging call occurred, this happens to be in the 6th frame upward
        logger_opt = logger.opt(depth=6, exception=record.exc_info)
        logger_opt.log(logging.getLevelName(record.levelno), record.getMessage())


logging.getLogger("distributed").handlers = [InterceptHandler()]


def busy_work(x):
    time.sleep(randint(1, 10))
    x = list(map(lambda a: a + 1, x))
    return x


def main():
    cluster = distributed.LocalCluster(
        n_workers=4,
        threads_per_worker=4,
        lifetime="5s",
        lifetime_stagger="2s",
        lifetime_restart=True,
        silence_logs=logging.DEBUG,
    )
    with distributed.Client(cluster) as client:
        numbers = list(range(10000))
        numbers_bag = db.from_sequence(numbers, npartitions=len(numbers))
        numbers_bag = numbers_bag.remove(lambda x: x % 2 == 0)
        numbers_bag = numbers_bag.repartition(npartitions=len(numbers) // 10)
        numbers_bag = numbers_bag.map_partitions(busy_work)
        numbers_bag = numbers_bag.map_partitions(busy_work)
        delayed_number = numbers_bag.to_delayed()
        futures = client.compute(delayed_number)
        distributed.wait(futures)


if __name__ == "__main__":
    main()
Requirements

This snippet requires:

loguru
dask[bag]
distributed

loguru is just a convenient logging package that has good defaults and that can be used to intercept unexpected exceptions.

While trying to recreate the error in question I stumbled into another error.
When the lifetime is set to a very small value, like "1s", it seems that some workers go directly from __init__ (where status is set to None) to close_gracefully (which expects status to be a string and tries to call its startswith method).
In the Worker's start method the check is different:

if self.status and self.status.startswith("clos"):
    return

So maybe adding an extra check for None would make sense.
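
For illustration, the suggested extra check inside Worker.close_gracefully could look like the fragment below. This is a sketch only, mirroring the existing guard in Worker.start shown above; it is not an actual patch:

# Sketch of a None-safe guard at the top of Worker.close_gracefully,
# mirroring the check used in Worker.start.
if self.status and self.status.startswith("closing"):
    await self.finished()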

Stack trace
2019-09-03 13:35:23.705 | INFO     | distributed.worker:_register_with_scheduler:780 - -------------------------------------------------
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f1a58e82748>>, <Task finished coro=<Worker.close_gracefully() done, defined at /opt/venv/lib/python3.6/site-packages/distributed/worker.py:1104> exception=AttributeError("'NoneType' object has no attribute 'startswith'",)>)
Traceback (most recent call last):
  File "/opt/venv/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/venv/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/opt/venv/lib/python3.6/site-packages/distributed/worker.py", line 1110, in close_gracefully
    if self.status.startswith("closing"):
AttributeError: 'NoneType' object has no attribute 'startswith'

@SultanOrazbayev
Contributor

I'm encountering the same problem: I'm unsure what the underlying cause is, but self.status is None and I get a reference back to this function:

async def close_gracefully(self):
    """Gracefully shut down a worker

    This first informs the scheduler that we're shutting down, and asks it
    to move our data elsewhere. Afterwards, we close as normal.
    """
    if self.status.startswith("closing"):
        await self.finished()

    if self.status == "closed":
        return

    logger.info("Closing worker gracefully: %s", self.address)
    self.status = "closing-gracefully"
    await self.scheduler.retire_workers(workers=[self.address], remove=False)
    await self.close(safe=True, nanny=not self.lifetime_restart)

@chinmaychandak

chinmaychandak commented Jun 30, 2020

I noticed that sometimes one of the workers gets stuck in the "closing-gracefully" state because of an assertion error in the worker's close_gracefully() method.

@mrocklin Has there been any update on this?

I'm getting a similar error pretty consistently. My use case requires workers to run for longer periods of time (streaming jobs, for example, where Dask workers keep processing batches of data every minute or so), and I'm periodically restarting the workers using the --lifetime and --lifetime-stagger parameters. The reason for the restarts is that I want the workers to start from a clean slate of memory every hour or few hours. Everything works fine for a few restarts/life-cycles, but I see this error in a few worker logs after some time. This causes some computation to be lost, and tasks depending on those computations to be held in memory.

FYI: I have over-provisioned my Dask cluster to ensure that there are enough active workers available to process data when a set of other workers are restarting.

Can someone please help?

@SultanOrazbayev
Contributor

FWIW, I never managed to get --lifetime and --lifetime-stagger to work on a SLURM cluster. The staggering part worked, and the workers would be restarted, but only within the limits of the originally requested time, e.g. if I requested 10 minutes and set the lifetime to 1 minute, the worker would be restarted 9 times, but once the 10 minutes were up there would be no new job submissions... so I would end up having to manually add workers once they expired.

@quasiben
Member

@chinmaychandak the updates on this are mostly as you see them on the issue. I think the phrasing (though most likely not intended) adds stress to the maintainers of Dask. What would be more helpful in pushing this issue forward is a minimal reproducible example.

@chinmaychandak

FWIW, I never managed to get --lifetime and --lifetime-stagger to work on a SLURM cluster. The staggering part worked, and the workers would be restarted, but only within the limits of the originally requested time, e.g. if I requested 10 minutes and set the lifetime to 1 minute, the worker would be restarted 9 times, but once the 10 minutes were up there would be no new job submissions... so I would end up having to manually add workers once they expired.

@SultanOrazbayev My understanding is that --lifetime-stagger just ensures that all workers don't restart at the same time (--lifetime). So, if my lifetime is 10 minutes and my stagger is 2 minutes, my 4 workers will restart at random times every 10 +/- 2 minutes. This ensures that when some of the workers restart, the other workers can help handle the load. This functionality seems to be working just fine for me, but after a few lifecycles I hit the same AssertionError that @AnesBenmerzoug mentioned.
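
As a rough illustration of that behaviour (a sketch of the idea only, not the actual distributed implementation), each worker effectively picks its own restart deadline within the stagger window:

import random

def staggered_deadline(lifetime_s: float, stagger_s: float) -> float:
    # Illustrative only: pick a restart deadline uniformly in
    # [lifetime - stagger, lifetime + stagger], so workers don't all restart at once.
    return lifetime_s + random.uniform(-stagger_s, stagger_s)

# lifetime of 10 minutes with a 2 minute stagger, for 4 workers
print([round(staggered_deadline(600, 120)) for _ in range(4)])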

My stack trace:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f5b52a931d0>>, <Task finished coro=<Worker.close_gracefully() done, defined at /home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/distributed/worker.py:1173> exception=AssertionError()>)
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/distributed/worker.py", line 1187, in close_gracefully
    await self.scheduler.retire_workers(workers=[self.address], remove=False)
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/distributed/core.py", line 806, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/distributed/core.py", line 605, in send_recv
    raise exc.with_traceback(tb)
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/distributed/core.py", line 459, in handle_comm
    result = await result
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/distributed/scheduler.py", line 3466, in retire_workers
    lock=False,
  File "/home/ubuntu/anaconda3/envs/custreamz/lib/python3.7/site-packages/distributed/scheduler.py", line 3220, in replicate
    assert count > 0
AssertionError

@chinmaychandak

@chinmaychandak the updates on this are mostly as you see them on the issue. I think the phrasing (though most likely not intended) adds stress to the maintainers of Dask. What would be more helpful in pushing this issue forward is a minimal reproducible example.

Apologies if the phrasing was inappropriate! Definitely didn't intend to offend/stress out anyone.

I saw a minimal reproducer posted already, so I thought this issue had gotten overlooked or something. But I will definitely try to create my own reproducer, and will post it here.

@mrocklin
Member

mrocklin commented Jul 1, 2020

I saw a minimal reproducer posted already, so I thought this issue had gotten overlooked or something.

Oh yeah, you're right. I guess the next thing for someone to do is to try to figure out why it's failing. Maybe that someone is you?

I think that what people may not understand is that it's no one's job to fix these issues. The people who do so are often doing so for free as volunteers, or because they need to fix them to solve some problem that they're having at work.

Unfortunately people sometimes treat these community github issue trackers as a place where they go to ask people to do free work for them. They look a lot like other github issue trackers that they use in their workplace to ask other teams in their company to do work for them, which is reasonable given that those teams are paid.

Instead, I encourage you to think of these issue trackers as a place to collaborate on work. @AnesBenmerzoug was kind enough to make a reproducer (at significant personal cost, it sounds like). Great; who can take up the torch and work from there? Alternatively, if there are people paid by your company to fix these problems then maybe you can point them here and they can do this work.

People like me volunteer our time to help shepherd this process along, but we're not here to fix everyone's problem for free. There are too many problems to fix unfortunately, and we tend to be pretty busy fixing the problems that people pay us to fix.

But I will definitely try to create my own reproducer, and will post it here.

If the reproducer provided by @AnesBenmerzoug matches your situation then great. I just ran it but after a few minutes I'm not quite sure what I'm looking at, so I'm probably going to move on. Maybe you can help investigate here?

@chinmaychandak

I think that what people may not understand is that it's no one's job to fix these issues. The people who do so are often doing so for free as volunteers, or because they need to fix them to solve some problem that they're having at work.

Unfortunately people sometimes treat these community github issue trackers as a place where they go to ask people to do free work for them. They look a lot like other github issue trackers that they use in their workplace to ask other teams in their company to do work for them, which is reasonable given that those teams are paid.

Instead, I encourage you to think of these issue trackers as a place to collaborate on work. @AnesBenmerzoug was kind enough to make a reproducer (at significant personal cost, it sounds like). Great; who can take up the torch and work from there? Alternatively, if there are people paid by your company to fix these problems then maybe you can point them here and they can do this work.

People like me volunteer our time to help shepherd this process along, but we're not here to fix everyone's problem for free. There are too many problems to fix unfortunately, and we tend to be pretty busy fixing the problems that people pay us to fix.

I definitely agree with everything here, and I again sincerely apologize for the inappropriate phrasing. I did not ever intend to ask people to fix my problem, or to stress you or any of the other maintainers. I think it's brilliant that so many important features are getting merged into Dask and into open-source projects in general! :)

If the reproducer provided by @AnesBenmerzoug matches your situation then great. I just ran it but after a few minutes I'm not quite sure what I'm looking at, so I'm probably going to move on. Maybe you can help investigate here?

Yes, I am going to try to investigate this soon. Will post findings here.

@Hoeze

Hoeze commented Oct 27, 2020

Hi @chinmaychandak, did you have some success in debugging the issue?

Is there some workaround, e.g. force-restarting the worker, or does this also destroy the futures residing on it?

@chinmaychandak

chinmaychandak commented Oct 27, 2020

Hey @Hoeze, I wasn't able to figure out how to fix the issue, but I did a couple of things as a workaround:

  1. I am now restarting workers less often by tweaking the worker.lifetime and worker.lifetime.stagger configs.
  2. I also changed a few other configs like worker.memory (increased all fractions) and increased comm.timeouts, which also seemed to help with my use case (see the sketch below).
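
A minimal sketch of what such tweaks look like as dask configuration (key names assumed from the distributed config schema; the values are placeholders for illustration, not recommendations):

import dask

dask.config.set({
    # restart workers less often (placeholder values)
    "distributed.worker.lifetime.duration": "4 hours",
    "distributed.worker.lifetime.stagger": "10 minutes",
    "distributed.worker.lifetime.restart": True,
    # more generous comm timeouts (placeholder values)
    "distributed.comm.timeouts.connect": "60s",
    "distributed.comm.timeouts.tcp": "60s",
})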

@Hoeze

Hoeze commented Oct 29, 2020

Thanks a lot for your tips @chinmaychandak :)
Increasing comm.timeouts seems to improve the situation a lot.

@lbonniot

Hi all,

I'm encountering a similar issue with dask / distributed 2021.10.

I'm also using lifetime and stagger to periodically clean up RAM.
When gracefully restarting, some workers may throw the following exception:

distributed.worker - INFO - Closing worker gracefully: tcp://172.20.0.18:33763
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f7008164100>>, <Task finished name='Task-108718' coro=<Worker.close_gracefully() done, defined at /opt/conda/lib/python3.8/site-packages/distributed/worker.py:1542> exception=AssertionError()>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 1559, in close_gracefully
    await self.scheduler.retire_workers(workers=[self.address], remove=False)
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 863, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 656, in send_recv
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 498, in handle_comm
    result = await result
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 6750, in retire_workers
    await self.replicate(
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 6501, in replicate
    assert count > 0
AssertionError

This results in tasks being held up indefinitely in that "zombie" worker.

@fjetter
Member

fjetter commented Dec 14, 2021

We're currently working on making graceful downscaling much more robust, which should avoid the above replicate assertion error. See #5381 for the current WIP. We're struggling with a few flaky tests but are hoping to merge soon.
