/
subprocess_script.py
168 lines (132 loc) · 6.94 KB
/
subprocess_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import sys
from time import sleep
from typing import Any, Callable, Optional, Sequence, Tuple
import numpy as np
from lightning_utilities.core.imports import RequirementCache
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_lite.strategies.launchers.base import _Launcher
# True iff the ``hydra-core`` package is importable; gates the optional Hydra-aware launch path below.
_HYDRA_AVAILABLE = RequirementCache("hydra-core")
class _SubprocessScriptLauncher(_Launcher):
    r"""
    A process launcher that invokes the current script as many times as desired in a single node.
    This launcher needs to be invoked on each node.
    In its default behavior, the main process in each node then spawns N-1 child processes via :func:`subprocess.Popen`,
    where N is the number of devices (e.g. GPU) per node. It is very similar to how :mod:`torch.distributed.run`
    launches processes.
    For example, if the script gets invoked with the command
    .. code-block:: bash
        python train.py --devices 4
    The launcher will create three additional subprocesses that get called like so:
    .. code-block:: bash
        LOCAL_RANK=1 python train.py --devices 4
        LOCAL_RANK=2 python train.py --devices 4
        LOCAL_RANK=3 python train.py --devices 4
    It is implied that the main process which launched the others has ``LOCAL_RANK=0``.
    Beside the local rank, the following other environment variables also get set, but unlike the local rank, these
    get determined by the cluster environment:
    1. `MASTER_ADDR`: The IP address of the main node.
    2. `MASTER_PORT`: The port number of the main node through which all processes communicate.
    3. `NODE_RANK`: The index of the node the current process is running on. Ranges from 0 to ``num_nodes - 1``.
    4. `WORLD_SIZE`: The total number of processes across all nodes, i.e., ``num_processes * num_nodes``.
    Arguments:
        cluster_environment: A cluster environment that provides access to world size, node rank, etc.
        num_processes: The number of processes to launch in the current node.
        num_nodes: The total number of nodes that participate in this process group.
    """

    def __init__(
        self,
        cluster_environment: "ClusterEnvironment",
        num_processes: int,
        num_nodes: int,
    ) -> None:
        super().__init__()
        self.cluster_environment = cluster_environment
        self.num_processes = num_processes
        self.num_nodes = num_nodes

    @property
    def is_interactive_compatible(self) -> bool:
        # Spawning subprocesses of the current script does not work from notebooks / interactive shells.
        return False

    def launch(self, function: Callable, *args: Any, **kwargs: Any) -> Any:
        """Creates new processes, then calls the given function.

        Arguments:
            function: A callback function to execute after all processes have been created.
                It is up to the implementation of this function to synchronize the processes, e.g., with barriers.
            *args: Optional positional arguments to be passed to the given function.
            **kwargs: Optional keyword arguments to be passed to the given function.
        """
        # If the cluster (e.g. SLURM, torchelastic) already started one process per device,
        # there is nothing to spawn here.
        if not self.cluster_environment.creates_processes_externally:
            self._call_children_scripts()
        return function(*args, **kwargs)

    def _call_children_scripts(self) -> None:
        """Re-invoke the current script ``num_processes - 1`` times with ``LOCAL_RANK`` set to 1..N-1."""
        # Launching children is only valid from the local main process (LOCAL_RANK=0).
        self._check_can_spawn_children()

        # DDP environment variables, inherited by every child through the copied environment below.
        os.environ["MASTER_ADDR"] = self.cluster_environment.main_address
        os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)
        # allow the user to pass the node rank
        os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())
        os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank())
        os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"

        # Hydra detection is loop-invariant: resolve it once instead of re-importing and
        # re-checking for every child process.
        hydra_in_use = False
        if _HYDRA_AVAILABLE:
            from hydra.core.hydra_config import HydraConfig

            hydra_in_use = HydraConfig.initialized()

        for local_rank in range(1, self.num_processes):
            env_copy = os.environ.copy()
            env_copy["LOCAL_RANK"] = f"{local_rank}"

            # Do not propagate a seed the parent process never set explicitly.
            if os.environ.get("PL_GLOBAL_SEED") is None:
                env_copy.pop("PL_GLOBAL_SEED", None)

            # If hydra is in use, make sure the child starts with the correct cwd.
            cwd: Optional[str] = None
            if hydra_in_use:
                command, cwd = _hydra_subprocess_cmd(local_rank=local_rank)
            else:
                command = _basic_subprocess_cmd()
            subprocess.Popen(command, env=env_copy, cwd=cwd)

            # Starting all processes at once can cause issues with dataloaders;
            # stagger the launches with a random 1-5 second delay.
            delay = np.random.uniform(1, 5, 1)[0]
            sleep(delay)

    def _check_can_spawn_children(self) -> None:
        """Raise if called from a process that is not the local main process (``LOCAL_RANK=0``)."""
        if self.cluster_environment.local_rank() != 0:
            raise RuntimeError(
                "Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen."
                " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user,"
                " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented."
            )
def _basic_subprocess_cmd() -> Sequence[str]:
import __main__ # local import to avoid https://github.com/Lightning-AI/lightning/issues/15218
if __main__.__spec__ is None: # pragma: no-cover
return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]
else:
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]
def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]:
    """Build the re-launch command for a Hydra application and the directory to launch it from."""
    # local import to avoid https://github.com/Lightning-AI/lightning/issues/15218
    import __main__
    from hydra.utils import get_original_cwd, to_absolute_path

    # Mirror how the current process was started; Hydra may have changed the cwd,
    # so a plain script path is made absolute.
    spec = __main__.__spec__
    if spec is None:  # pragma: no-cover
        command = [sys.executable, to_absolute_path(sys.argv[0])]
    else:
        command = [sys.executable, "-m", spec.name]
    command.extend(sys.argv[1:])

    # Children start from the pre-Hydra working directory, with Hydra overrides that pin
    # the run directory and give each rank a distinct job name.
    cwd = get_original_cwd()
    quoted_run_dir = f'"{os.getcwd()}"'
    command.extend([f"hydra.run.dir={quoted_run_dir}", f"hydra.job.name=train_ddp_process_{local_rank}"])
    return command, cwd