# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Mapping
from unittest import mock

import pytest
import torch
import torch.multiprocessing as mp

import tests.helpers.utils as tutils
from pytorch_lightning.utilities.distributed import _collect_states_on_rank_zero
from tests.helpers.runif import RunIf


@pytest.mark.parametrize("env_vars", [{"RANK": "0"}, {"SLURM_PROCID": "0"}])
def test_rank_zero_known_cluster_envs(env_vars: Mapping[str, str]):
    """Test that rank environment variables from known cluster environments are properly checked for
    rank_zero_only."""
    from pytorch_lightning.utilities.distributed import _get_rank, rank_zero_only

    # reset the cached rank before patching the environment
    rank_zero_only.rank = _get_rank()

    with mock.patch.dict(os.environ, env_vars):
        from pytorch_lightning.utilities.distributed import _get_rank, rank_zero_only

        # re-read the rank now that the environment variable is set
        rank_zero_only.rank = _get_rank()

        @rank_zero_only
        def foo():  # The return type is optional because on non-zero ranks it will not be called
            return 1

        # on global rank 0 the wrapped function runs and its return value comes back
        x = foo()
        assert x == 1


@pytest.mark.parametrize("rank_key,rank", [("RANK", "1"), ("SLURM_PROCID", "2"), ("LOCAL_RANK", "3")])
def test_rank_zero_none_set(rank_key, rank):
    """Test that the decorated function is not called when the rank environment variable is not global zero."""
    with mock.patch.dict(os.environ, {rank_key: rank}):
        from pytorch_lightning.utilities.distributed import _get_rank, rank_zero_only

        rank_zero_only.rank = _get_rank()

        @rank_zero_only
        def foo():
            return 1

        # on non-zero ranks the wrapped function is skipped and None is returned
        x = foo()
        assert x is None
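

# A minimal usage sketch (illustrative only, not exercised by these tests):
# `rank_zero_only` typically guards side effects such as printing so they run
# once per job. The helper name `_print_main_only` below is hypothetical.
from pytorch_lightning.utilities.distributed import rank_zero_only as _rank_zero_only


@_rank_zero_only
def _print_main_only(message: str) -> None:
    # the body executes on global rank 0 only; on any other rank the wrapper
    # returns None without calling it
    print(message)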


def _test_collect_states(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"

    # initialize the process group; the parent process already exported a free
    # MASTER_PORT via `set_random_main_port`, which the spawned workers inherit
    torch.distributed.init_process_group("nccl", rank=rank, world_size=world_size)

    state = {"something": torch.tensor([rank])}
    collected_state = _collect_states_on_rank_zero(state, device=torch.device(f"cuda:{rank}"))
    if rank == 1:
        # non-zero ranks receive nothing back
        assert collected_state is None
    else:
        # rank 0 receives a mapping from each rank to that rank's state
        assert collected_state == {1: {"something": torch.tensor([1])}, 0: {"something": torch.tensor([0])}}
    torch.distributed.destroy_process_group()


@RunIf(skip_windows=True, min_gpus=2, min_torch="1.10")
def test_collect_states():
    """Make sure `_collect_states_on_rank_zero` gathers per-rank states onto rank 0 under DDP."""
    tutils.set_random_main_port()
    mp.spawn(_test_collect_states, args=(2,), nprocs=2)
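

# A minimal sketch of the contract checked above, assuming it runs inside an
# already-initialized process group (e.g. from within a spawned worker such as
# `_test_collect_states`). The helper name `_check_collected_shape` is
# hypothetical and provided purely for illustration.
def _check_collected_shape(state, device):
    collected = _collect_states_on_rank_zero(state, device=device)
    if torch.distributed.get_rank() == 0:
        # rank 0 receives a mapping {rank: state} covering the whole group
        assert set(collected) == set(range(torch.distributed.get_world_size()))
    else:
        # every other rank receives None
        assert collected is None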