Revert new Hydra launch behavior (#15737)
* revert new hydra cwd behavior
* remove debug statements
* changelog

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jirka <jirka.borovec@seznam.cz>
Co-authored-by: Jirka Borovec <6035284+Borda@users.noreply.github.com>
4 people committed Nov 21, 2022
1 parent dd75906 commit 88b2e5a
Showing 5 changed files with 23 additions and 136 deletions.
38 changes: 11 additions & 27 deletions src/lightning_lite/strategies/launchers/subprocess_script.py
@@ -14,7 +14,7 @@
import os
import subprocess
import sys
from typing import Any, Callable, Sequence
from typing import Any, Callable, Optional, Sequence, Tuple

from lightning_utilities.core.imports import RequirementCache

@@ -114,15 +114,16 @@ def _call_children_scripts(self) -> None:
# start process
# if hydra is available and initialized, make sure to set the cwd correctly
hydra_in_use = False
cwd: Optional[str] = None
if _HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig

hydra_in_use = HydraConfig.initialized()
if hydra_in_use:
command = _hydra_subprocess_cmd(local_rank=local_rank)
command, cwd = _hydra_subprocess_cmd(local_rank=local_rank)
else:
command = _basic_subprocess_cmd()
subprocess.Popen(command, env=env_copy)
subprocess.Popen(command, env=env_copy, cwd=cwd)

def _check_can_spawn_children(self) -> None:
if self.cluster_environment.local_rank() != 0:
@@ -142,36 +143,19 @@ def _basic_subprocess_cmd() -> Sequence[str]:
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]


def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]:
def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]:
import __main__ # local import to avoid https://github.com/Lightning-AI/lightning/issues/15218
from hydra.core.hydra_config import HydraConfig
from hydra.utils import to_absolute_path
from hydra.utils import get_original_cwd, to_absolute_path

# when user is using hydra find the absolute path
if __main__.__spec__ is None: # pragma: no-cover
command = [sys.executable, to_absolute_path(sys.argv[0])]
else:
command = [sys.executable, "-m", __main__.__spec__.name]

# extract the hydra configuration
hydra_cfg = HydraConfig.get()
command += sys.argv[1:]

# the location of the hydra configuration files saved for the current job
hydra_output = hydra_cfg.runtime.output_dir
if hydra_cfg.output_subdir is not None:
hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir)

# check if experimental re-run capability exists
# otherwise use existing config.yaml which may have issues
pickled_config = os.path.join(hydra_output, "config.pickle")
if os.path.exists(pickled_config):
command += ["--experimental-rerun", pickled_config]

else:
command += ["-cp", hydra_output, "-cn", "config.yaml"]
command += [
f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}",
f"hydra.run.dir={hydra_cfg.runtime.output_dir}",
]

return command
cwd = get_original_cwd()
os_cwd = f'"{os.getcwd()}"'
command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"]
return command, cwd
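
For orientation, here is a minimal, self-contained sketch of the launch flow this revert restores: when Hydra is initialized, the child command re-runs the script with the parent's run directory pinned via a hydra.run.dir override and a per-rank hydra.job.name, and subprocess.Popen receives an explicit cwd so relative paths resolve as in the parent process. The launch_child helper, the LOCAL_RANK handling, and the use of os.getcwd() in place of Hydra's get_original_cwd() are simplifications for illustration, not the actual Lightning code.

import os
import subprocess
import sys
from typing import Optional


def launch_child(local_rank: int, hydra_in_use: bool) -> subprocess.Popen:
    # copy the parent environment and tag the child with its local rank
    env = os.environ.copy()
    env["LOCAL_RANK"] = str(local_rank)
    cwd: Optional[str] = None
    # re-run the same script by absolute path with the original CLI arguments
    command = [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]
    if hydra_in_use:
        # pin the child to the parent's run dir, give it a per-rank job name,
        # and launch it from the original working directory
        command += [
            f'hydra.run.dir="{os.getcwd()}"',
            f"hydra.job.name=train_ddp_process_{local_rank}",
        ]
        cwd = os.getcwd()  # rough stand-in for hydra.utils.get_original_cwd()
    return subprocess.Popen(command, env=env, cwd=cwd)

The code this replaces (the removed lines above, which re-ran the job from a pickled config or a saved config.yaml and wrote per-rank .pl_ddp_hydra_* subdirectories) is what provided multi-run support, hence the changelog entry below about temporarily removing Hydra multi-run.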
2 changes: 2 additions & 0 deletions src/pytorch_lightning/CHANGELOG.md
@@ -53,6 +53,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Removed deprecated `pytorch_lightning.utilities.memory.get_gpu_memory_map` in favor of `pytorch_lightning.accelerators.cuda.get_nvidia_gpu_stats` ([#15617](https://github.com/Lightning-AI/lightning/pull/15617))

- Temporarily removed support for Hydra multi-run ([#15737](https://github.com/Lightning-AI/lightning/pull/15737))


### Fixed

@@ -109,17 +109,18 @@ def _call_children_scripts(self) -> None:
del env_copy["PL_GLOBAL_SEED"]

hydra_in_use = False
cwd: Optional[str] = None
if _HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig

hydra_in_use = HydraConfig.initialized()

if hydra_in_use:
command = _hydra_subprocess_cmd(local_rank)
command, cwd = _hydra_subprocess_cmd(local_rank)
else:
command = _basic_subprocess_cmd()

subprocess.Popen(command, env=env_copy)
subprocess.Popen(command, env=env_copy, cwd=cwd)

def _check_can_spawn_children(self) -> None:
if self.cluster_environment.local_rank() != 0:
@@ -83,7 +83,7 @@ def test_subprocess_script_launcher_launch_processes(popen_mock):
@mock.patch("lightning_lite.strategies.launchers.subprocess_script.subprocess.Popen")
def test_subprocess_script_launcher_hydra_in_use(popen_mock, _, monkeypatch):
basic_command = Mock(return_value="basic_command")
hydra_command = Mock(return_value="hydra_command")
hydra_command = Mock(return_value=("hydra_command", "hydra_cwd"))
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_basic_subprocess_cmd", basic_command)
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_hydra_subprocess_cmd", hydra_command)

@@ -100,7 +100,7 @@ def simulate_launch():
# when hydra not available
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", False)
simulate_launch()
popen_mock.assert_called_with("basic_command", env=ANY)
popen_mock.assert_called_with("basic_command", env=ANY, cwd=None)
popen_mock.reset_mock()

import hydra
@@ -111,7 +111,7 @@ def simulate_launch():
HydraConfigMock.initialized.return_value = False
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
simulate_launch()
popen_mock.assert_called_with("basic_command", env=ANY)
popen_mock.assert_called_with("basic_command", env=ANY, cwd=None)
popen_mock.reset_mock()

# when hydra available and initialized
@@ -120,5 +120,5 @@ def simulate_launch():
HydraConfigMock.initialized.return_value = True
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
simulate_launch()
popen_mock.assert_called_with("hydra_command", env=ANY)
popen_mock.assert_called_with("hydra_command", env=ANY, cwd="hydra_cwd")
popen_mock.reset_mock()
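
As a side note, the assertions above rely on unittest.mock's ANY sentinel to ignore the environment argument while checking the command and cwd exactly. A stripped-down, hypothetical example of the same pattern outside Lightning (the launch function and test name are made up):

import subprocess
from unittest import mock
from unittest.mock import ANY


def launch(cmd, cwd=None):
    # the code under test: it must forward cwd to Popen
    subprocess.Popen(cmd, env={}, cwd=cwd)


@mock.patch("subprocess.Popen")
def test_launch_passes_cwd(popen_mock):
    launch("echo hi", cwd="/tmp")
    # env is irrelevant here, so match it with ANY; cwd must be exact
    popen_mock.assert_called_with("echo hi", env=ANY, cwd="/tmp")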
106 changes: 3 additions & 103 deletions tests/tests_pytorch/strategies/launchers/test_subprocess_script.py
@@ -1,35 +1,17 @@
import logging
import os
import sys
from pathlib import Path

import pytest
from lightning_utilities.core.imports import RequirementCache

from pytorch_lightning.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE
from tests_pytorch.helpers.runif import RunIf

_HYDRA_WITH_RERUN = RequirementCache("hydra-core>=1.2")
_HYDRA_WITH_RUN_PROCESS = RequirementCache("hydra-core>=1.0.7")

if _HYDRA_AVAILABLE:
from omegaconf import OmegaConf
if _HYDRA_WITH_RUN_PROCESS:
from hydra.test_utils.test_utils import run_process


# fixture to run hydra jobs in a clean temporary directory
# Hydra creates its own output directories and logs
@pytest.fixture
def cleandir(tmp_path):
"""Run function in a temporary directory."""
old_dir = os.getcwd() # get current working directory (cwd)
os.chdir(tmp_path) # change cwd to the temp-directory
yield tmp_path # yields control to the test to be run
os.chdir(old_dir)
logging.shutdown()


# Script to run from command line
script = """
import hydra
@@ -64,7 +46,9 @@ def task_fn(cfg):
@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True)
@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS))
@pytest.mark.parametrize("subdir", [None, "dksa", ".hello"])
def test_ddp_with_hydra_runjob(cleandir, subdir):
def test_ddp_with_hydra_runjob(subdir, tmpdir, monkeypatch):
monkeypatch.chdir(tmpdir)

# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)
@@ -75,87 +59,3 @@ def test_ddp_with_hydra_runjob(cleandir, subdir):
if subdir is not None:
cmd += [f"hydra.output_subdir={subdir}"]
run_process(cmd)

# Make sure config.yaml was created for additional
# processes.
logs = list(Path.cwd().glob("**/config.yaml"))
assert len(logs) == devices

# Make sure the parameter was set and used
cfg = OmegaConf.load(logs[0])
assert cfg.devices == devices

# Make sure PL spawned a job that is logged by Hydra
logs = list(Path.cwd().glob("**/*.log"))
assert len(logs) == 1


@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True)
@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS))
@pytest.mark.parametrize("num_jobs", [1, 2])
def test_ddp_with_hydra_multirunjob(cleandir, num_jobs):
# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)

# create fake multirun params based on `num_jobs`
fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs))

# Run CLI
run_process([sys.executable, "temp.py", "+devices=2", '+strategy="ddp"', fake_param, "--multirun"])

# Make sure config.yaml was created for each job
configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml"))
assert len(configs) == num_jobs

# Make sure the parameter was set and used for each job
for i, config in enumerate(configs):
cfg = OmegaConf.load(config)
local_rank = int(config.parent.parent.parts[-1])
assert cfg.devices == 2
assert cfg.foo == local_rank

logs = list(Path.cwd().glob("**/*.log"))
assert len(logs) == num_jobs


yaml_file = """
hydra:
callbacks:
save_job_info:
_target_: hydra.experimental.callbacks.PickleJobInfoCallback
"""


@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True)
@pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason=str(_HYDRA_WITH_RERUN))
@pytest.mark.parametrize("num_jobs", [1, 2])
def test_ddp_with_hydra_multirunjob_rerun(cleandir, num_jobs):
# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)

with open("config.yaml", "w") as fn:
fn.write(yaml_file)

# create fake multirun params based on `num_jobs`
fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs))

# Run CLI
run_process(
[
sys.executable,
"temp.py",
"-cp",
".",
"-cn",
"config.yaml",
"+devices=2",
'+strategy="ddp"',
fake_param,
"--multirun",
]
)

pickles = sorted(Path.cwd().glob("**/.hydra/config.pickle"))
assert len(pickles) == num_jobs

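For readers who have not seen the full script string (its body is collapsed in this view), a minimal, hypothetical Hydra entry point of the kind these tests execute looks roughly like the following. It is an illustration only, not the exact script embedded in the test file.

import hydra
from pytorch_lightning import Trainer
from pytorch_lightning.demos.boring_classes import BoringModel


@hydra.main(config_path=None)
def task_fn(cfg):
    # `devices` and `strategy` arrive as CLI overrides, e.g. "+devices=2 +strategy=ddp"
    model = BoringModel()
    trainer = Trainer(
        accelerator="auto",
        devices=cfg.devices,
        strategy=cfg.strategy,
        fast_dev_run=True,
    )
    trainer.fit(model)


if __name__ == "__main__":
    task_fn()

A command such as python temp.py +devices=2 +strategy=ddp would then go through the subprocess launcher shown earlier to start the remaining ranks.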