diff --git a/dvc/commands/queue/status.py b/dvc/commands/queue/status.py
index 3a32ce6e2d..c508c8b588 100644
--- a/dvc/commands/queue/status.py
+++ b/dvc/commands/queue/status.py
@@ -5,6 +5,7 @@
 from dvc.cli.command import CmdBase
 from dvc.cli.utils import append_doc_link
 from dvc.compare import TabularData
+from dvc.ui import ui
 
 from ..experiments.show import format_time
 
@@ -26,6 +27,13 @@ def run(self):
                 [exp["rev"][:7], exp.get("name", ""), created, exp["status"]]
             )
         td.render()
+        worker_count = len(self.repo.experiments.celery_queue.active_worker())
+        if worker_count == 1:
+            ui.write("There is 1 worker active at present.")
+        elif worker_count == 0:
+            ui.write("No worker active at present.")
+        else:
+            ui.write(f"There are {worker_count} workers active at present.")
         return 0
 
 
diff --git a/dvc/repo/experiments/queue/local.py b/dvc/repo/experiments/queue/local.py
index 5b118a57ea..d374007f78 100644
--- a/dvc/repo/experiments/queue/local.py
+++ b/dvc/repo/experiments/queue/local.py
@@ -2,6 +2,7 @@
 import locale
 import logging
 import os
+import time
 from collections import defaultdict
 from typing import (
     TYPE_CHECKING,
@@ -124,22 +125,41 @@ def worker(self) -> "TemporaryWorker":
         )
 
     def spawn_worker(self):
-        from shortuuid import uuid
-
+        """Spawn a detached experiment queue worker for this working dir.
+
+        The worker gets the first node name ``dvc-exp-<hash>-<n>@localhost``
+        that no running worker reports, and the call then waits up to 5
+        seconds for that node to show up before returning.
+        """
         from dvc_task.proc.process import ManagedProcess
 
         logger.debug("Spawning exp queue worker")
         wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6]
-        node_name = f"dvc-exp-{wdir_hash}-1@localhost"
+        number = 1
+        node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
+        # Idle workers hold their node name too, so include them when
+        # looking for the first free worker number.
+        worker_status = self.active_worker(include_idle=True)
+        while node_name in worker_status:
+            number += 1
+            node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
+
         cmd = ["exp", "queue-worker", node_name]
-        name = "dvc-exp-worker"
-        if logger.getEffectiveLevel() < logging.INFO:
-            name = name + str(uuid())
+        name = f"dvc-exp-worker-{number}"
+        logger.debug(f"start worker: {name}, node: {node_name}")
         if os.name == "nt":
             daemonize(cmd)
         else:
             ManagedProcess.spawn(["dvc"] + cmd, wdir=self.wdir, name=name)
 
+        # Give the worker up to 5 seconds to answer the inspect broadcast; a
+        # freshly started worker has no running task yet, hence include_idle.
+        for _ in range(5):
+            time.sleep(1)
+            if node_name in self.active_worker(include_idle=True):
+                return
+        logger.debug(f"worker {name} node {node_name} didn't start in 5 sec")
+
     def put(self, *args, **kwargs) -> QueueEntry:
         """Stash an experiment and add it to the queue."""
         entry = self._stash_exp(*args, **kwargs)
@@ -316,6 +336,19 @@ def logs(
         ) as fobj:
             ui.write(fobj.read())
 
+    def active_worker(self, include_idle: bool = False) -> Set[str]:
+        """Return the node names of running celery workers.
+
+        By default only workers that are currently executing at least one
+        task are returned. With ``include_idle=True`` every worker that
+        answered the inspect broadcast is returned.
+        """
+        status = self.celery.control.inspect().active() or {}
+        logger.debug(f"Worker status: {status}")
+        if include_idle:
+            return set(status)
+        return {name for name, tasks in status.items() if tasks}
+
 
 class WorkspaceQueue(BaseStashQueue):
     def put(self, *args, **kwargs) -> QueueEntry: