From 7551558d9522189c175a72ebea2a9d4aa4b78786 Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Sun, 4 Apr 2021 01:27:27 +0300 Subject: [PATCH 01/10] apex kwargs --- catalyst/engines/apex.py | 137 ++++++------------- catalyst/engines/tests/test_apex.py | 2 +- catalyst/engines/tests/test_parallel_apex.py | 2 +- 3 files changed, 45 insertions(+), 96 deletions(-) diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index c46609afad..5a94e350eb 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -126,25 +126,11 @@ class APEXEngine(DeviceEngine): Args: device: use device, default is `"cuda"`. - opt_level: optimization level, should be one of ``"O0"``, - ``"O1"``, ``"O2"`` or ``"O3"``. - - - ``"O0"`` - no-op training - - ``"O1"`` - mixed precision (FP16) training (default) - - ``"O2"`` - "almost" mixed precision training - - ``"O3"`` - another implementation of mixed precision training - - Details about levels can be found here: - https://nvidia.github.io/apex/amp.html#opt-levels - keep_batchnorm_fp32: To enhance precision and enable CUDNN batchnorm - (which improves performance), - it’s often beneficial to keep batchnorm weights in FP32 even - if the rest of the model is FP16. - loss_scale: If loss_scale is a float value, - use this value as the static (fixed) loss scale - If loss_scale is the string "dynamic", - adaptively adjust the loss scale over time. - Dynamic loss scale adjustments are performed by Amp automatically. + apex_kwargs: parameters for `apex.amp.initialize` + except models and optimizers (they will be forwared automatically). 
+ + Docs for `apex.amp.initialize`: + https://nvidia.github.io/apex/amp.html#apex.amp.initialize Examples: @@ -177,21 +163,16 @@ def get_engine(self): """ - def __init__( - self, - device: str = "cuda", - opt_level: str = "O1", - keep_batchnorm_fp32: bool = None, - loss_scale: Union[float, str] = None, - ): + def __init__(self, device: str = "cuda", **apex_kwargs): """Init.""" super().__init__(device) - self.opt_level = opt_level - self.keep_batchnorm_fp32 = keep_batchnorm_fp32 - self.loss_scale = loss_scale + self.apex_kwargs = apex_kwargs def __repr__(self) -> str: # noqa: D105 - return f"{self.__class__.__name__}(device='{self.device}',opt_level='{self.opt_level}')" + args_list = [f"device='{self.device}'"] + for k, v in self.apex_kwargs.items(): + args_list.append(f"{k}={v}") + return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" def init_components( self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None, @@ -212,13 +193,7 @@ def init_components( # from official docs: # https://nvidia.github.io/apex/amp.html#opt-levels-and-properties - model, optimizer = _initialize_apex( - model, - optimizer, - opt_level=self.opt_level, - keep_batchnorm_fp32=self.keep_batchnorm_fp32, - loss_scale=self.loss_scale, - ) + model, optimizer = _initialize_apex(model, optimizer, **self.apex_kwargs) # scheduler scheduler = scheduler_fn() @@ -299,16 +274,11 @@ class DataParallelApexEngine(APEXEngine): """Apex multi-gpu training device engine. Args: - opt_level: optimization level, should be one of ``"O0"``, - ``"O1"``, ``"O2"`` or ``"O3"``. + apex_kwargs: parameters for `apex.amp.initialize` + except models and optimizers (they will be forwared automatically). 
- - ``"O0"`` - no-op training - - ``"O1"`` - mixed precision (FP16) training (default) - - ``"O2"`` - "almost" mixed precision training - - ``"O3"`` - another implementation of mixed precision training - - Details about levels can be found here: - https://nvidia.github.io/apex/amp.html#opt-levels + Docs for `apex.amp.initialize`: + https://nvidia.github.io/apex/amp.html#apex.amp.initialize Examples: @@ -340,13 +310,16 @@ def get_engine(self): """ - def __init__(self, opt_level: str = "O1"): + def __init__(self, **apex_kwargs): """Init.""" - super().__init__(f"cuda:{torch.cuda.current_device()}", opt_level) + super().__init__(f"cuda:{torch.cuda.current_device()}", **apex_kwargs) self.device_count = torch.cuda.device_count() def __repr__(self) -> str: # noqa: D105 - return f"{self.__class__.__name__}(device='{self.device}',opt_level='{self.opt_level}')" + args_list = [f"device='{self.device}'"] + for k, v in self.apex_kwargs.items(): + args_list.append(f"{k}={v}") + return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" def init_components( self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None, @@ -364,7 +337,7 @@ def init_components( optimizer = self.sync_device(optimizer) model, optimizer = _wrap_into_data_parallel_with_apex( - model, optimizer, distributed_params={"opt_level": self.opt_level} + model, optimizer, distributed_params=self.apex_kwargs ) # scheduler @@ -384,28 +357,11 @@ class DistributedDataParallelApexEngine(DistributedDataParallelEngine): backend: multiprocessing backend to use, default is `"nccl"`. world_size: number of processes. - opt_level: optimization level, should be one of ``"O0"``, - ``"O1"``, ``"O2"`` or ``"O3"``. 
- - - ``"O0"`` - no-op training - - ``"O1"`` - mixed precision (FP16) training (default) - - ``"O2"`` - "almost" mixed precision training - - ``"O3"`` - another implementation of mixed precision training - - Details about levels can be found here: - https://nvidia.github.io/apex/amp.html#opt-levels - - keep_batchnorm_fp32: To enhance precision and - enable CUDNN batchnorm (which improves performance), - it’s often beneficial to keep batchnorm weights in FP32 even - if the rest of the model is FP16. - loss_scale: If loss_scale is a float value, - use this value as the static (fixed) loss scale. - If loss_scale is the string "dynamic", - adaptively adjust the loss scale over time. - Dynamic loss scale adjustments are performed by Amp automatically. - delay_all_reduce (bool): boolean flag for delayed all reduce, - default is `True`. + apex_kwargs: parameters for `apex.amp.initialize` + except models and optimizers (they will be forwared automatically). + + Docs for `apex.amp.initialize`: + https://nvidia.github.io/apex/amp.html#apex.amp.initialize Examples: @@ -446,10 +402,7 @@ def __init__( port: str = "12345", backend: str = "nccl", world_size: int = None, - opt_level: str = "O1", - keep_batchnorm_fp32: bool = None, - loss_scale: Union[float, str] = None, - delay_all_reduce: bool = True, + **apex_kwargs, ): """Init.""" super().__init__() @@ -459,18 +412,19 @@ def __init__( self._rank = 0 self._world_size = world_size or torch.cuda.device_count() self.device = None - self.opt_level = opt_level - self.delay_all_reduce = delay_all_reduce - self.keep_batchnorm_fp32 = keep_batchnorm_fp32 - self.loss_scale = loss_scale + self.apex_kwargs = apex_kwargs def __repr__(self): # noqa: D105 - return ( - f"{self.__class__.__name__}(address={self.address}, " - f"port={self.port}, backend='{self.backend}', " - f"rank={self._rank}, world_size={self._world_size}, " - f"opt_level='{self.opt_level}')" - ) + args_list = [ + f"address={self.address}", + f"port={self.port}", + 
f"backend='{self.backend}'", + f"rank={self._rank}", + f"world_size={self._world_size}", + ] + for k, v in self.apex_kwargs.items(): + args_list.append(f"{k}={v}") + return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" def init_components( self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None, @@ -485,14 +439,9 @@ def init_components( optimizer = optimizer_fn() optimizer = self.sync_device(optimizer) - model, optimizer = amp.initialize( - model, - optimizer, - opt_level=self.opt_level, - keep_batchnorm_fp32=self.keep_batchnorm_fp32, - loss_scale=self.loss_scale, - ) - model = ApexDistributedDataParallel(model, delay_allreduce=self.delay_all_reduce) + model, optimizer = amp.initialize(model, optimizer, **self.apex_kwargs) + # TODO: kwargs for Apex DDP ? + model = ApexDistributedDataParallel(model) # , delay_allreduce=self.delay_all_reduce) scheduler = scheduler_fn() scheduler = self.sync_device(scheduler) diff --git a/catalyst/engines/tests/test_apex.py b/catalyst/engines/tests/test_apex.py index 83be6b8b5a..d00676a838 100644 --- a/catalyst/engines/tests/test_apex.py +++ b/catalyst/engines/tests/test_apex.py @@ -45,7 +45,7 @@ def __init__(self, logdir, device, opt_level): self._opt_level = opt_level def get_engine(self): - return APEXEngine(self._device, self._opt_level) + return APEXEngine(self._device, opt_level=self._opt_level) def get_callbacks(self, stage: str): return { diff --git a/catalyst/engines/tests/test_parallel_apex.py b/catalyst/engines/tests/test_parallel_apex.py index 6262ed8e6f..9bcf0f9a60 100644 --- a/catalyst/engines/tests/test_parallel_apex.py +++ b/catalyst/engines/tests/test_parallel_apex.py @@ -44,7 +44,7 @@ def __init__(self, logdir, opt_level): self._opt_level = opt_level def get_engine(self): - return DataParallelApexEngine(self._opt_level) + return DataParallelApexEngine(opt_level=self._opt_level) def get_callbacks(self, stage: str): return { From c64d02319c6e2524b66b7ea7684a952fb95e0a6d Mon Sep 17 
00:00:00 2001 From: Dmytro Doroshenko Date: Sun, 4 Apr 2021 01:42:21 +0300 Subject: [PATCH 02/10] removed unused typing import (Union) --- catalyst/engines/apex.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index 5a94e350eb..ad528e4368 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -1,4 +1,4 @@ -from typing import Dict, Union +from typing import Dict from collections import OrderedDict import torch From a861c0b2a5a8ffeb58afc0137361754efeee1655 Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Wed, 7 Apr 2021 20:33:43 +0300 Subject: [PATCH 03/10] apex kwargs now is a dict --- catalyst/engines/apex.py | 25 +++++++++---------- catalyst/engines/tests/test_apex.py | 4 +-- .../engines/tests/test_distributed_apex.py | 6 +++-- catalyst/engines/tests/test_parallel_apex.py | 7 ++++-- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index ad528e4368..01daaa0dc9 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -1,4 +1,4 @@ -from typing import Dict +from typing import Any, Dict from collections import OrderedDict import torch @@ -163,15 +163,15 @@ def get_engine(self): """ - def __init__(self, device: str = "cuda", **apex_kwargs): + def __init__(self, device: str = "cuda", apex_kwargs: Dict[str, Any] = None): """Init.""" super().__init__(device) + if apex_kwargs is None: + apex_kwargs = {} self.apex_kwargs = apex_kwargs def __repr__(self) -> str: # noqa: D105 - args_list = [f"device='{self.device}'"] - for k, v in self.apex_kwargs.items(): - args_list.append(f"{k}={v}") + args_list = [f"device='{self.device}'", f"apex_kwargs={self.apex_kwargs}"] return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" def init_components( @@ -310,15 +310,13 @@ def get_engine(self): """ - def __init__(self, **apex_kwargs): + def __init__(self, apex_kwargs: Dict[str, Any]): """Init.""" - 
super().__init__(f"cuda:{torch.cuda.current_device()}", **apex_kwargs) + super().__init__(f"cuda:{torch.cuda.current_device()}", apex_kwargs) self.device_count = torch.cuda.device_count() def __repr__(self) -> str: # noqa: D105 - args_list = [f"device='{self.device}'"] - for k, v in self.apex_kwargs.items(): - args_list.append(f"{k}={v}") + args_list = [f"device='{self.device}'", f"apex_kwargs={self.apex_kwargs}"] return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" def init_components( @@ -402,7 +400,7 @@ def __init__( port: str = "12345", backend: str = "nccl", world_size: int = None, - **apex_kwargs, + apex_kwargs: Dict[str, Any] = None, ): """Init.""" super().__init__() @@ -412,6 +410,8 @@ def __init__( self._rank = 0 self._world_size = world_size or torch.cuda.device_count() self.device = None + if apex_kwargs is None: + apex_kwargs = {} self.apex_kwargs = apex_kwargs def __repr__(self): # noqa: D105 @@ -421,9 +421,8 @@ def __repr__(self): # noqa: D105 f"backend='{self.backend}'", f"rank={self._rank}", f"world_size={self._world_size}", + f"apex_kwargs={self.apex_kwargs}", ] - for k, v in self.apex_kwargs.items(): - args_list.append(f"{k}={v}") return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" def init_components( diff --git a/catalyst/engines/tests/test_apex.py b/catalyst/engines/tests/test_apex.py index d00676a838..be9ba35505 100644 --- a/catalyst/engines/tests/test_apex.py +++ b/catalyst/engines/tests/test_apex.py @@ -45,7 +45,7 @@ def __init__(self, logdir, device, opt_level): self._opt_level = opt_level def get_engine(self): - return APEXEngine(self._device, opt_level=self._opt_level) + return APEXEngine(self._device, apex_kwargs=dict(opt_level=self._opt_level)) def get_callbacks(self, stage: str): return { @@ -116,7 +116,7 @@ def train_from_config(device, opt_level): "engine": { "_target_": "APEXEngine", "device": device, - "opt_level": opt_level.upper(), + "apex_kwargs": {"opt_level": opt_level.upper()}, }, "args": {"logdir": 
logdir}, "stages": { diff --git a/catalyst/engines/tests/test_distributed_apex.py b/catalyst/engines/tests/test_distributed_apex.py index 4aa5260331..8db094b44c 100644 --- a/catalyst/engines/tests/test_distributed_apex.py +++ b/catalyst/engines/tests/test_distributed_apex.py @@ -46,7 +46,9 @@ def __init__(self, logdir, opt_level, port="12345"): self._port = port def get_engine(self): - return DistributedDataParallelApexEngine(port=self._port, opt_level=self._opt_level) + return DistributedDataParallelApexEngine( + port=self._port, apex_kwargs=dict(opt_level=self._opt_level) + ) def get_callbacks(self, stage: str): return { @@ -117,7 +119,7 @@ def train_from_config(port, logdir, opt_lvl): "engine": { "_target_": "DistributedDataParallelApexEngine", "port": port, - "opt_level": opt, + "apex_kwargs": {"opt_level": opt}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, "stages": { diff --git a/catalyst/engines/tests/test_parallel_apex.py b/catalyst/engines/tests/test_parallel_apex.py index 9bcf0f9a60..c19aed5c13 100644 --- a/catalyst/engines/tests/test_parallel_apex.py +++ b/catalyst/engines/tests/test_parallel_apex.py @@ -44,7 +44,7 @@ def __init__(self, logdir, opt_level): self._opt_level = opt_level def get_engine(self): - return DataParallelApexEngine(opt_level=self._opt_level) + return DataParallelApexEngine(apex_kwargs=dict(opt_level=self._opt_level)) def get_callbacks(self, stage: str): return { @@ -111,7 +111,10 @@ def train_from_config(opt_level): config={ "args": {"logdir": logdir}, "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, - "engine": {"_target_": "DataParallelApexEngine", "opt_level": opt_level}, + "engine": { + "_target_": "DataParallelApexEngine", + "apex_kwargs": {"opt_level": opt_level}, + }, "args": {"logdir": logdir}, "stages": { "stage1": { From 3f95b31c608e1bfad602e3d08cdba8e747a9803c Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Wed, 7 Apr 2021 20:42:15 +0300 Subject: [PATCH 04/10] scaler_kwargs --- 
catalyst/engines/amp.py | 47 +++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/catalyst/engines/amp.py b/catalyst/engines/amp.py index 1d0f850e3b..2bca0bef7e 100644 --- a/catalyst/engines/amp.py +++ b/catalyst/engines/amp.py @@ -1,3 +1,5 @@ +from typing import Any, Dict + import torch from torch import nn import torch.cuda.amp as amp @@ -10,6 +12,9 @@ class AMPEngine(DeviceEngine): Args: device: used device, default is `"cuda"`. + scaler_kwargs: parameters for `torch.cuda.amp.GradScaler`. + Possible parameters: + https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler Examples: @@ -41,13 +46,19 @@ def get_engine(self): """ - def __init__(self, device: str = "cuda"): + def __init__(self, device: str = "cuda", scaler_kwargs: Dict[str, Any] = None): """Init.""" super().__init__(device) - self.scaler = amp.GradScaler() + if scaler_kwargs is None: + scaler_kwargs = {} + self.scaler_kwargs = scaler_kwargs + self.scaler = amp.GradScaler(**self.scaler_kwargs) def __repr__(self) -> str: # noqa: D105 - return f"{self.__class__.__name__}(device='{self.device}')" + return ( + f"{self.__class__.__name__}(device='{self.device}', " + f"scaler_kwargs={self.scaler_kwargs})" + ) def backward_loss(self, loss, model, optimizer) -> None: """Abstraction over ``loss.backward()`` step.""" @@ -67,6 +78,11 @@ def autocast(self): class DataParallelAMPEngine(AMPEngine): """AMP multi-gpu training device engine. + Args: + scaler_kwargs: parameters for `torch.cuda.amp.GradScaler`. + Possible parameters: + https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler + Examples: .. 
code-block:: python @@ -96,13 +112,16 @@ def get_engine(self): """ - def __init__(self): + def __init__(self, scaler_kwargs: Dict[str, Any] = None): """Init.""" - super().__init__(f"cuda:{torch.cuda.current_device()}") + super().__init__(f"cuda:{torch.cuda.current_device()}", scaler_kwargs) self.device_count = torch.cuda.device_count() def __repr__(self) -> str: # noqa: D105 - return f"{self.__class__.__name__}(device='{self.device}')" + return ( + f"{self.__class__.__name__}(device='{self.device}', " + f"scaler_kwargs={self.scaler_kwargs})" + ) def init_components( self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None, @@ -140,6 +159,9 @@ class DistributedDataParallelAMPEngine(DistributedDataParallelEngine): backend: multiprocessing backend to use, default is `"nccl"`. world_size: number of processes. + scaler_kwargs: parameters for `torch.cuda.amp.GradScaler`. + Possible parameters: + https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler Examples: @@ -177,16 +199,23 @@ def __init__( port: str = "12345", backend: str = "nccl", world_size: int = None, + scaler_kwargs: Dict[str, Any] = None, ): """Init.""" super().__init__(address, port, backend, world_size) - self.scaler = amp.GradScaler() + if scaler_kwargs is None: + scaler_kwargs = {} + self.scaler_kwargs = scaler_kwargs + self.scaler = amp.GradScaler(**self.scaler_kwargs) def __repr__(self): # noqa: D105 return ( f"{self.__class__.__name__}(address={self.address}, " - f"port={self.port}, backend='{self.backend}'," - f"rank={self._rank}, world_size={self._world_size})" + f"port={self.port}, " + f"backend='{self.backend}', " + f"rank={self._rank}, " + f"world_size={self._world_size}, " + f"scaler_kwargs={self.scaler_kwargs})" ) def backward_loss(self, loss, model, optimizer) -> None: From 5657ed02410a2b9c3b631d72fd531123ca5def39 Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Wed, 7 Apr 2021 20:53:57 +0300 Subject: [PATCH 05/10] feature: ddp_kwargs --- 
catalyst/engines/amp.py | 11 ++----- catalyst/engines/apex.py | 17 ++--------- catalyst/engines/tests/test_distributed.py | 6 ++-- .../engines/tests/test_distributed_amp.py | 6 ++-- .../engines/tests/test_distributed_apex.py | 4 +-- catalyst/engines/torch.py | 30 ++++++++++--------- 6 files changed, 30 insertions(+), 44 deletions(-) diff --git a/catalyst/engines/amp.py b/catalyst/engines/amp.py index 2bca0bef7e..50f0e58ee3 100644 --- a/catalyst/engines/amp.py +++ b/catalyst/engines/amp.py @@ -193,16 +193,9 @@ def get_engine(self): """ - def __init__( - self, - address: str = "localhost", - port: str = "12345", - backend: str = "nccl", - world_size: int = None, - scaler_kwargs: Dict[str, Any] = None, - ): + def __init__(self, ddp_kwargs: Dict[str, Any] = None, scaler_kwargs: Dict[str, Any] = None): """Init.""" - super().__init__(address, port, backend, world_size) + super().__init__(ddp_kwargs=ddp_kwargs) if scaler_kwargs is None: scaler_kwargs = {} self.scaler_kwargs = scaler_kwargs diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index 01daaa0dc9..aee2ea6da8 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -394,22 +394,9 @@ def get_engine(self): ... 
""" - def __init__( - self, - address: str = "localhost", - port: str = "12345", - backend: str = "nccl", - world_size: int = None, - apex_kwargs: Dict[str, Any] = None, - ): + def __init__(self, ddp_kwargs: Dict[str, Any] = None, apex_kwargs: Dict[str, Any] = None): """Init.""" - super().__init__() - self.address = address - self.port = port - self.backend = backend - self._rank = 0 - self._world_size = world_size or torch.cuda.device_count() - self.device = None + super().__init__(ddp_kwargs=ddp_kwargs) if apex_kwargs is None: apex_kwargs = {} self.apex_kwargs = apex_kwargs diff --git a/catalyst/engines/tests/test_distributed.py b/catalyst/engines/tests/test_distributed.py index 7e328f7346..d7b3b85ac4 100644 --- a/catalyst/engines/tests/test_distributed.py +++ b/catalyst/engines/tests/test_distributed.py @@ -41,7 +41,9 @@ def __init__(self, logdir): self._logdir = logdir def get_engine(self): - return DistributedDataParallelEngine(port=DDP_ADDRESS + random.randint(1, 100)) + return DistributedDataParallelEngine( + ddp_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) + ) def get_callbacks(self, stage: str): return { @@ -124,7 +126,7 @@ def test_config_ddp_engine(): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelEngine", - "port": DDP_ADDRESS + random.randint(100, 200), + "ddp_kwargs": {"port": DDP_ADDRESS + random.randint(100, 200)}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, "stages": { diff --git a/catalyst/engines/tests/test_distributed_amp.py b/catalyst/engines/tests/test_distributed_amp.py index 562000765e..67fe8dbfaa 100644 --- a/catalyst/engines/tests/test_distributed_amp.py +++ b/catalyst/engines/tests/test_distributed_amp.py @@ -43,7 +43,9 @@ def __init__(self, logdir): self._logdir = logdir def get_engine(self): - return DistributedDataParallelAMPEngine(port=DDP_ADDRESS + random.randint(1, 100)) + return DistributedDataParallelAMPEngine( + 
ddp_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) + ) def get_callbacks(self, stage: str): return { @@ -126,7 +128,7 @@ def test_train_with_config_experiment_distributed_parallel_amp_device(): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelAMPEngine", - "port": DDP_ADDRESS + random.randint(100, 200), + "ddp_kwargs": {"port": DDP_ADDRESS + random.randint(100, 200)}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, "stages": { diff --git a/catalyst/engines/tests/test_distributed_apex.py b/catalyst/engines/tests/test_distributed_apex.py index 8db094b44c..950ddf3880 100644 --- a/catalyst/engines/tests/test_distributed_apex.py +++ b/catalyst/engines/tests/test_distributed_apex.py @@ -47,7 +47,7 @@ def __init__(self, logdir, opt_level, port="12345"): def get_engine(self): return DistributedDataParallelApexEngine( - port=self._port, apex_kwargs=dict(opt_level=self._opt_level) + ddp_kwargs=dict(port=self._port), apex_kwargs=dict(opt_level=self._opt_level) ) def get_callbacks(self, stage: str): @@ -118,7 +118,7 @@ def train_from_config(port, logdir, opt_lvl): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelApexEngine", - "port": port, + "ddp_kwargs": {"port": port}, "apex_kwargs": {"opt_level": opt}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, diff --git a/catalyst/engines/torch.py b/catalyst/engines/torch.py index 176c9d1c34..a5dd39a407 100644 --- a/catalyst/engines/torch.py +++ b/catalyst/engines/torch.py @@ -302,27 +302,27 @@ def get_engine(self): """ - def __init__( - self, - address: str = "localhost", - port: str = "12345", - backend: str = "nccl", - world_size: int = None, - ): + def __init__(self, ddp_kwargs: Dict[str, Any] = None): """Init.""" super().__init__() - self.address = address - self.port = port - self.backend = backend + if ddp_kwargs is None: + ddp_kwargs = {} + self.address 
= ddp_kwargs.pop("address", "localhost") + self.port = ddp_kwargs.pop("port", "12345") + self.backend = ddp_kwargs.pop("backend", "nccl") + self._world_size = ddp_kwargs.pop("world_size", None) or torch.cuda.device_count() + self.ddp_kwargs = ddp_kwargs self._rank = 0 - self._world_size = world_size or torch.cuda.device_count() self.device = None def __repr__(self): # noqa: D105 return ( f"{self.__class__.__name__}(address={self.address}, " - f"port={self.port}, backend='{self.backend}'," - f"rank={self._rank}, world_size={self._world_size})" + f"port={self.port}, " + f"backend='{self.backend}', " + f"rank={self._rank}, " + f"world_size={self._world_size}, " + f"ddp_kwargs={self.ddp_kwargs})" ) @property @@ -367,7 +367,9 @@ def setup_process(self, rank: int = -1, world_size: int = 1): self._world_size = world_size os.environ["MASTER_ADDR"] = str(self.address) os.environ["MASTER_PORT"] = str(self.port) - dist.init_process_group(self.backend, rank=self.rank, world_size=self.world_size) + dist.init_process_group( + self.backend, rank=self.rank, world_size=self.world_size, **self.ddp_kwargs + ) torch.cuda.set_device(int(self._rank)) self.device = f"cuda:{int(self._rank)}" From 4dd7f684a28e2c5d18cd399d32f45f108a40e824 Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Wed, 7 Apr 2021 21:00:33 +0300 Subject: [PATCH 06/10] docs: examples of kwargs usage --- catalyst/engines/amp.py | 20 +++++++++++--------- catalyst/engines/apex.py | 20 +++++++++----------- catalyst/engines/torch.py | 15 ++++++--------- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/catalyst/engines/amp.py b/catalyst/engines/amp.py index 50f0e58ee3..e9f3af1a88 100644 --- a/catalyst/engines/amp.py +++ b/catalyst/engines/amp.py @@ -152,13 +152,9 @@ class DistributedDataParallelAMPEngine(DistributedDataParallelEngine): """Distributed AMP multi-gpu training device engine. Args: - address: process address to use - (required for PyTorch backend), default is `"localhost"`. 
- port: process port to listen - (required for PyTorch backend), default is `"12345"`. - backend: multiprocessing backend to use, - default is `"nccl"`. - world_size: number of processes. + ddp_kwargs: parameters for `torch.distributed.init_process_group`. + More info here: + https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group scaler_kwargs: parameters for `torch.cuda.amp.GradScaler`. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler @@ -172,7 +168,10 @@ class DistributedDataParallelAMPEngine(DistributedDataParallelEngine): class MyRunner(dl.IRunner): # ... def get_engine(self): - return dl.DistributedDataParallelAMPEngine(port=12345) + return dl.DistributedDataParallelAMPEngine( + ddp_kwargs={"port": 12345}, + scaler_kwargs={"growth_factor": 1.5} + ) # ... .. code-block:: yaml @@ -186,7 +185,10 @@ def get_engine(self): engine: _target_: DistributedDataParallelAMPEngine - port: 12345 + ddp_kwargs: + port: 12345 + scaler_kwargs: + growth_factor: 1.5 stages: ... diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index aee2ea6da8..669167c7be 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -348,13 +348,9 @@ class DistributedDataParallelApexEngine(DistributedDataParallelEngine): """Distributed Apex MultiGPU training device engine. Args: - address: process address to use - (required for PyTorch backend), default is `"localhost"`. - port: process port to listen - (required for PyTorch backend), default is `"12345"`. - backend: multiprocessing backend to use, - default is `"nccl"`. - world_size: number of processes. + ddp_kwargs: parameters for `torch.distributed.init_process_group`. + More info here: + https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group apex_kwargs: parameters for `apex.amp.initialize` except models and optimizers (they will be forwared automatically). @@ -371,8 +367,8 @@ class MyRunner(dl.IRunner): # ... 
def get_engine(self): return dl.DistributedDataParallelApexEngine( - port=12345, - opt_level="O1" + ddp_kwargs={"port": 12345}, + apex_kwargs={"opt_level": "O1"}, ) # ... @@ -387,8 +383,10 @@ def get_engine(self): engine: _target_: DistributedDataParallelApexEngine - port: 12345 - opt_level: O1 + ddp_kwargs: + port: 12345 + apex_kwargs: + opt_level: O1 stages: ... diff --git a/catalyst/engines/torch.py b/catalyst/engines/torch.py index a5dd39a407..5e13e56e3e 100644 --- a/catalyst/engines/torch.py +++ b/catalyst/engines/torch.py @@ -264,13 +264,9 @@ class DistributedDataParallelEngine(DeviceEngine): """Distributed MultiGPU training device engine. Args: - address: process address to use - (required for PyTorch backend), default is `"localhost"`. - port: process port to listen - (required for PyTorch backend), default is `"12345"`. - backend: multiprocessing backend to use, - default is `"nccl"`. - world_size: number of processes. + ddp_kwargs: parameters for `torch.distributed.init_process_group`. + More info here: + https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group Examples: @@ -281,7 +277,7 @@ class DistributedDataParallelEngine(DeviceEngine): class MyRunner(dl.IRunner): # ... def get_engine(self): - return dl.DistributedDataParallelEngine(port=12345) + return dl.DistributedDataParallelEngine(ddp_kwargs={"port": 12345}) # ... .. code-block:: yaml @@ -295,7 +291,8 @@ def get_engine(self): engine: _target_: DistributedDataParallelEngine - port: 12345 + ddp_kwargs: + port: 12345 stages: ... 
From 25d218c03f4574f6e0d206253fbac28dc17361a8 Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Thu, 8 Apr 2021 10:56:02 +0300 Subject: [PATCH 07/10] feature: ddp_kwargs -> process_group_kwargs --- catalyst/engines/amp.py | 13 +++++--- catalyst/engines/apex.py | 33 ++++++++++--------- catalyst/engines/tests/test_distributed.py | 4 +-- .../engines/tests/test_distributed_amp.py | 2 +- .../engines/tests/test_distributed_apex.py | 4 +-- catalyst/engines/torch.py | 28 ++++++++-------- 6 files changed, 45 insertions(+), 39 deletions(-) diff --git a/catalyst/engines/amp.py b/catalyst/engines/amp.py index e9f3af1a88..c4f7a81827 100644 --- a/catalyst/engines/amp.py +++ b/catalyst/engines/amp.py @@ -152,7 +152,7 @@ class DistributedDataParallelAMPEngine(DistributedDataParallelEngine): """Distributed AMP multi-gpu training device engine. Args: - ddp_kwargs: parameters for `torch.distributed.init_process_group`. + process_group_kwargs: parameters for `torch.distributed.init_process_group`. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group scaler_kwargs: parameters for `torch.cuda.amp.GradScaler`. @@ -169,7 +169,7 @@ class MyRunner(dl.IRunner): # ... def get_engine(self): return dl.DistributedDataParallelAMPEngine( - ddp_kwargs={"port": 12345}, + process_group_kwargs={"port": 12345}, scaler_kwargs={"growth_factor": 1.5} ) # ... 
@@ -185,7 +185,7 @@ def get_engine(self): engine: _target_: DistributedDataParallelAMPEngine - ddp_kwargs: + process_group_kwargs: port: 12345 scaler_kwargs: growth_factor: 1.5 @@ -195,9 +195,11 @@ def get_engine(self): """ - def __init__(self, ddp_kwargs: Dict[str, Any] = None, scaler_kwargs: Dict[str, Any] = None): + def __init__( + self, process_group_kwargs: Dict[str, Any] = None, scaler_kwargs: Dict[str, Any] = None + ): """Init.""" - super().__init__(ddp_kwargs=ddp_kwargs) + super().__init__(process_group_kwargs=process_group_kwargs) if scaler_kwargs is None: scaler_kwargs = {} self.scaler_kwargs = scaler_kwargs @@ -210,6 +212,7 @@ def __repr__(self): # noqa: D105 f"backend='{self.backend}', " f"rank={self._rank}, " f"world_size={self._world_size}, " + f"process_group_kwargs={self.process_group_kwargs}, " f"scaler_kwargs={self.scaler_kwargs})" ) diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index 669167c7be..c8d645742b 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -316,8 +316,7 @@ def __init__(self, apex_kwargs: Dict[str, Any]): self.device_count = torch.cuda.device_count() def __repr__(self) -> str: # noqa: D105 - args_list = [f"device='{self.device}'", f"apex_kwargs={self.apex_kwargs}"] - return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" + return f"{self.__class__.__name__}(device='{self.device}', apex_kwargs={self.apex_kwargs})" def init_components( self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None, @@ -348,7 +347,7 @@ class DistributedDataParallelApexEngine(DistributedDataParallelEngine): """Distributed Apex MultiGPU training device engine. Args: - ddp_kwargs: parameters for `torch.distributed.init_process_group`. + process_group_kwargs: parameters for `torch.distributed.init_process_group`. 
More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group apex_kwargs: parameters for `apex.amp.initialize` @@ -367,7 +366,7 @@ class MyRunner(dl.IRunner): # ... def get_engine(self): return dl.DistributedDataParallelApexEngine( - ddp_kwargs={"port": 12345}, + process_group_kwargs={"port": 12345}, apex_kwargs={"opt_level": "O1"}, ) # ... @@ -383,7 +382,7 @@ def get_engine(self): engine: _target_: DistributedDataParallelApexEngine - ddp_kwargs: + process_group_kwargs: port: 12345 apex_kwargs: opt_level: O1 @@ -392,23 +391,25 @@ def get_engine(self): ... """ - def __init__(self, ddp_kwargs: Dict[str, Any] = None, apex_kwargs: Dict[str, Any] = None): + def __init__( + self, process_group_kwargs: Dict[str, Any] = None, apex_kwargs: Dict[str, Any] = None + ): """Init.""" - super().__init__(ddp_kwargs=ddp_kwargs) + super().__init__(process_group_kwargs=process_group_kwargs) if apex_kwargs is None: apex_kwargs = {} self.apex_kwargs = apex_kwargs def __repr__(self): # noqa: D105 - args_list = [ - f"address={self.address}", - f"port={self.port}", - f"backend='{self.backend}'", - f"rank={self._rank}", - f"world_size={self._world_size}", - f"apex_kwargs={self.apex_kwargs}", - ] - return f"{self.__class__.__name__}(" + ",".join(args_list) + ")" + return ( + f"{self.__class__.__name__}(address={self.address}, " + f"port={self.port}, " + f"backend='{self.backend}', " + f"rank={self._rank}, " + f"world_size={self._world_size}, " + f"process_group_kwargs={self.process_group_kwargs}, " + f"apex_kwargs={self.apex_kwargs})" + ) def init_components( self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None, diff --git a/catalyst/engines/tests/test_distributed.py b/catalyst/engines/tests/test_distributed.py index d7b3b85ac4..97b1d196f9 100644 --- a/catalyst/engines/tests/test_distributed.py +++ b/catalyst/engines/tests/test_distributed.py @@ -42,7 +42,7 @@ def __init__(self, logdir): def get_engine(self): return 
DistributedDataParallelEngine( - ddp_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) + process_group_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) ) def get_callbacks(self, stage: str): @@ -126,7 +126,7 @@ def test_config_ddp_engine(): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelEngine", - "ddp_kwargs": {"port": DDP_ADDRESS + random.randint(100, 200)}, + "process_group_kwargs": {"port": DDP_ADDRESS + random.randint(100, 200)}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, "stages": { diff --git a/catalyst/engines/tests/test_distributed_amp.py b/catalyst/engines/tests/test_distributed_amp.py index 67fe8dbfaa..b9ff0b9ed7 100644 --- a/catalyst/engines/tests/test_distributed_amp.py +++ b/catalyst/engines/tests/test_distributed_amp.py @@ -44,7 +44,7 @@ def __init__(self, logdir): def get_engine(self): return DistributedDataParallelAMPEngine( - ddp_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) + process_group_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) ) def get_callbacks(self, stage: str): diff --git a/catalyst/engines/tests/test_distributed_apex.py b/catalyst/engines/tests/test_distributed_apex.py index 950ddf3880..49c1dd88dd 100644 --- a/catalyst/engines/tests/test_distributed_apex.py +++ b/catalyst/engines/tests/test_distributed_apex.py @@ -47,7 +47,7 @@ def __init__(self, logdir, opt_level, port="12345"): def get_engine(self): return DistributedDataParallelApexEngine( - ddp_kwargs=dict(port=self._port), apex_kwargs=dict(opt_level=self._opt_level) + process_group_kwargs=dict(port=self._port), apex_kwargs=dict(opt_level=self._opt_level) ) def get_callbacks(self, stage: str): @@ -118,7 +118,7 @@ def train_from_config(port, logdir, opt_lvl): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelApexEngine", - "ddp_kwargs": {"port": port}, + "process_group_kwargs": {"port": port}, 
"apex_kwargs": {"opt_level": opt}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, diff --git a/catalyst/engines/torch.py b/catalyst/engines/torch.py index 5e13e56e3e..1b56771fae 100644 --- a/catalyst/engines/torch.py +++ b/catalyst/engines/torch.py @@ -264,7 +264,7 @@ class DistributedDataParallelEngine(DeviceEngine): """Distributed MultiGPU training device engine. Args: - ddp_kwargs: parameters for `torch.distributed.init_process_group`. + process_group_kwargs: parameters for `torch.distributed.init_process_group`. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group @@ -277,7 +277,7 @@ class DistributedDataParallelEngine(DeviceEngine): class MyRunner(dl.IRunner): # ... def get_engine(self): - return dl.DistributedDataParallelEngine(ddp_kwargs={"port": 12345}) + return dl.DistributedDataParallelEngine(process_group_kwargs={"port": 12345}) # ... .. code-block:: yaml @@ -291,7 +291,7 @@ def get_engine(self): engine: _target_: DistributedDataParallelEngine - ddp_kwargs: + process_group_kwargs: port: 12345 stages: @@ -299,16 +299,18 @@ def get_engine(self): """ - def __init__(self, ddp_kwargs: Dict[str, Any] = None): + def __init__(self, process_group_kwargs: Dict[str, Any] = None): """Init.""" super().__init__() - if ddp_kwargs is None: - ddp_kwargs = {} - self.address = ddp_kwargs.pop("address", "localhost") - self.port = ddp_kwargs.pop("port", "12345") - self.backend = ddp_kwargs.pop("backend", "nccl") - self._world_size = ddp_kwargs.pop("world_size", None) or torch.cuda.device_count() - self.ddp_kwargs = ddp_kwargs + if process_group_kwargs is None: + process_group_kwargs = {} + self.address = process_group_kwargs.pop("address", "localhost") + self.port = process_group_kwargs.pop("port", "12345") + self.backend = process_group_kwargs.pop("backend", "nccl") + self._world_size = ( + process_group_kwargs.pop("world_size", None) or torch.cuda.device_count() + ) + self.process_group_kwargs = 
process_group_kwargs self._rank = 0 self.device = None @@ -319,7 +321,7 @@ def __repr__(self): # noqa: D105 f"backend='{self.backend}', " f"rank={self._rank}, " f"world_size={self._world_size}, " - f"ddp_kwargs={self.ddp_kwargs})" + f"process_group_kwargs={self.process_group_kwargs})" ) @property @@ -365,7 +367,7 @@ def setup_process(self, rank: int = -1, world_size: int = 1): os.environ["MASTER_ADDR"] = str(self.address) os.environ["MASTER_PORT"] = str(self.port) dist.init_process_group( - self.backend, rank=self.rank, world_size=self.world_size, **self.ddp_kwargs + self.backend, rank=self.rank, world_size=self.world_size, **self.process_group_kwargs ) torch.cuda.set_device(int(self._rank)) self.device = f"cuda:{int(self._rank)}" From 8d346f1fde0411203d89e8f742ad6fac6dfb4cc5 Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Thu, 8 Apr 2021 21:18:34 +0300 Subject: [PATCH 08/10] feature: ddp_kwargs --- catalyst/engines/amp.py | 32 ++++++-- catalyst/engines/apex.py | 39 +++++++--- catalyst/engines/tests/test_distributed.py | 5 +- .../engines/tests/test_distributed_amp.py | 5 +- .../engines/tests/test_distributed_apex.py | 7 +- catalyst/engines/torch.py | 74 +++++++++++++------ 6 files changed, 119 insertions(+), 43 deletions(-) diff --git a/catalyst/engines/amp.py b/catalyst/engines/amp.py index c4f7a81827..871e02cc52 100644 --- a/catalyst/engines/amp.py +++ b/catalyst/engines/amp.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, Union import torch from torch import nn @@ -152,6 +152,11 @@ class DistributedDataParallelAMPEngine(DistributedDataParallelEngine): """Distributed AMP multi-gpu training device engine. Args: + address: address to use for backend. + port: port to use for backend. + ddp_kwargs: parameters for `torch.nn.parallel.DistributedDataParallel`. 
+ More info here: + https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel process_group_kwargs: parameters for `torch.distributed.init_process_group`. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group @@ -169,6 +174,9 @@ class MyRunner(dl.IRunner): # ... def get_engine(self): return dl.DistributedDataParallelAMPEngine( + address="0.0.0.0", + port=23234, + ddp_kwargs={"find_unused_parameters": False}, process_group_kwargs={"port": 12345}, scaler_kwargs={"growth_factor": 1.5} ) @@ -185,6 +193,10 @@ def get_engine(self): engine: _target_: DistributedDataParallelAMPEngine + address: 0.0.0.0 + port: 23234 + ddp_kwargs: + find_unused_parameters: false process_group_kwargs: port: 12345 scaler_kwargs: @@ -196,10 +208,20 @@ def get_engine(self): """ def __init__( - self, process_group_kwargs: Dict[str, Any] = None, scaler_kwargs: Dict[str, Any] = None + self, + address: str = None, + port: Union[str, int] = None, + ddp_kwargs: Dict[str, Any] = None, + process_group_kwargs: Dict[str, Any] = None, + scaler_kwargs: Dict[str, Any] = None, ): """Init.""" - super().__init__(process_group_kwargs=process_group_kwargs) + super().__init__( + address=address, + port=port, + ddp_kwargs=ddp_kwargs, + process_group_kwargs=process_group_kwargs, + ) if scaler_kwargs is None: scaler_kwargs = {} self.scaler_kwargs = scaler_kwargs @@ -209,9 +231,7 @@ def __repr__(self): # noqa: D105 return ( f"{self.__class__.__name__}(address={self.address}, " f"port={self.port}, " - f"backend='{self.backend}', " - f"rank={self._rank}, " - f"world_size={self._world_size}, " + f"ddp_kwargs={self.ddp_kwargs}, " f"process_group_kwargs={self.process_group_kwargs}, " f"scaler_kwargs={self.scaler_kwargs})" ) diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index c8d645742b..aa72da62bc 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -1,4 +1,4 @@ 
-from typing import Any, Dict +from typing import Any, Dict, Union from collections import OrderedDict import torch @@ -347,6 +347,11 @@ class DistributedDataParallelApexEngine(DistributedDataParallelEngine): """Distributed Apex MultiGPU training device engine. Args: + address: address to use for backend. + port: port to use for backend. + ddp_kwargs: parameters for `apex.parallel.DistributedDataParallel`. + More info here: + https://nvidia.github.io/apex/parallel.html#apex.parallel.DistributedDataParallel process_group_kwargs: parameters for `torch.distributed.init_process_group`. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group @@ -366,7 +371,10 @@ class MyRunner(dl.IRunner): # ... def get_engine(self): return dl.DistributedDataParallelApexEngine( - process_group_kwargs={"port": 12345}, + address="0.0.0.0", + port=23234, + ddp_kwargs={"allreduce_always_fp32": True}, + process_group_kwargs={"backend": "nccl"}, apex_kwargs={"opt_level": "O1"}, ) # ... 
@@ -382,8 +390,12 @@ def get_engine(self): engine: _target_: DistributedDataParallelApexEngine + address: 0.0.0.0 + port: 23234 + ddp_kwargs: + allreduce_always_fp32: true process_group_kwargs: - port: 12345 + backend: nccl apex_kwargs: opt_level: O1 @@ -392,10 +404,20 @@ def get_engine(self): """ def __init__( - self, process_group_kwargs: Dict[str, Any] = None, apex_kwargs: Dict[str, Any] = None + self, + address: str = None, + port: Union[str, int] = None, + ddp_kwargs: Dict[str, Any] = None, + process_group_kwargs: Dict[str, Any] = None, + apex_kwargs: Dict[str, Any] = None, ): """Init.""" - super().__init__(process_group_kwargs=process_group_kwargs) + super().__init__( + address=address, port=port, ddp_kwargs=None, process_group_kwargs=process_group_kwargs + ) + if ddp_kwargs is None: + ddp_kwargs = {} + self.ddp_kwargs = ddp_kwargs if apex_kwargs is None: apex_kwargs = {} self.apex_kwargs = apex_kwargs @@ -404,9 +426,7 @@ def __repr__(self): # noqa: D105 return ( f"{self.__class__.__name__}(address={self.address}, " f"port={self.port}, " - f"backend='{self.backend}', " - f"rank={self._rank}, " - f"world_size={self._world_size}, " + f"ddp_kwargs={self.ddp_kwargs}, " f"process_group_kwargs={self.process_group_kwargs}, " f"apex_kwargs={self.apex_kwargs})" ) @@ -425,8 +445,7 @@ def init_components( optimizer = self.sync_device(optimizer) model, optimizer = amp.initialize(model, optimizer, **self.apex_kwargs) - # TODO: kwargs for Apex DDP ? 
- model = ApexDistributedDataParallel(model) # , delay_allreduce=self.delay_all_reduce) + model = ApexDistributedDataParallel(model, **self.ddp_kwargs) scheduler = scheduler_fn() scheduler = self.sync_device(scheduler) diff --git a/catalyst/engines/tests/test_distributed.py b/catalyst/engines/tests/test_distributed.py index 97b1d196f9..7165c1c9c7 100644 --- a/catalyst/engines/tests/test_distributed.py +++ b/catalyst/engines/tests/test_distributed.py @@ -42,7 +42,7 @@ def __init__(self, logdir): def get_engine(self): return DistributedDataParallelEngine( - process_group_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) + port=DDP_ADDRESS + random.randint(1, 100), process_group_kwargs={"backend": "nccl"} ) def get_callbacks(self, stage: str): @@ -126,7 +126,8 @@ def test_config_ddp_engine(): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelEngine", - "process_group_kwargs": {"port": DDP_ADDRESS + random.randint(100, 200)}, + "port": DDP_ADDRESS + random.randint(100, 200), + "process_group_kwargs": {"backend": "nccl"}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, "stages": { diff --git a/catalyst/engines/tests/test_distributed_amp.py b/catalyst/engines/tests/test_distributed_amp.py index b9ff0b9ed7..2441f325a0 100644 --- a/catalyst/engines/tests/test_distributed_amp.py +++ b/catalyst/engines/tests/test_distributed_amp.py @@ -44,7 +44,7 @@ def __init__(self, logdir): def get_engine(self): return DistributedDataParallelAMPEngine( - process_group_kwargs=dict(port=DDP_ADDRESS + random.randint(1, 100)) + port=DDP_ADDRESS + random.randint(1, 100), process_group_kwargs={"backend": "nccl"} ) def get_callbacks(self, stage: str): @@ -128,7 +128,8 @@ def test_train_with_config_experiment_distributed_parallel_amp_device(): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelAMPEngine", - "ddp_kwargs": {"port": DDP_ADDRESS 
+ random.randint(100, 200)}, + "port": DDP_ADDRESS + random.randint(100, 200), + "process_group_kwargs": {"backend": "nccl"}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, "stages": { diff --git a/catalyst/engines/tests/test_distributed_apex.py b/catalyst/engines/tests/test_distributed_apex.py index 49c1dd88dd..7de4dbfc9a 100644 --- a/catalyst/engines/tests/test_distributed_apex.py +++ b/catalyst/engines/tests/test_distributed_apex.py @@ -47,7 +47,9 @@ def __init__(self, logdir, opt_level, port="12345"): def get_engine(self): return DistributedDataParallelApexEngine( - process_group_kwargs=dict(port=self._port), apex_kwargs=dict(opt_level=self._opt_level) + port=DDP_ADDRESS + random.randint(1, 100), + process_group_kwargs={"backend": "nccl"}, + apex_kwargs=dict(opt_level=self._opt_level), ) def get_callbacks(self, stage: str): @@ -118,7 +120,8 @@ def train_from_config(port, logdir, opt_lvl): "model": {"_target_": "DummyModel", "in_features": 4, "out_features": 2}, "engine": { "_target_": "DistributedDataParallelApexEngine", - "process_group_kwargs": {"port": port}, + "port": DDP_ADDRESS + random.randint(100, 200), + "process_group_kwargs": {"backend": "nccl"}, "apex_kwargs": {"opt_level": opt}, }, "loggers": {"console": {"_target_": "ConsoleLogger"}}, diff --git a/catalyst/engines/torch.py b/catalyst/engines/torch.py index 1b56771fae..34274491a0 100644 --- a/catalyst/engines/torch.py +++ b/catalyst/engines/torch.py @@ -1,4 +1,5 @@ from typing import Any, Dict, Mapping, Union +import copy import os import torch @@ -264,6 +265,11 @@ class DistributedDataParallelEngine(DeviceEngine): """Distributed MultiGPU training device engine. Args: + address: address to use for backend. + port: port to use for backend. + ddp_kwargs: parameters for `torch.nn.parallel.DistributedDataParallel`. 
+ More info here: + https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel process_group_kwargs: parameters for `torch.distributed.init_process_group`. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group @@ -277,7 +283,12 @@ class DistributedDataParallelEngine(DeviceEngine): class MyRunner(dl.IRunner): # ... def get_engine(self): - return dl.DistributedDataParallelEngine(process_group_kwargs={"port": 12345}) + return dl.DistributedDataParallelEngine( + address="0.0.0.0", + port=23234, + ddp_kwargs={"find_unused_parameters": False}, + process_group_kwargs={"backend": "nccl"}, + ) # ... .. code-block:: yaml @@ -291,36 +302,59 @@ def get_engine(self): engine: _target_: DistributedDataParallelEngine + address: 0.0.0.0 + port: 23234 + ddp_kwargs: + find_unused_parameters: false process_group_kwargs: - port: 12345 + backend: nccl stages: ... """ - def __init__(self, process_group_kwargs: Dict[str, Any] = None): + def __init__( + self, + address: str = None, + port: Union[str, int] = None, + ddp_kwargs: Dict[str, Any] = None, + process_group_kwargs: Dict[str, Any] = None, + ): """Init.""" super().__init__() + self.address = address or "localhost" + self.port = port or 12345 + self._rank = 0 + self.device = None + + if ddp_kwargs is None: + ddp_kwargs = {} + self.ddp_kwargs = copy.deepcopy(ddp_kwargs) + if "device_ids" not in self.ddp_kwargs: + self.ddp_kwargs["device_ids"] = [self.device] + if process_group_kwargs is None: process_group_kwargs = {} - self.address = process_group_kwargs.pop("address", "localhost") - self.port = process_group_kwargs.pop("port", "12345") - self.backend = process_group_kwargs.pop("backend", "nccl") + self.process_group_kwargs = copy.deepcopy(process_group_kwargs) + # add missing arguments + default_values = [ + ("backend", "nccl"), + ("world_size", torch.cuda.device_count()), + ] + for required_arg, default_value in 
default_values: + if required_arg not in self.process_group_kwargs: + self.process_group_kwargs[required_arg] = default_value + self._world_size = ( - process_group_kwargs.pop("world_size", None) or torch.cuda.device_count() + self.process_group_kwargs.get("world_size", None) or torch.cuda.device_count() ) - self.process_group_kwargs = process_group_kwargs - self._rank = 0 - self.device = None def __repr__(self): # noqa: D105 return ( f"{self.__class__.__name__}(address={self.address}, " f"port={self.port}, " - f"backend='{self.backend}', " - f"rank={self._rank}, " - f"world_size={self._world_size}, " + f"ddp_kwargs={self.ddp_kwargs}, " f"process_group_kwargs={self.process_group_kwargs})" ) @@ -364,11 +398,11 @@ def setup_process(self, rank: int = -1, world_size: int = 1): """ self._rank = rank self._world_size = world_size + self.process_group_kwargs["rank"] = rank + self.process_group_kwargs["world_size"] = world_size os.environ["MASTER_ADDR"] = str(self.address) os.environ["MASTER_PORT"] = str(self.port) - dist.init_process_group( - self.backend, rank=self.rank, world_size=self.world_size, **self.process_group_kwargs - ) + dist.init_process_group(**self.process_group_kwargs) torch.cuda.set_device(int(self._rank)) self.device = f"cuda:{int(self._rank)}" @@ -405,13 +439,11 @@ def init_components( """Inits the runs components.""" model = model_fn() model = self.sync_device(model) - # NOTE: do not forget to wrap a model in DDP + # TODO: maybe need to update somehow value for "device_ids" if isinstance(model, nn.Module): - model = DistributedDataParallel(model, device_ids=[self.device]) + model = DistributedDataParallel(model, **self.ddp_kwargs) elif isinstance(model, dict): - model = { - k: DistributedDataParallel(v, device_ids=[self.device]) for k, v in model.items() - } + model = {k: DistributedDataParallel(v, **self.ddp_kwargs) for k, v in model.items()} # criterion criterion = criterion_fn() criterion = self.sync_device(criterion) From 
16aac6e1c87b920ca2dcbe98674d2c2c0d15cd8f Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Fri, 9 Apr 2021 19:18:17 +0300 Subject: [PATCH 09/10] fix: ddp_kwargs default arguments when determined device --- catalyst/engines/apex.py | 23 +++++++++++++++++++++++ catalyst/engines/torch.py | 18 +++++++++--------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/catalyst/engines/apex.py b/catalyst/engines/apex.py index aa72da62bc..eb8cb80c88 100644 --- a/catalyst/engines/apex.py +++ b/catalyst/engines/apex.py @@ -1,8 +1,10 @@ from typing import Any, Dict, Union from collections import OrderedDict +import os import torch from torch import nn +import torch.distributed as dist from catalyst.engines.torch import DeviceEngine, DistributedDataParallelEngine from catalyst.settings import SETTINGS @@ -431,6 +433,27 @@ def __repr__(self): # noqa: D105 f"apex_kwargs={self.apex_kwargs})" ) + def setup_process(self, rank: int = -1, world_size: int = 1): + """Initialize DDP variables and processes. + + Args: + rank: process rank. Default is `-1`. + world_size: number of devices in network to expect for training. + Default is `1`.
+ """ + self._rank = rank + self._world_size = world_size + + self.process_group_kwargs["rank"] = rank + self.process_group_kwargs["world_size"] = world_size + os.environ["MASTER_ADDR"] = str(self.address) + os.environ["MASTER_PORT"] = str(self.port) + + dist.init_process_group(**self.process_group_kwargs) + + torch.cuda.set_device(int(self._rank)) + self.device = f"cuda:{int(self._rank)}" + def init_components( self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None, ): diff --git a/catalyst/engines/torch.py b/catalyst/engines/torch.py index 34274491a0..6862e593bb 100644 --- a/catalyst/engines/torch.py +++ b/catalyst/engines/torch.py @@ -331,20 +331,15 @@ def __init__( if ddp_kwargs is None: ddp_kwargs = {} self.ddp_kwargs = copy.deepcopy(ddp_kwargs) - if "device_ids" not in self.ddp_kwargs: - self.ddp_kwargs["device_ids"] = [self.device] if process_group_kwargs is None: process_group_kwargs = {} self.process_group_kwargs = copy.deepcopy(process_group_kwargs) # add missing arguments - default_values = [ - ("backend", "nccl"), - ("world_size", torch.cuda.device_count()), - ] - for required_arg, default_value in default_values: - if required_arg not in self.process_group_kwargs: - self.process_group_kwargs[required_arg] = default_value + if "backend" not in self.process_group_kwargs: + self.process_group_kwargs["backend"] = "nccl" + if "world_size" not in self.process_group_kwargs: + self.process_group_kwargs["world_size"] = torch.cuda.device_count() self._world_size = ( self.process_group_kwargs.get("world_size", None) or torch.cuda.device_count() @@ -398,13 +393,18 @@ def setup_process(self, rank: int = -1, world_size: int = 1): """ self._rank = rank self._world_size = world_size + self.process_group_kwargs["rank"] = rank self.process_group_kwargs["world_size"] = world_size os.environ["MASTER_ADDR"] = str(self.address) os.environ["MASTER_PORT"] = str(self.port) + dist.init_process_group(**self.process_group_kwargs) + 
torch.cuda.set_device(int(self._rank)) self.device = f"cuda:{int(self._rank)}" + if "device_ids" not in self.ddp_kwargs: + self.ddp_kwargs["device_ids"] = [self.device] def cleanup_process(self): """Clean DDP variables and processes.""" From 72015a8d952a30f75a9cadd1fb5b14d27870eb9a Mon Sep 17 00:00:00 2001 From: Dmytro Doroshenko Date: Fri, 9 Apr 2021 19:19:58 +0300 Subject: [PATCH 10/10] fix: removed redundant comment --- catalyst/engines/torch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/catalyst/engines/torch.py b/catalyst/engines/torch.py index 6862e593bb..99f9daecad 100644 --- a/catalyst/engines/torch.py +++ b/catalyst/engines/torch.py @@ -439,7 +439,6 @@ def init_components( """Inits the runs components.""" model = model_fn() model = self.sync_device(model) - # TODO: maybe need to update somehow value for "device_ids" if isinstance(model, nn.Module): model = DistributedDataParallel(model, **self.ddp_kwargs) elif isinstance(model, dict):