From e65d344ff1f3f920193f151525a97a79ed094c65 Mon Sep 17 00:00:00 2001 From: Nisheeth Lahoti Date: Thu, 27 Jul 2023 09:36:51 +0530 Subject: [PATCH] Support multi-run with hydra + DDP --- .../strategies/launchers/subprocess_script.py | 20 ++++++-- .../launchers/test_subprocess_script.py | 51 ++++++++++++++++++- 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/src/lightning/fabric/strategies/launchers/subprocess_script.py b/src/lightning/fabric/strategies/launchers/subprocess_script.py index 62b3a718b90f92..4a7aaaa253fcaa 100644 --- a/src/lightning/fabric/strategies/launchers/subprocess_script.py +++ b/src/lightning/fabric/strategies/launchers/subprocess_script.py @@ -14,6 +14,7 @@ import os import subprocess import sys +from pathlib import Path from typing import Any, Callable, Optional, Sequence, Tuple from lightning_utilities.core.imports import RequirementCache @@ -143,6 +144,8 @@ def _basic_subprocess_cmd() -> Sequence[str]: def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]: import __main__ # local import to avoid https://github.com/Lightning-AI/lightning/issues/15218 + from hydra.core.hydra_config import HydraConfig + from hydra.types import RunMode from hydra.utils import get_original_cwd, to_absolute_path # when user is using hydra find the absolute path @@ -151,9 +154,18 @@ def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]: else: command = [sys.executable, "-m", __main__.__spec__.name] - command += sys.argv[1:] - cwd = get_original_cwd() - os_cwd = f'"{os.getcwd()}"' - command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"] + hydra_cfg = HydraConfig.get() + run_dir = Path(hydra_cfg.runtime.output_dir) + + if hydra_cfg.output_subdir is None: # config isn't saved, so re-run original command + if hydra_cfg.mode == RunMode.MULTIRUN: + raise RuntimeError("DDP with multirun requires saved config file") + command += sys.argv[1:] + else: + hydra_subdir = run_dir / 
hydra_cfg.output_subdir + command += ["-cp", str(hydra_subdir), "-cn", "config.yaml"] # Use saved config for new run + command += [f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}"] # Log to different subdir + + command += [f"hydra.run.dir={run_dir}", f"hydra.job.name=train_ddp_process_{local_rank}"] return command, cwd diff --git a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py index 9f2fb371dc967b..3f28fa5ea50664 100644 --- a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py @@ -1,5 +1,6 @@ import subprocess import sys +from pathlib import Path from unittest.mock import Mock import pytest @@ -13,6 +14,7 @@ if _HYDRA_WITH_RUN_PROCESS: from hydra.test_utils.test_utils import run_process + from omegaconf import OmegaConf # Script to run from command line @@ -48,7 +50,7 @@ def task_fn(cfg): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) @pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS)) -@pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) +@pytest.mark.parametrize("subdir", [None, "null", "dksa", ".hello"]) def test_ddp_with_hydra_runjob(subdir, tmpdir, monkeypatch): monkeypatch.chdir(tmpdir) @@ -58,11 +60,56 @@ def test_ddp_with_hydra_runjob(subdir, tmpdir, monkeypatch): # Run CLI devices = 2 - cmd = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"'] + run_dir = Path(tmpdir) / "hydra_output" + cmd = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"', f"hydra.run.dir={run_dir}"] if subdir is not None: cmd += [f"hydra.output_subdir={subdir}"] run_process(cmd) + # Make sure config.yaml was created for additional processes iff subdir is present. 
+ saved_confs = list(run_dir.glob("**/config.yaml")) + assert len(saved_confs) == (0 if subdir == "null" else devices) + + if saved_confs: # Make sure the parameter was set and used + cfg = OmegaConf.load(saved_confs[0]) + assert cfg.devices == devices + + # Make sure PL spawned jobs that are logged by Hydra + logs = list(run_dir.glob("**/*.log")) + assert len(logs) == devices + + +@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) +@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS)) +@pytest.mark.parametrize("num_jobs", [1, 2]) +def test_ddp_with_hydra_multirunjob(tmpdir, num_jobs, monkeypatch): + monkeypatch.chdir(tmpdir) + + # Save script locally + with open("temp.py", "w") as fn: + fn.write(script) + + # Run CLI + devices = 2 + sweep_dir = Path(tmpdir) / "hydra_output" + command = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"', f"hydra.sweep.dir={sweep_dir}"] + command += ["--multirun", "+foo=" + ",".join(str(i) for i in range(num_jobs))] # fake multirun params + run_process(command) + + # Make sure config.yaml was created for each job + saved_confs = list(sweep_dir.glob("**/config.yaml")) + assert len(saved_confs) == devices * num_jobs + + # Make sure the parameter was set and used for each job + for config in saved_confs: + cfg = OmegaConf.load(config) + local_rank = int(config.parent.parent.parts[-1]) + assert cfg.devices == devices + assert cfg.foo == local_rank + + logs = list(sweep_dir.glob("**/*.log")) + assert len(logs) == devices * num_jobs + def test_kill(): launcher = _SubprocessScriptLauncher(Mock(), 1, 1)