Worker restriction using alias doesn't resolve after worker restart #8600

jinmannwong opened this issue Mar 25, 2024 · 0 comments
jinmannwong commented Mar 25, 2024

Describe the issue:

Worker restrictions specified by worker name can fail to resolve if the named worker is restarted and allow_other_workers=False. I think this is because the worker_restrictions attribute of TaskState stores the output of Scheduler.coerce_address, i.e. the worker's address at submission time, which will most likely differ after the worker restarts. As a result, the worker restriction is never fulfilled even though the restarted worker keeps the same name. Is this the intended behaviour?
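
To make the suspected mechanism concrete, here is a minimal sketch using plain dicts in place of the scheduler's internal state (the addresses below are made up, and nothing here calls distributed itself):

# Illustrative sketch only; the dicts stand in for the scheduler's alias and
# worker bookkeeping, and the addresses are invented.
aliases = {"worker-0": "tcp://127.0.0.1:40001"}  # name -> address at submission time
worker_restrictions = {aliases["worker-0"]}      # what ends up stored on the TaskState

# The worker restarts: same name, but it rejoins on a new address.
aliases["worker-0"] = "tcp://127.0.0.1:40917"
live_addresses = set(aliases.values())

# The stored restriction still holds the old address, so no live worker can satisfy it.
print(worker_restrictions & live_addresses)  # -> set()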

Minimal Complete Verifiable Example:

import time

def test_func(duration: int):
    time.sleep(duration)

def main():
    from dask.distributed import Scheduler, Nanny, Client, SpecCluster, as_completed

    scheduler = {"cls": Scheduler, "options": {"dashboard_address": ":8787"}}
    workers = {
        "worker-0": {"cls": Nanny, "options": {"nthreads": 1}},
    }
    cluster = SpecCluster(scheduler=scheduler, workers=workers)
    client = Client(cluster)

    # Pin the task to the worker by name and forbid it from running elsewhere.
    future = client.submit(test_func, 60, workers="worker-0", allow_other_workers=False)
    time.sleep(10)  # ensure the task has been submitted

    for key, task in cluster.scheduler.tasks.items():
        print("BEFORE RESTART", key, task.worker_restrictions, task.host_restrictions)

    # Restart the worker; it rejoins with the same name but a new address.
    client.restart_workers(["worker-0"])

    # Hangs here: the stored restriction never matches the restarted worker.
    for _ in as_completed([future]):
        pass

if __name__ == "__main__":
    main()

The output I get from the above example shows the scheduler and worker starting. The initial execution of the test_func task does not complete before the worker is restarted, after which the program hangs because the worker restriction is never fulfilled. The output of the print statement shows that the worker restriction in the task state records the address of the initial worker.
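
For reference, a check along the following lines could be added after client.restart_workers(...) in the example above to make the mismatch explicit; it assumes the scheduler's workers mapping is keyed by address with WorkerState values and that worker_restrictions is a set of addresses:

# Assumes Scheduler.workers maps address -> WorkerState and that
# TaskState.worker_restrictions is a set of worker addresses.
current_addresses = {ws.address for ws in cluster.scheduler.workers.values()}
for key, task in cluster.scheduler.tasks.items():
    stale = (task.worker_restrictions or set()) - current_addresses
    print("AFTER RESTART", key, "stale restrictions:", stale)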

Environment:

  • Dask version: 2024.2.0
  • Python version: 3.10
  • Operating System: Linux
  • Install method (conda, pip, source): pip