Revert new Hydra launch behavior #15737

Merged
12 commits merged on Nov 21, 2022
38 changes: 11 additions & 27 deletions src/lightning_lite/strategies/launchers/subprocess_script.py
@@ -14,7 +14,7 @@
import os
import subprocess
import sys
from typing import Any, Callable, Sequence
from typing import Any, Callable, Optional, Sequence, Tuple

from lightning_utilities.core.imports import RequirementCache

@@ -114,15 +114,16 @@ def _call_children_scripts(self) -> None:
# start process
# if hydra is available and initialized, make sure to set the cwd correctly
hydra_in_use = False
cwd: Optional[str] = None
if _HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig

hydra_in_use = HydraConfig.initialized()
if hydra_in_use:
command = _hydra_subprocess_cmd(local_rank=local_rank)
command, cwd = _hydra_subprocess_cmd(local_rank=local_rank)
else:
command = _basic_subprocess_cmd()
subprocess.Popen(command, env=env_copy)
subprocess.Popen(command, env=env_copy, cwd=cwd)

def _check_can_spawn_children(self) -> None:
if self.cluster_environment.local_rank() != 0:
@@ -142,36 +143,19 @@ def _basic_subprocess_cmd() -> Sequence[str]:
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]


def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]:
def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]:
import __main__ # local import to avoid https://github.com/Lightning-AI/lightning/issues/15218
from hydra.core.hydra_config import HydraConfig
from hydra.utils import to_absolute_path
from hydra.utils import get_original_cwd, to_absolute_path

# when user is using hydra find the absolute path
if __main__.__spec__ is None: # pragma: no-cover
command = [sys.executable, to_absolute_path(sys.argv[0])]
else:
command = [sys.executable, "-m", __main__.__spec__.name]

# extract the hydra configuration
hydra_cfg = HydraConfig.get()
command += sys.argv[1:]

# the location of the hydra configuration files saved for the current job
hydra_output = hydra_cfg.runtime.output_dir
if hydra_cfg.output_subdir is not None:
hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir)

# check if experimental re-run capability exists
# otherwise use existing config.yaml which may have issues
pickled_config = os.path.join(hydra_output, "config.pickle")
if os.path.exists(pickled_config):
command += ["--experimental-rerun", pickled_config]

else:
command += ["-cp", hydra_output, "-cn", "config.yaml"]
command += [
f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}",
f"hydra.run.dir={hydra_cfg.runtime.output_dir}",
]

return command
cwd = get_original_cwd()
os_cwd = f'"{os.getcwd()}"'
command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"]
return command, cwd
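
For illustration, here is a minimal sketch of what the reverted `_hydra_subprocess_cmd` produces for a non-zero rank. The script path, overrides, and directories are assumed for the example (they are not taken from the PR); the point is that the child is handed the parent's Hydra run directory as an override while being spawned from the original, pre-Hydra working directory, so relative paths resolve the same way they did for rank 0.

```python
# Illustrative sketch only: assumed paths, script name, and overrides are not from the PR.
# Scenario: `python train.py trainer.devices=2` was launched from /home/user/project and
# Hydra has already chdir'd rank 0 into its run directory.
import sys

local_rank = 1
original_cwd = "/home/user/project"  # what hydra.utils.get_original_cwd() would return
run_dir = '"/home/user/project/outputs/2022-11-21/10-00-00"'  # quoted os.getcwd() of rank 0

# __main__.__spec__ is None for a plain script, so the absolute script path is used
command = [sys.executable, "/home/user/project/train.py"]
command += ["trainer.devices=2"]  # stands in for sys.argv[1:]
command += [
    f"hydra.run.dir={run_dir}",  # child reuses the parent's run directory
    f"hydra.job.name=train_ddp_process_{local_rank}",
]

# The launcher then spawns the child from the original working directory:
#   subprocess.Popen(command, env=env_copy, cwd=original_cwd)
# so relative config and checkpoint paths in the child resolve as they did for rank 0.
```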
2 changes: 2 additions & 0 deletions src/pytorch_lightning/CHANGELOG.md
@@ -53,6 +53,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Removed deprecated `pytorch_lightning.utilities.memory.get_gpu_memory_map` in favor of `pytorch_lightning.accelerators.cuda.get_nvidia_gpu_stats` ([#15617](https://github.com/Lightning-AI/lightning/pull/15617))

- Temporarily removed support for Hydra multi-run ([#15737](https://github.com/Lightning-AI/lightning/pull/15737))


### Fixed

@@ -109,17 +109,18 @@ def _call_children_scripts(self) -> None:
del env_copy["PL_GLOBAL_SEED"]

hydra_in_use = False
cwd: Optional[str] = None
if _HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig

hydra_in_use = HydraConfig.initialized()

if hydra_in_use:
command = _hydra_subprocess_cmd(local_rank)
command, cwd = _hydra_subprocess_cmd(local_rank)
else:
command = _basic_subprocess_cmd()

subprocess.Popen(command, env=env_copy)
subprocess.Popen(command, env=env_copy, cwd=cwd)

def _check_can_spawn_children(self) -> None:
if self.cluster_environment.local_rank() != 0:
@@ -83,7 +83,7 @@ def test_subprocess_script_launcher_launch_processes(popen_mock):
@mock.patch("lightning_lite.strategies.launchers.subprocess_script.subprocess.Popen")
def test_subprocess_script_launcher_hydra_in_use(popen_mock, _, monkeypatch):
basic_command = Mock(return_value="basic_command")
hydra_command = Mock(return_value="hydra_command")
hydra_command = Mock(return_value=("hydra_command", "hydra_cwd"))
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_basic_subprocess_cmd", basic_command)
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_hydra_subprocess_cmd", hydra_command)

@@ -100,7 +100,7 @@ def simulate_launch():
# when hydra not available
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", False)
simulate_launch()
popen_mock.assert_called_with("basic_command", env=ANY)
popen_mock.assert_called_with("basic_command", env=ANY, cwd=None)
popen_mock.reset_mock()

import hydra
@@ -111,7 +111,7 @@ def simulate_launch():
HydraConfigMock.initialized.return_value = False
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
simulate_launch()
popen_mock.assert_called_with("basic_command", env=ANY)
popen_mock.assert_called_with("basic_command", env=ANY, cwd=None)
popen_mock.reset_mock()

# when hydra available and initialized
@@ -120,5 +120,5 @@ def simulate_launch():
HydraConfigMock.initialized.return_value = True
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
simulate_launch()
popen_mock.assert_called_with("hydra_command", env=ANY)
popen_mock.assert_called_with("hydra_command", env=ANY, cwd="hydra_cwd")
popen_mock.reset_mock()
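
As a self-contained illustration of the assertion pattern in the test above (the names `launch` and `test_launch_passes_cwd` are made up for this sketch and are not part of the test suite), patching `Popen` lets the test verify the forwarded `(command, cwd)` pair without spawning any real processes:

```python
# Minimal, hypothetical reduction of the pattern used above.
import subprocess
from unittest import mock
from unittest.mock import ANY


def launch(command, cwd=None):
    # stand-in for the launcher's Popen call
    subprocess.Popen(command, env={}, cwd=cwd)


@mock.patch("subprocess.Popen")
def test_launch_passes_cwd(popen_mock):
    launch("hydra_command", cwd="hydra_cwd")
    popen_mock.assert_called_with("hydra_command", env=ANY, cwd="hydra_cwd")
```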
106 changes: 3 additions & 103 deletions tests/tests_pytorch/strategies/launchers/test_subprocess_script.py
@@ -1,35 +1,17 @@
import logging
import os
import sys
from pathlib import Path

import pytest
from lightning_utilities.core.imports import RequirementCache

from pytorch_lightning.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE
from tests_pytorch.helpers.runif import RunIf

_HYDRA_WITH_RERUN = RequirementCache("hydra-core>=1.2")
_HYDRA_WITH_RUN_PROCESS = RequirementCache("hydra-core>=1.0.7")

if _HYDRA_AVAILABLE:
from omegaconf import OmegaConf
if _HYDRA_WITH_RUN_PROCESS:
from hydra.test_utils.test_utils import run_process


# fixture to run hydra jobs in a clean temporary directory
# Hydra creates its own output directories and logs
@pytest.fixture
def cleandir(tmp_path):
"""Run function in a temporary directory."""
old_dir = os.getcwd() # get current working directory (cwd)
os.chdir(tmp_path) # change cwd to the temp-directory
yield tmp_path # yields control to the test to be run
os.chdir(old_dir)
logging.shutdown()


# Script to run from command line
script = """
import hydra
@@ -64,7 +46,9 @@ def task_fn(cfg):
@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True)
@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS))
@pytest.mark.parametrize("subdir", [None, "dksa", ".hello"])
def test_ddp_with_hydra_runjob(cleandir, subdir):
def test_ddp_with_hydra_runjob(subdir, tmpdir, monkeypatch):
monkeypatch.chdir(tmpdir)

# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)
@@ -75,87 +59,3 @@ def test_ddp_with_hydra_runjob(cleandir, subdir):
if subdir is not None:
cmd += [f"hydra.output_subdir={subdir}"]
run_process(cmd)

# Make sure config.yaml was created for additional
# processes.
logs = list(Path.cwd().glob("**/config.yaml"))
assert len(logs) == devices

# Make sure the parameter was set and used
cfg = OmegaConf.load(logs[0])
assert cfg.devices == devices

# Make sure PL spawned a job that is logged by Hydra
logs = list(Path.cwd().glob("**/*.log"))
assert len(logs) == 1


@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True)
@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS))
@pytest.mark.parametrize("num_jobs", [1, 2])
def test_ddp_with_hydra_multirunjob(cleandir, num_jobs):
# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)

# create fake multirun params based on `num_jobs`
fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs))

# Run CLI
run_process([sys.executable, "temp.py", "+devices=2", '+strategy="ddp"', fake_param, "--multirun"])

# Make sure config.yaml was created for each job
configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml"))
assert len(configs) == num_jobs

# Make sure the parameter was set and used for each job
for i, config in enumerate(configs):
cfg = OmegaConf.load(config)
local_rank = int(config.parent.parent.parts[-1])
assert cfg.devices == 2
assert cfg.foo == local_rank

logs = list(Path.cwd().glob("**/*.log"))
assert len(logs) == num_jobs


yaml_file = """
hydra:
callbacks:
save_job_info:
_target_: hydra.experimental.callbacks.PickleJobInfoCallback
"""


@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True)
@pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason=str(_HYDRA_WITH_RERUN))
@pytest.mark.parametrize("num_jobs", [1, 2])
def test_ddp_with_hydra_multirunjob_rerun(cleandir, num_jobs):
# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)

with open("config.yaml", "w") as fn:
fn.write(yaml_file)

# create fake multirun params based on `num_jobs`
fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs))

# Run CLI
run_process(
[
sys.executable,
"temp.py",
"-cp",
".",
"-cn",
"config.yaml",
"+devices=2",
'+strategy="ddp"',
fake_param,
"--multirun",
]
)

pickles = sorted(Path.cwd().glob("**/.hydra/config.pickle"))
assert len(pickles) == num_jobs
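
For context on what the CHANGELOG entry means in practice, the removed tests exercised Hydra multirun launches along these lines (a sketch only; `train.py` and the overrides are illustrative stand-ins for the temporary script the tests wrote out):

```python
# Illustrative only: the style of Hydra multirun invocation the removed tests covered.
# Each multirun job would itself spawn DDP child processes via the launcher above.
import subprocess
import sys

cmd = [
    sys.executable,
    "train.py",          # hypothetical Hydra-decorated training script
    "+devices=2",
    '+strategy="ddp"',
    "+foo=0,1",          # sweep parameter -> two multirun jobs
    "--multirun",
]
subprocess.run(cmd, check=True)
```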