From 214142cb74e67155cd477f5674ca725041393c6a Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Wed, 4 Nov 2020 00:13:33 +0300 Subject: [PATCH 1/9] proposal --- catalyst/callbacks/checkpoint.py | 16 +- catalyst/callbacks/control_flow.py | 2 +- catalyst/callbacks/criterion.py | 5 +- catalyst/callbacks/early_stop.py | 2 +- catalyst/callbacks/optimizer.py | 10 +- catalyst/callbacks/scheduler.py | 9 +- catalyst/callbacks/validation.py | 2 +- catalyst/contrib/callbacks/telegram_logger.py | 4 +- catalyst/core/callback.py | 164 ++++--- catalyst/core/runner.py | 448 ++++++------------ catalyst/runners/runner.py | 22 +- catalyst/runners/supervised.py | 30 +- catalyst/utils/__init__.py | 1 + catalyst/utils/misc.py | 59 +++ 14 files changed, 338 insertions(+), 436 deletions(-) diff --git a/catalyst/callbacks/checkpoint.py b/catalyst/callbacks/checkpoint.py index 46a17fe2dc..ccda8d3ca8 100644 --- a/catalyst/callbacks/checkpoint.py +++ b/catalyst/callbacks/checkpoint.py @@ -25,7 +25,7 @@ def _pack_runner(runner: "IRunner"): scheduler=runner.scheduler, epoch_metrics=dict(runner.epoch_metrics), valid_metrics=dict(runner.valid_metrics), - stage_name=runner.stage_name, + stage_name=runner.stage, epoch=runner.epoch, loader_name=runner.loader_name, loader_step=runner.loader_batch_step, @@ -65,8 +65,8 @@ def _load_checkpoint( print(f"=> Loading checkpoint {filename}") checkpoint = load_checkpoint(filename) - if not runner.stage_name.startswith("infer") and load_full: - runner.stage_name = checkpoint["stage_name"] + if not runner.stage.startswith("infer") and load_full: + runner.stage = checkpoint["stage_name"] runner.epoch = checkpoint["epoch"] runner.global_epoch = checkpoint["global_epoch"] # @TODO: should we also load, @@ -620,10 +620,7 @@ def on_epoch_end(self, runner: "IRunner") -> None: Args: runner: current runner """ - if ( - runner.stage_name.startswith("infer") - or runner.is_distributed_worker - ): + if runner.stage.startswith("infer") or runner.is_distributed_worker: return if self.save_n_best > 0: @@ -644,10 +641,7 @@ def on_stage_end(self, runner: "IRunner") -> None: Args: runner: current runner """ - if ( - runner.stage_name.startswith("infer") - or runner.is_distributed_worker - ): + if runner.stage.startswith("infer") or runner.is_distributed_worker: return log_message = "Top best models:\n" # store latest state diff --git a/catalyst/callbacks/control_flow.py b/catalyst/callbacks/control_flow.py index 2909df0bf0..791bf86f75 100644 --- a/catalyst/callbacks/control_flow.py +++ b/catalyst/callbacks/control_flow.py @@ -378,7 +378,7 @@ def on_loader_start(self, runner: "IRunner") -> None: Args: runner: current runner """ - stage = runner.stage_name + stage = runner.stage loader = runner.loader_name epoch = runner.global_epoch if self.use_global_epochs else runner.epoch diff --git a/catalyst/callbacks/criterion.py b/catalyst/callbacks/criterion.py index 0014d50360..1814fc7fcc 100644 --- a/catalyst/callbacks/criterion.py +++ b/catalyst/callbacks/criterion.py @@ -1,6 +1,7 @@ from typing import Dict, List, TYPE_CHECKING, Union from catalyst.callbacks.metric import IBatchMetricCallback +from catalyst.utils.misc import get_attr if TYPE_CHECKING: from catalyst.core.runner import IRunner @@ -55,8 +56,8 @@ def on_stage_start(self, runner: "IRunner"): Args: runner: current runner """ - criterion = runner.get_attr( - key="criterion", inner_key=self.criterion_key + criterion = get_attr( + runner, key="criterion", inner_key=self.criterion_key ) assert criterion is not None self._criterion = criterion diff 
--git a/catalyst/callbacks/early_stop.py b/catalyst/callbacks/early_stop.py index 11e3986197..45c4258be1 100644 --- a/catalyst/callbacks/early_stop.py +++ b/catalyst/callbacks/early_stop.py @@ -120,7 +120,7 @@ def on_epoch_end(self, runner: "IRunner") -> None: Args: runner: current runner """ - if runner.stage_name.startswith("infer"): + if runner.stage.startswith("infer"): return score = runner.valid_metrics[self.metric] diff --git a/catalyst/callbacks/optimizer.py b/catalyst/callbacks/optimizer.py index b8b86d4596..8ca7f047d8 100644 --- a/catalyst/callbacks/optimizer.py +++ b/catalyst/callbacks/optimizer.py @@ -7,7 +7,7 @@ from catalyst import registry from catalyst.core.callback import Callback, CallbackNode, CallbackOrder from catalyst.typing import Optimizer -from catalyst.utils.misc import maybe_recursive_call +from catalyst.utils.misc import get_attr, maybe_recursive_call from catalyst.utils.torch import get_optimizer_momentum if TYPE_CHECKING: @@ -149,8 +149,8 @@ def on_stage_start(self, runner: "IRunner") -> None: Args: runner(IRunner): current runner """ - self._optimizer = runner.get_attr( - key="optimizer", inner_key=self.optimizer_key + self._optimizer = get_attr( + runner, key="optimizer", inner_key=self.optimizer_key ) # device based optimization step if runner.device.type == "xla": @@ -326,8 +326,8 @@ def on_stage_start(self, runner: "IRunner") -> None: """ from torch.cuda.amp import GradScaler - self._optimizer = runner.get_attr( - key="optimizer", inner_key=self.optimizer_key + self._optimizer = get_attr( + runner, key="optimizer", inner_key=self.optimizer_key ) self.scaler = GradScaler() assert self._optimizer is not None diff --git a/catalyst/callbacks/scheduler.py b/catalyst/callbacks/scheduler.py index d820ba965d..256112e7b1 100644 --- a/catalyst/callbacks/scheduler.py +++ b/catalyst/callbacks/scheduler.py @@ -5,6 +5,7 @@ from catalyst.contrib.nn.schedulers import BatchScheduler, OneCycleLRWithWarmup from catalyst.core.callback import Callback, CallbackNode, CallbackOrder +from catalyst.utils.misc import get_attr from catalyst.utils.torch import get_optimizer_momentum if TYPE_CHECKING: @@ -173,8 +174,8 @@ def on_stage_start(self, runner: "IRunner") -> None: """ self.reduced_metric = self.reduced_metric or runner.main_metric - scheduler = runner.get_attr( - key="scheduler", inner_key=self.scheduler_key + scheduler = get_attr( + runner, key="scheduler", inner_key=self.scheduler_key ) assert scheduler is not None self._scheduler = scheduler @@ -297,8 +298,8 @@ def on_stage_start(self, runner: "IRunner") -> None: Args: runner: current runner """ - optimizer = runner.get_attr( - key="optimizer", inner_key=self.optimizer_key + optimizer = get_attr( + runner, key="optimizer", inner_key=self.optimizer_key ) assert optimizer is not None self._optimizer = optimizer diff --git a/catalyst/callbacks/validation.py b/catalyst/callbacks/validation.py index 48a7724706..f88dc6d50a 100644 --- a/catalyst/callbacks/validation.py +++ b/catalyst/callbacks/validation.py @@ -33,7 +33,7 @@ def on_epoch_end(self, runner: "IRunner") -> None: Args: runner: current runner """ - if runner.stage_name.startswith("infer"): + if runner.stage.startswith("infer"): return runner.valid_metrics = { diff --git a/catalyst/contrib/callbacks/telegram_logger.py b/catalyst/contrib/callbacks/telegram_logger.py index 5a3cf85ecb..0d60df9c35 100644 --- a/catalyst/contrib/callbacks/telegram_logger.py +++ b/catalyst/contrib/callbacks/telegram_logger.py @@ -76,7 +76,7 @@ def _send_text(self, text: str): def 
on_stage_start(self, runner: "IRunner"):
         """Notify about starting a new stage."""
         if self.log_on_stage_start:
-            text = f"{runner.stage_name} stage was started"
+            text = f"{runner.stage} stage was started"
             self._send_text(text)
 
@@ -115,7 +115,7 @@ def on_loader_end(self, runner: "IRunner"):
     def on_stage_end(self, runner: "IRunner"):
         """Notify about finishing a stage."""
         if self.log_on_stage_end:
-            text = f"{runner.stage_name} stage was finished"
+            text = f"{runner.stage} stage was finished"
             self._send_text(text)
 
diff --git a/catalyst/core/callback.py
index b5793ddf82..3f06c6b92b 100644
--- a/catalyst/core/callback.py
+++ b/catalyst/core/callback.py
@@ -5,6 +5,96 @@
     from catalyst.core.runner import IRunner
 
 
+class ICallback:
+    def on_experiment_start(self, runner: "IRunner"):
+        """Event handler for stage start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_stage_start(self, runner: "IRunner"):
+        """Event handler for stage start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_epoch_start(self, runner: "IRunner"):
+        """Event handler for epoch start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_loader_start(self, runner: "IRunner"):
+        """Event handler for loader start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_batch_start(self, runner: "IRunner"):
+        """Event handler for batch start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_batch_end(self, runner: "IRunner"):
+        """Event handler for batch end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_loader_end(self, runner: "IRunner"):
+        """Event handler for loader end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_epoch_end(self, runner: "IRunner"):
+        """Event handler for epoch end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_stage_end(self, runner: "IRunner"):
+        """Event handler for stage end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_experiment_end(self, runner: "IRunner"):
+        """Event handler for experiment end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_exception(self, runner: "IRunner"):
+        """Event handler for exception case.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+
 class CallbackNode(IntFlag):
     """Callback node usage flag during distributed training.
 
@@ -78,7 +168,7 @@ class CallbackScope(IntFlag):
     Experiment = experiment = 1  # noqa: WPS115
 
 
-class Callback:
+class Callback(ICallback):
     """
     An abstraction that lets you customize your experiment run logic.
     To give users maximum flexibility and extensibility Catalyst supports
@@ -136,78 +226,6 @@ def __init__(
         self.order = order
         self.scope = scope
 
-    def on_stage_start(self, runner: "IRunner"):
-        """Event handler for stage start.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_stage_end(self, runner: "IRunner"):
-        """Event handler for stage end.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_epoch_start(self, runner: "IRunner"):
-        """Event handler for epoch start.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_epoch_end(self, runner: "IRunner"):
-        """Event handler for epoch end.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_loader_start(self, runner: "IRunner"):
-        """Event handler for loader start.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_loader_end(self, runner: "IRunner"):
-        """Event handler for loader end.
-
-        Args:
-            runner: IRunner instance.
- """ - pass - - def on_batch_start(self, runner: "IRunner"): - """Event handler for batch start. - - Args: - runner: IRunner instance. - """ - pass - - def on_batch_end(self, runner: "IRunner"): - """Event handler for batch end. - - Args: - runner: IRunner instance. - """ - pass - - def on_exception(self, runner: "IRunner"): - """Event handler for exception case. - - Args: - runner: IRunner instance. - """ - pass - class CallbackWrapper(Callback): """Enable/disable callback execution.""" diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py index 48ca2ca725..033875938a 100644 --- a/catalyst/core/runner.py +++ b/catalyst/core/runner.py @@ -1,13 +1,14 @@ -from typing import Any, Callable, Dict, Mapping, Optional, Tuple, Union +from typing import Any, Callable, Dict, Mapping, Optional, Tuple, Union, List from abc import ABC, abstractmethod from collections import defaultdict, OrderedDict +from functools import lru_cache from pathlib import Path import torch from torch import nn from torch.utils.data import DataLoader, DistributedSampler -from catalyst.core.callback import Callback, CallbackScope +from catalyst.core.callback import Callback, CallbackScope, ICallback from catalyst.core.experiment import IExperiment from catalyst.core.functional import ( filter_callbacks_by_node, @@ -35,6 +36,11 @@ from catalyst.utils.torch import any2device +@lru_cache(maxsize=42) +def _is_substring(origin_string: str, strings: List): + return any(x in origin_string for x in strings) + + class RunnerException(Exception): """Exception class for all runner errors.""" @@ -46,7 +52,7 @@ def __init__(self, message: str): super().__init__(message) -class IRunner(ABC, IRunnerLegacy, FrozenClass): +class IRunner(ABC, ICallback, IRunnerLegacy, FrozenClass): """ An abstraction that knows how to run an experiment. It contains all the logic of **how** to run the experiment, @@ -360,10 +366,11 @@ class IRunner(ABC, IRunnerLegacy, FrozenClass): """ - _experiment_fn: Callable = IExperiment - def __init__( - self, model: RunnerModel = None, device: Device = None, **kwargs, + self, + model: RunnerModel = None, + device: Device = None, + experiment_fn: Callable = IExperiment, ): """ Args: @@ -372,9 +379,9 @@ def __init__( """ self._device = None self._model = None + self.experiment = None + self._experiment_fn = experiment_fn self._prepare_inner_state(model=model, device=device) - self._unfreeze() - self._init(**kwargs) self._freeze() def _prepare_inner_state( @@ -397,7 +404,6 @@ def _prepare_inner_state( verbose: bool = False, **kwargs, ): - self._unfreeze() # main runner components: model and device to run self.device: Device = device @@ -412,6 +418,7 @@ def _prepare_inner_state( self.callbacks: Dict[str, "Callback"] = callbacks or {} # the data + self.loader = None self.loaders: OrderedDict[str, DataLoader] = loaders # and the dataflow - model input, model output self.input = None @@ -458,8 +465,8 @@ def _prepare_inner_state( self.need_exception_reraise: bool = True # stage info self.num_epochs: int = num_epochs - self.stage_name: str = stage - self.is_infer_stage: bool = self.stage_name.startswith( + self.stage: str = stage + self.is_infer_stage: bool = self.stage.startswith( SETTINGS.stage_infer_prefix ) # epoch info @@ -489,15 +496,6 @@ def _prepare_inner_state( for key, value in kwargs.items(): setattr(self, key, value) - self._freeze() - - def _init(self, **kwargs) -> None: - """ - Inner method for children's classes - to specify type for Runners' Experiment. 
- """ - self.experiment: IExperiment = None - @property def model(self) -> Model: """Returns the runner's model instance.""" @@ -577,138 +575,97 @@ def device(self, value: Device): self._model, "to", device=self._device or "cpu" ) - @staticmethod - def _get_experiment_components( - experiment: IExperiment, stage: str = None, device: Device = None, - ) -> Tuple[Model, Criterion, Optimizer, Scheduler, Device]: - """ - Inner method for `Experiment` components preparation. + def on_experiment_start(self, runner: "IRunner"): + assert self.experiment is not None - Check available torch device, takes model from the experiment - and creates stage-specified criterion, optimizer, scheduler for it. + def on_stage_start(self, runner: "IRunner"): + pass - Args: - stage: experiment stage name of interest - like "pretrain" / "train" / "finetune" / etc + def on_epoch_start(self, runner: "IRunner"): + assert self.loaders is not None - Returns: - tuple: model, criterion, optimizer, - scheduler and device for a given stage and model - """ - model = experiment.get_model(stage) - criterion = experiment.get_criterion(stage) - optimizer = experiment.get_optimizer(stage, model) - scheduler = experiment.get_scheduler(stage, optimizer) - model, criterion, optimizer, scheduler, device = process_components( - model=model, - criterion=criterion, - optimizer=optimizer, - scheduler=scheduler, - distributed_params=experiment.distributed_params, - device=device, - ) - return model, criterion, optimizer, scheduler, device + for loader_name, loader in self.loaders.items(): + if len(loader) == 0: + raise RunnerException( + f"DataLoader with name {loader_name} is empty." + ) - @staticmethod - def _get_experiment_callbacks( - experiment: IExperiment, stage: str, - ) -> Dict[str, Callback]: - """Inner method for `Callbacks` preparation. + self.is_infer_stage = self.stage.startswith("infer") + if not self.is_infer_stage: + assert self.valid_loader in self.loaders.keys(), ( + f"'{self.valid_loader}' " + f"should be in provided loaders: {list(self.loaders.keys())}" + ) + else: + assert not any( + x.startswith(SETTINGS.loader_train_prefix) + for x in self.loaders.keys() + ), "for inference no train loader should be passed" - Takes callbacks from the Experiment - and filters them for distributed master/worker cases. + set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) - Args: - stage: stage name of interest, - like "pretrain" / "train" / "finetune" / etc + def on_loader_start(self, runner: "IRunner"): + assert self.loader is not None + self.loader_len = len(self.loader) + if self.loader_len == 0: + raise RunnerException( + f"DataLoader with name {self.loader_name} is empty." + ) + self.loader_batch_size = ( + self.loader.batch_sampler.batch_size + if self.loader.batch_sampler is not None + else self.loader.batch_size + ) + self.loader_sample_step = 0 - Returns: - OrderedDict[str, Callback]: Ordered dictionary - with callbacks for current experiment stage. - """ - callbacks = experiment.get_callbacks(stage) - callbacks = filter_callbacks_by_node(callbacks) - callbacks = sort_callbacks_by_order(callbacks) - return callbacks + self.is_train_loader = self.loader_name.startswith("train") + self.is_valid_loader = self.loader_name.startswith("valid") + self.is_infer_loader = self.loader_name.startswith("infer") + maybe_recursive_call(self.model, "train", mode=self.is_train_loader) - def get_attr(self, key: str, inner_key: str = None) -> Any: - """ - Alias for python `getattr` method. 
Useful for Callbacks preparation - and cases with multi-criterion, multi-optimizer setup. - For example, when you would like to train multi-task classification. - - Used to get a named attribute from a `IRunner` by `key` keyword; - for example\ - :: - - # example 1 - runner.get_attr("criterion") - # is equivalent to - runner.criterion - - # example 2 - runner.get_attr("optimizer") - # is equivalent to - runner.optimizer - - # example 3 - runner.get_attr("scheduler") - # is equivalent to - runner.scheduler - - With `inner_key` usage, it suppose to find a dictionary under `key`\ - and would get `inner_key` from this dict; for example, - :: - - # example 1 - runner.get_attr("criterion", "bce") - # is equivalent to - runner.criterion["bce"] - - # example 2 - runner.get_attr("optimizer", "adam") - # is equivalent to - runner.optimizer["adam"] - - # example 3 - runner.get_attr("scheduler", "adam") - # is equivalent to - runner.scheduler["adam"] + if isinstance(self.loader.sampler, DistributedSampler): + self.loader.sampler.set_epoch(self.epoch) - Args: - key: name for attribute of interest, - like `criterion`, `optimizer`, `scheduler` - inner_key: name of inner dictionary key + set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) - Returns: - inner attribute - """ - if inner_key is None: - return getattr(self, key) + def on_batch_start(self, runner: "IRunner"): + self.global_batch_step += 1 + batch = self.input + if isinstance(batch, dict): + self.batch_size = len(next(iter(batch.values()))) else: - return getattr(self, key)[inner_key] + self.batch_size = len(batch[0]) + self.global_sample_step += self.batch_size + self.loader_sample_step += self.batch_size - def _prepare_for_stage(self, stage: str) -> None: - """ - Inner method to prepare `Runner` for the specified stage. + def on_batch_end(self, runner: "IRunner"): + pass - Args: - stage: stage name of interest, - like "pretrain" / "train" / "finetune" / etc - """ + def on_loader_end(self, runner: "IRunner"): pass - def _prepare_for_epoch(self, stage: str, epoch: int) -> None: - """ - Inner method to prepare `Runner` for the specified stage and epoch. + def on_epoch_end(self, runner: "IRunner"): + self.global_epoch += 1 + self.epoch += 1 - Args: - stage: stage name of interest, - like "pretrain" / "train" / "finetune" / etc - epoch: epoch index - """ + def on_stage_end(self, runner: "IRunner"): pass + def on_experiment_end(self, runner: "IRunner"): + pass + + def on_exception(self, runner: "IRunner"): + from catalyst.callbacks.exception import ExceptionCallback + + def _exception_handler_check(callbacks: Union[OrderedDict, Dict]): + return callbacks is not None and any( + issubclass(x.__class__, ExceptionCallback) + for x in callbacks.values() + ) + + if not _exception_handler_check(getattr(self, "callbacks", None)): + raise self.exception + def _run_event(self, event: str) -> None: """Inner method to run specified event on Runners' callbacks. @@ -720,26 +677,13 @@ def _run_event(self, event: str) -> None: :py:mod:`catalyst.core.callback.Callback` documentation. """ + # @TODO: how to remove self duplication? + if _is_substring(event, ["start", "exception"]): + getattr(self, event)(self) for callback in self.callbacks.values(): getattr(callback, event)(self) - - def _batch2device( - self, batch: Mapping[str, Any], device: Device, - ) -> Mapping[str, Any]: - """ - Inner method to transfer incoming data batches to Runners' device. - - Args: - batch (Mapping[str, Any]): dictionary with data batches - from DataLoader. 
- device: torch device - - Returns: - Mapping[str, Any]: same structure as value, - but all tensors and np.arrays moved to device - """ - output = any2device(batch, device) - return output + if _is_substring(event, ["end"]): + getattr(self, event)(self) @abstractmethod def _handle_batch(self, batch: Mapping[str, Any]) -> None: @@ -753,146 +697,43 @@ def _handle_batch(self, batch: Mapping[str, Any]) -> None: """ pass - def _run_batch(self, batch: Mapping[str, Any]) -> None: - """ - Inner method to run train step on specified data batch, - with batch callbacks events. - - Args: - batch (Mapping[str, Any]): dictionary with data batches - from DataLoader. - """ - if isinstance(batch, dict): - self.batch_size = len(next(iter(batch.values()))) - else: - self.batch_size = len(batch[0]) - self.global_sample_step += self.batch_size - self.loader_sample_step += self.batch_size - batch = self._batch2device(batch, self.device) - self.input = batch - + def _run_batch(self) -> None: + self.input = any2device(self.input, self.device) self._run_event("on_batch_start") - self._handle_batch(batch=batch) + self._handle_batch(batch=self.input) self._run_event("on_batch_end") - def _run_loader(self, loader: DataLoader) -> None: - """ - Inner method to pass whole DataLoader through Runner, - with loader callbacks events. - - Args: - loader: dataloader to iterate - """ - if len(loader) == 0: - raise RunnerException( - f"DataLoader with name {self.loader_name} is empty." - ) - - self.loader_batch_size = ( - loader.batch_sampler.batch_size - if loader.batch_sampler is not None - else loader.batch_size - ) - - self.loader_sample_step = 0 - for i, batch in enumerate(loader): - self.global_batch_step += 1 - self.loader_batch_step = i + 1 - self._run_batch(batch) + def _run_loader(self) -> None: + self._run_event("on_loader_start") + for self.loader_batch_step, self.input in enumerate(self.loader): + self._run_batch() if self.need_early_stop: self.need_early_stop = False break + self._run_event("on_loader_end") - def _run_epoch(self, stage: str, epoch: int) -> None: - """ - Inner method to run epoch on Runner, - with epoch callbacks events. - - Args: - stage: stage name of interest, - like "pretrain" / "train" / "finetune" / etc - epoch: epoch index - """ - self._prepare_for_epoch(stage=stage, epoch=epoch) - assert self.loaders is not None - - for loader_name, loader in self.loaders.items(): - if len(loader) == 0: - raise RunnerException( - f"DataLoader with name {loader_name} is empty." 
- ) - - self.is_infer_stage = self.stage_name.startswith("infer") - if not self.is_infer_stage: - assert self.valid_loader in self.loaders.keys(), ( - f"'{self.valid_loader}' " - f"should be in provided loaders: {list(self.loaders.keys())}" - ) - else: - assert not any( - x.startswith(SETTINGS.loader_train_prefix) - for x in self.loaders.keys() - ), "for inference no train loader should be passed" - - for loader_name, loader in self.loaders.items(): - self.loader_name = loader_name - self.loader_len = len(loader) - self.is_train_loader = loader_name.startswith( - SETTINGS.loader_train_prefix - ) - self.is_valid_loader = loader_name.startswith( - SETTINGS.loader_valid_prefix - ) - self.is_infer_loader = loader_name.startswith( - SETTINGS.loader_infer_prefix - ) - maybe_recursive_call( - self.model, "train", mode=self.is_train_loader, - ) - - if ( - isinstance(loader.sampler, DistributedSampler) - and not self.is_infer_stage - ): - loader.sampler.set_epoch(self.epoch) - - set_global_seed( - self.experiment.initial_seed + self.global_epoch + 1 - ) - self._run_event("on_loader_start") + def _run_epoch(self) -> None: + self._run_event("on_epoch_start") + for self.loader_name, self.loader in self.loaders.items(): with torch.set_grad_enabled(self.is_train_loader): - self._run_loader(loader) - self._run_event("on_loader_end") - - def _run_stage(self, stage: str) -> None: - """ - Inner method to run stage on Runner, - with stage callbacks events. - - Args: - stage: stage name of interest, - like "pretrain" / "train" / "finetune" / etc - - """ - self._prepare_for_stage(stage) + self._run_loader() + self._run_event("on_epoch_end") + def _run_stage(self) -> None: self._run_event("on_stage_start") while self.epoch < self.num_epochs + 1: - set_global_seed( - self.experiment.initial_seed + self.global_epoch + 1 - ) - self._run_event("on_epoch_start") - self._run_epoch(stage=stage, epoch=self.epoch) - self._run_event("on_epoch_end") - + self._run_epoch() if self.need_early_stop: self.need_early_stop = False break - - self.global_epoch += 1 - self.epoch += 1 self._run_event("on_stage_end") + def _run_experiment(self) -> None: + self._run_event("on_experiment_start") + for self.stage in self.experiment.stages: + self._run_stage() + self._run_event("on_experiment_end") + def run_experiment(self, experiment: IExperiment = None) -> "IRunner": """ Starts the experiment. @@ -910,26 +751,11 @@ def run_experiment(self, experiment: IExperiment = None) -> "IRunner": no handler we found into callbacks """ self.experiment = experiment or self.experiment - assert self.experiment is not None - try: - for stage in self.experiment.stages: - self._run_stage(stage) + self._run_experiment() except (Exception, KeyboardInterrupt) as ex: - from catalyst.callbacks.exception import ExceptionCallback - - def _exception_handler_check(callbacks: Union[OrderedDict, Dict]): - return callbacks is not None and any( - issubclass(x.__class__, ExceptionCallback) - for x in callbacks.values() - ) - - if _exception_handler_check(getattr(self, "callbacks", None)): - self.exception = ex - self._run_event("on_exception") - else: - raise ex - + self.exception = ex + self._run_event("on_exception") return self @@ -939,7 +765,7 @@ class IStageBasedRunner(IRunner): datasources per stage. """ - def _prepare_for_stage(self, stage: str): + def on_stage_start(self, runner: "IRunner"): """Inner method to prepare `Runner` for the specified stage. Sets `Experiment` initial seed. 
@@ -953,27 +779,30 @@ def _prepare_for_stage(self, stage: str): like "pretrain" / "train" / "finetune" / etc """ set_global_seed(self.experiment.initial_seed) - loaders = self.experiment.get_loaders(stage=stage) + loaders = self.experiment.get_loaders(stage=self.stage) loaders = validate_loaders(loaders) self.loaders = loaders set_global_seed(self.experiment.initial_seed) - ( - model, - criterion, - optimizer, - scheduler, - device, - ) = self._get_experiment_components( - experiment=self.experiment, stage=stage, device=self.device + model = self.experiment.get_model(self.stage) + criterion = self.experiment.get_criterion(self.stage) + optimizer = self.experiment.get_optimizer(self.stage, model) + scheduler = self.experiment.get_scheduler(self.stage, optimizer) + model, criterion, optimizer, scheduler, device = process_components( + model=model, + criterion=criterion, + optimizer=optimizer, + scheduler=scheduler, + distributed_params=self.experiment.distributed_params, + device=self.device, ) set_global_seed(self.experiment.initial_seed) - callbacks = self._get_experiment_callbacks( - experiment=self.experiment, stage=stage - ) + callbacks = self.experiment.get_callbacks(self.stage) + callbacks = filter_callbacks_by_node(callbacks) + callbacks = sort_callbacks_by_order(callbacks) - migrating_params = dict(**self.experiment.get_stage_params(stage)) + migrating_params = dict(**self.experiment.get_stage_params(self.stage)) migrate_from_previous_stage = migrating_params.get( "migrate_from_previous_stage", True ) @@ -1000,7 +829,7 @@ def _prepare_for_stage(self, stage: str): ) self._prepare_inner_state( - stage=stage, + stage=self.stage, model=model, device=device, criterion=criterion, @@ -1010,6 +839,7 @@ def _prepare_for_stage(self, stage: str): loaders=getattr(self, "loaders", None), **migrating_params, ) + super().on_stage_start(runner) __all__ = ["IRunner", "IStageBasedRunner", "RunnerException"] diff --git a/catalyst/runners/runner.py b/catalyst/runners/runner.py index 1764a98d00..4848a82c47 100644 --- a/catalyst/runners/runner.py +++ b/catalyst/runners/runner.py @@ -10,7 +10,14 @@ from catalyst.core.functional import sort_callbacks_by_order from catalyst.core.runner import IStageBasedRunner from catalyst.experiments.experiment import Experiment -from catalyst.typing import Criterion, Device, Model, Optimizer, Scheduler +from catalyst.typing import ( + Criterion, + Device, + Model, + Optimizer, + RunnerModel, + Scheduler, +) from catalyst.utils.checkpoint import load_checkpoint, unpack_checkpoint from catalyst.utils.components import process_components from catalyst.utils.misc import maybe_recursive_call @@ -29,10 +36,15 @@ class Runner(IStageBasedRunner): Deep Learning Runner for supervised, unsupervised, gan, etc runs. 
""" - _experiment_fn: Callable = Experiment - - def _init(self, **kwargs): - self.experiment: Experiment = None + def __init__( + self, + model: RunnerModel = None, + device: Device = None, + experiment_fn: Callable = Experiment, + ): + super().__init__( + model=model, device=device, experiment_fn=experiment_fn + ) def train( self, diff --git a/catalyst/runners/supervised.py b/catalyst/runners/supervised.py index 1a1e9d9763..a18a144259 100644 --- a/catalyst/runners/supervised.py +++ b/catalyst/runners/supervised.py @@ -13,8 +13,6 @@ class SupervisedRunner(Runner): """Runner for experiments with supervised model.""" - _experiment_fn: Callable = SupervisedExperiment - def __init__( self, model: RunnerModel = None, @@ -22,6 +20,7 @@ def __init__( input_key: Any = "features", output_key: Any = "logits", input_target_key: str = "targets", + experiment_fn: Callable = SupervisedExperiment, ): """ Args: @@ -33,27 +32,12 @@ def __init__( input_target_key: Key in batch dict mapping for target """ super().__init__( - model=model, - device=device, - input_key=input_key, - output_key=output_key, - input_target_key=input_target_key, + model=model, device=device, experiment_fn=experiment_fn ) - - def _init( - self, - input_key: Any = "features", - output_key: Any = "logits", - input_target_key: str = "targets", - ): - """ - Args: - input_key: Key in batch dict mapping for model input - output_key: Key in output dict model output - will be stored under - input_target_key: Key in batch dict mapping for target - """ - self.experiment: SupervisedExperiment = None + self._device = None + self._model = None + self._experiment_fn = experiment_fn + self._prepare_inner_state(model=model, device=device) self.input_key = input_key self.output_key = output_key @@ -83,6 +67,8 @@ def _init( else: raise NotImplementedError() + self._freeze() + def _batch2device(self, batch: Mapping[str, Any], device: Device): if isinstance(batch, (tuple, list)): assert len(batch) == 2 diff --git a/catalyst/utils/__init__.py b/catalyst/utils/__init__.py index 0f247ff75d..e1b433e76c 100644 --- a/catalyst/utils/__init__.py +++ b/catalyst/utils/__init__.py @@ -63,6 +63,7 @@ is_exception, maybe_recursive_call, fn_ends_with_pass, + get_attr, ) from catalyst.utils.numpy import get_one_hot from catalyst.utils.parser import parse_config_args, parse_args_uargs diff --git a/catalyst/utils/misc.py b/catalyst/utils/misc.py index d7824f258d..fec55ea388 100644 --- a/catalyst/utils/misc.py +++ b/catalyst/utils/misc.py @@ -161,6 +161,64 @@ def fn_ends_with_pass(fn: Callable[..., Any]): return False +def get_attr(obj: Any, key: str, inner_key: str = None) -> Any: + """ + Alias for python `getattr` method. Useful for Callbacks preparation + and cases with multi-criterion, multi-optimizer setup. + For example, when you would like to train multi-task classification. 
+
+    Used to get a named attribute from an `IRunner` by `key` keyword;
+    for example\
+    ::
+
+        # example 1
+        get_attr(runner, "criterion")
+        # is equivalent to
+        runner.criterion
+
+        # example 2
+        get_attr(runner, "optimizer")
+        # is equivalent to
+        runner.optimizer
+
+        # example 3
+        get_attr(runner, "scheduler")
+        # is equivalent to
+        runner.scheduler
+
+    With `inner_key` usage, it is supposed to find a dictionary under `key`\
+    and will get `inner_key` from this dict; for example,
+    ::
+
+        # example 1
+        get_attr(runner, "criterion", "bce")
+        # is equivalent to
+        runner.criterion["bce"]
+
+        # example 2
+        get_attr(runner, "optimizer", "adam")
+        # is equivalent to
+        runner.optimizer["adam"]
+
+        # example 3
+        get_attr(runner, "scheduler", "adam")
+        # is equivalent to
+        runner.scheduler["adam"]
+
+    Args:
+        obj: object to get the attribute from, e.g. an `IRunner` instance
+        key: name for attribute of interest,
+            like `criterion`, `optimizer`, `scheduler`
+        inner_key: name of inner dictionary key
+
+    Returns:
+        inner attribute
+    """
+    if inner_key is None:
+        return getattr(obj, key)
+    else:
+        return getattr(obj, key)[inner_key]
+
+
 __all__ = [
     "copy_directory",
     "format_metric",
@@ -170,4 +228,5 @@ def fn_ends_with_pass(fn: Callable[..., Any]):
     "is_exception",
     "maybe_recursive_call",
     "fn_ends_with_pass",
+    "get_attr",
 ]

From 3395821d20edc738a65a2ac0329bf1bff43f35db Mon Sep 17 00:00:00 2001
From: Sergey Kolesnikov
Date: Thu, 5 Nov 2020 09:23:59 +0300
Subject: [PATCH 2/9] proposal 2

---
 catalyst/core/runner.py        | 68 ++++++++++++++++-------------
 catalyst/runners/supervised.py |  6 +--
 2 files changed, 35 insertions(+), 39 deletions(-)

diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py
index 033875938a..023d282857 100644
--- a/catalyst/core/runner.py
+++ b/catalyst/core/runner.py
@@ -1,4 +1,4 @@
-from typing import Any, Callable, Dict, Mapping, Optional, Tuple, Union, List
+from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
 from abc import ABC, abstractmethod
 from collections import defaultdict, OrderedDict
 from functools import lru_cache
@@ -37,7 +37,7 @@
 
 @lru_cache(maxsize=42)
-def _is_substring(origin_string: str, strings: List):
+def _is_substring(origin_string: str, strings: Tuple):
     return any(x in origin_string for x in strings)
 
@@ -382,11 +382,11 @@ def __init__(
         self.experiment = None
         self._experiment_fn = experiment_fn
         self._prepare_inner_state(model=model, device=device)
-        self._freeze()
+        # self._freeze()
 
     def _prepare_inner_state(
         self,
-        stage: str = SETTINGS.stage_infer_prefix,
+        stage: str = "infer",
         device: Device = None,
         model: RunnerModel = None,
         criterion: RunnerCriterion = None,
@@ -398,18 +398,19 @@ def _prepare_inner_state(
         num_epochs: int = 1,
         main_metric: str = "loss",
         minimize_metric: bool = True,
-        valid_loader: str = SETTINGS.loader_valid_prefix,
+        valid_loader: str = "valid",
         checkpoint_data: Dict = None,
         is_check_run: bool = False,
         verbose: bool = False,
         **kwargs,
     ):
-
+        # @TODO: move/split this method to callbacks group
+        # here should be only a small part of it
         # main runner components: model and device to run
         self.device: Device = device
         self.model: RunnerModel = model
 
-        # extra experiment components,
+        # experiment components,
         # use `catalyst.core.IExperiment` to setup them
         self.criterion: RunnerCriterion = criterion
         self.optimizer: RunnerOptimizer = optimizer
@@ -451,7 +452,7 @@ def _prepare_inner_state(
         self.is_best_valid: bool = False
         self.best_valid_metrics: Dict = defaultdict(None)
 
-        # distributed info
+        # distributed info (@TODO: move to Engine?)
self.distributed_rank: int = get_rank() self.is_distributed_master: bool = ~(self.distributed_rank > 0) self.is_distributed_worker: bool = self.distributed_rank > 0 @@ -466,9 +467,7 @@ def _prepare_inner_state( # stage info self.num_epochs: int = num_epochs self.stage: str = stage - self.is_infer_stage: bool = self.stage.startswith( - SETTINGS.stage_infer_prefix - ) + self.is_infer_stage: bool = self.stage.startswith("infer") # epoch info self.epoch: int = 1 # loader info @@ -578,8 +577,12 @@ def device(self, value: Device): def on_experiment_start(self, runner: "IRunner"): assert self.experiment is not None + set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) + def on_stage_start(self, runner: "IRunner"): - pass + assert self.stage is not None + + set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) def on_epoch_start(self, runner: "IRunner"): assert self.loaders is not None @@ -590,7 +593,6 @@ def on_epoch_start(self, runner: "IRunner"): f"DataLoader with name {loader_name} is empty." ) - self.is_infer_stage = self.stage.startswith("infer") if not self.is_infer_stage: assert self.valid_loader in self.loaders.keys(), ( f"'{self.valid_loader}' " @@ -606,6 +608,7 @@ def on_epoch_start(self, runner: "IRunner"): def on_loader_start(self, runner: "IRunner"): assert self.loader is not None + self.loader_len = len(self.loader) if self.loader_len == 0: raise RunnerException( @@ -629,12 +632,13 @@ def on_loader_start(self, runner: "IRunner"): set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) def on_batch_start(self, runner: "IRunner"): - self.global_batch_step += 1 - batch = self.input - if isinstance(batch, dict): - self.batch_size = len(next(iter(batch.values()))) + if isinstance(self.input, dict): + self.batch_size = len(next(iter(self.input.values()))) else: - self.batch_size = len(batch[0]) + self.batch_size = len(self.input[0]) + + self.global_batch_step += 1 + # self.loader_batch_step += 1 self.global_sample_step += self.batch_size self.loader_sample_step += self.batch_size @@ -677,14 +681,17 @@ def _run_event(self, event: str) -> None: :py:mod:`catalyst.core.callback.Callback` documentation. """ - # @TODO: how to remove self duplication? - if _is_substring(event, ["start", "exception"]): + # @TODO: how to remove self duplication? and does it really matter? + if _is_substring(event, ("start", "exception")): getattr(self, event)(self) for callback in self.callbacks.values(): getattr(callback, event)(self) - if _is_substring(event, ["end"]): + if _is_substring(event, ("end")): getattr(self, event)(self) + def _handle_batch_device(self, batch: Mapping[str, Any]): + return any2device(batch, self.device) + @abstractmethod def _handle_batch(self, batch: Mapping[str, Any]) -> None: """ @@ -698,7 +705,7 @@ def _handle_batch(self, batch: Mapping[str, Any]) -> None: pass def _run_batch(self) -> None: - self.input = any2device(self.input, self.device) + self.input = self._handle_batch_device(batch=self.input) self._run_event("on_batch_start") self._handle_batch(batch=self.input) self._run_event("on_batch_end") @@ -766,22 +773,12 @@ class IStageBasedRunner(IRunner): """ def on_stage_start(self, runner: "IRunner"): - """Inner method to prepare `Runner` for the specified stage. - - Sets `Experiment` initial seed. - Prepares experiment components with `self._get_experiment_components`. - Prepares callbacks with `self._get_experiment_callbacks`. 
- Prepares inner state with `self._prepare_inner_state` - Additionally sets `Experiment` datasources for specified stage. + super().on_stage_start(runner) - Args: - stage: stage name of interest, - like "pretrain" / "train" / "finetune" / etc - """ set_global_seed(self.experiment.initial_seed) loaders = self.experiment.get_loaders(stage=self.stage) loaders = validate_loaders(loaders) - self.loaders = loaders + # self.loaders = loaders set_global_seed(self.experiment.initial_seed) model = self.experiment.get_model(self.stage) @@ -836,10 +833,9 @@ def on_stage_start(self, runner: "IRunner"): optimizer=optimizer, scheduler=scheduler, callbacks=callbacks, - loaders=getattr(self, "loaders", None), + loaders=loaders, **migrating_params, ) - super().on_stage_start(runner) __all__ = ["IRunner", "IStageBasedRunner", "RunnerException"] diff --git a/catalyst/runners/supervised.py b/catalyst/runners/supervised.py index a18a144259..378ed3d80e 100644 --- a/catalyst/runners/supervised.py +++ b/catalyst/runners/supervised.py @@ -69,11 +69,11 @@ def __init__( self._freeze() - def _batch2device(self, batch: Mapping[str, Any], device: Device): + def _handle_batch_device(self, batch: Mapping[str, Any]): if isinstance(batch, (tuple, list)): assert len(batch) == 2 batch = {self.input_key: batch[0], self.target_key: batch[1]} - batch = super()._batch2device(batch, device) + batch = super()._handle_batch_device(batch) return batch def _process_input_str(self, batch: Mapping[str, Any], **kwargs): @@ -148,7 +148,7 @@ def predict_batch( Returns: Mapping[str, Any]: model output dictionary """ - batch = self._batch2device(batch, self.device) + batch = self._handle_batch_device(batch) output = self.forward(batch, **kwargs) return output From 7e9b117954e04fba6cd90f9820dd1275821a0166 Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Thu, 5 Nov 2020 09:29:03 +0300 Subject: [PATCH 3/9] proposal 3, and now it works ;) --- catalyst/core/runner.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py index 023d282857..881a33990b 100644 --- a/catalyst/core/runner.py +++ b/catalyst/core/runner.py @@ -686,7 +686,7 @@ def _run_event(self, event: str) -> None: getattr(self, event)(self) for callback in self.callbacks.values(): getattr(callback, event)(self) - if _is_substring(event, ("end")): + if _is_substring(event, ("end",)): getattr(self, event)(self) def _handle_batch_device(self, batch: Mapping[str, Any]): @@ -712,18 +712,18 @@ def _run_batch(self) -> None: def _run_loader(self) -> None: self._run_event("on_loader_start") - for self.loader_batch_step, self.input in enumerate(self.loader): - self._run_batch() - if self.need_early_stop: - self.need_early_stop = False - break + with torch.set_grad_enabled(self.is_train_loader): + for self.loader_batch_step, self.input in enumerate(self.loader): + self._run_batch() + if self.need_early_stop: + self.need_early_stop = False + break self._run_event("on_loader_end") def _run_epoch(self) -> None: self._run_event("on_epoch_start") for self.loader_name, self.loader in self.loaders.items(): - with torch.set_grad_enabled(self.is_train_loader): - self._run_loader() + self._run_loader() self._run_event("on_epoch_end") def _run_stage(self) -> None: From 6543d006a01dc69eb8e8630be5e5999c12d6ba63 Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Thu, 5 Nov 2020 09:31:35 +0300 Subject: [PATCH 4/9] proposal 3, and now it works 2 ;)) --- catalyst/runners/supervised.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/catalyst/runners/supervised.py b/catalyst/runners/supervised.py index 378ed3d80e..711fe42a55 100644 --- a/catalyst/runners/supervised.py +++ b/catalyst/runners/supervised.py @@ -67,7 +67,7 @@ def __init__( else: raise NotImplementedError() - self._freeze() + # self._freeze() def _handle_batch_device(self, batch: Mapping[str, Any]): if isinstance(batch, (tuple, list)): From 1939b6c94b2547f31c4737ab5c4d833b0b4bba4e Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Sun, 8 Nov 2020 11:46:02 +0300 Subject: [PATCH 5/9] renames --- catalyst/callbacks/checkpoint.py | 2 +- catalyst/callbacks/control_flow.py | 2 +- catalyst/callbacks/logging.py | 10 ++-- catalyst/callbacks/metric.py | 2 +- catalyst/callbacks/metrics/cmc_score.py | 2 +- .../tests/test_periodic_loader_callback.py | 2 +- catalyst/contrib/callbacks/alchemy_logger.py | 4 +- .../callbacks/confusion_matrix_logger.py | 2 +- .../contrib/callbacks/inference_callback.py | 2 +- catalyst/contrib/callbacks/knn_metric.py | 2 +- catalyst/contrib/callbacks/mask_inference.py | 4 +- catalyst/contrib/callbacks/neptune_logger.py | 4 +- catalyst/contrib/callbacks/telegram_logger.py | 4 +- catalyst/contrib/callbacks/wandb_logger.py | 4 +- catalyst/core/callback.py | 60 ++++++++++--------- catalyst/core/legacy.py | 15 +++++ catalyst/core/runner.py | 39 ++++++------ catalyst/runners/supervised.py | 22 +++---- .../core_batch_overfit_callback.py | 2 +- 19 files changed, 101 insertions(+), 83 deletions(-) diff --git a/catalyst/callbacks/checkpoint.py b/catalyst/callbacks/checkpoint.py index ccda8d3ca8..84f5d4d518 100644 --- a/catalyst/callbacks/checkpoint.py +++ b/catalyst/callbacks/checkpoint.py @@ -27,7 +27,7 @@ def _pack_runner(runner: "IRunner"): valid_metrics=dict(runner.valid_metrics), stage_name=runner.stage, epoch=runner.epoch, - loader_name=runner.loader_name, + loader_name=runner.loader_key, loader_step=runner.loader_batch_step, global_epoch=runner.global_epoch, checkpoint_data=runner.checkpoint_data, diff --git a/catalyst/callbacks/control_flow.py b/catalyst/callbacks/control_flow.py index 791bf86f75..c35ed35caa 100644 --- a/catalyst/callbacks/control_flow.py +++ b/catalyst/callbacks/control_flow.py @@ -379,7 +379,7 @@ def on_loader_start(self, runner: "IRunner") -> None: runner: current runner """ stage = runner.stage - loader = runner.loader_name + loader = runner.loader_key epoch = runner.global_epoch if self.use_global_epochs else runner.epoch if self.filter_fn is not None: diff --git a/catalyst/callbacks/logging.py b/catalyst/callbacks/logging.py index 6e2a24b2d1..3b4972677c 100644 --- a/catalyst/callbacks/logging.py +++ b/catalyst/callbacks/logging.py @@ -66,7 +66,7 @@ def on_loader_start(self, runner: "IRunner"): self.tqdm = tqdm( total=runner.loader_len, desc=f"{runner.epoch}/{runner.num_epochs}" - f" * Epoch ({runner.loader_name})", + f" * Epoch ({runner.loader_key})", leave=True, ncols=0, file=sys.stdout, @@ -222,9 +222,9 @@ def on_stage_start(self, runner: "IRunner") -> None: def on_loader_start(self, runner: "IRunner"): """Prepare tensorboard writers for the current stage.""" - if runner.loader_name not in self.loggers: - log_dir = os.path.join(runner.logdir, f"{runner.loader_name}_log") - self.loggers[runner.loader_name] = SummaryWriter(log_dir) + if runner.loader_key not in self.loggers: + log_dir = os.path.join(runner.logdir, f"{runner.loader_key}_log") + self.loggers[runner.loader_key] = SummaryWriter(log_dir) def on_batch_end(self, runner: "IRunner"): """Translate batch metrics to tensorboard.""" @@ 
-232,7 +232,7 @@ def on_batch_end(self, runner: "IRunner"): return if self.log_on_batch_end: - mode = runner.loader_name + mode = runner.loader_key metrics = runner.batch_metrics self._log_metrics( metrics=metrics, diff --git a/catalyst/callbacks/metric.py b/catalyst/callbacks/metric.py index 43ef3a9e6e..96280358f3 100644 --- a/catalyst/callbacks/metric.py +++ b/catalyst/callbacks/metric.py @@ -498,7 +498,7 @@ def on_loader_end(self, runner: "IRunner") -> None: value = value.mean runner.loader_metrics[key] = value for key, value in runner.loader_metrics.items(): - runner.epoch_metrics[f"{runner.loader_name}_{key}"] = value + runner.epoch_metrics[f"{runner.loader_key}_{key}"] = value # backward compatibility diff --git a/catalyst/callbacks/metrics/cmc_score.py b/catalyst/callbacks/metrics/cmc_score.py index c913b1c71e..c0c09f8ec2 100644 --- a/catalyst/callbacks/metrics/cmc_score.py +++ b/catalyst/callbacks/metrics/cmc_score.py @@ -121,7 +121,7 @@ def on_batch_end(self, runner: "IRunner"): def on_loader_start(self, runner: "IRunner"): """On loader start action""" - dataset = runner.loaders[runner.loader_name].dataset + dataset = runner.loaders[runner.loader_key].dataset assert isinstance(dataset, QueryGalleryDataset) self._query_size = dataset.query_size self._gallery_size = dataset.gallery_size diff --git a/catalyst/callbacks/tests/test_periodic_loader_callback.py b/catalyst/callbacks/tests/test_periodic_loader_callback.py index be1190d7c4..5aaf181000 100644 --- a/catalyst/callbacks/tests/test_periodic_loader_callback.py +++ b/catalyst/callbacks/tests/test_periodic_loader_callback.py @@ -746,7 +746,7 @@ def __init__(self, values): self.values = values def on_loader_end(self, runner: "IRunner") -> None: - score = self.values[runner.loader_name][runner.epoch] + score = self.values[runner.loader_key][runner.epoch] runner.loader_metrics["metric"] = score old_stdout = sys.stdout diff --git a/catalyst/contrib/callbacks/alchemy_logger.py b/catalyst/contrib/callbacks/alchemy_logger.py index 32272a83f1..0e720bd7d6 100644 --- a/catalyst/contrib/callbacks/alchemy_logger.py +++ b/catalyst/contrib/callbacks/alchemy_logger.py @@ -108,7 +108,7 @@ def _log_metrics( def on_batch_end(self, runner: "IRunner"): """Translate batch metrics to Alchemy.""" if self.log_on_batch_end: - mode = runner.loader_name + mode = runner.loader_key metrics = runner.batch_metrics self._log_metrics( metrics=metrics, @@ -120,7 +120,7 @@ def on_batch_end(self, runner: "IRunner"): def on_loader_end(self, runner: "IRunner"): """Translate loader metrics to Alchemy.""" if self.log_on_epoch_end: - mode = runner.loader_name + mode = runner.loader_key metrics = runner.loader_metrics self._log_metrics( metrics=metrics, diff --git a/catalyst/contrib/callbacks/confusion_matrix_logger.py b/catalyst/contrib/callbacks/confusion_matrix_logger.py index a8f6465811..34f35e19f5 100644 --- a/catalyst/contrib/callbacks/confusion_matrix_logger.py +++ b/catalyst/contrib/callbacks/confusion_matrix_logger.py @@ -146,7 +146,7 @@ def on_loader_end(self, runner: "IRunner"): if runner.distributed_rank <= 0: tb_callback = runner.callbacks[self.tensorboard_callback_name] self._plot_confusion_matrix( - logger=tb_callback.loggers[runner.loader_name], + logger=tb_callback.loggers[runner.loader_key], epoch=runner.global_epoch, confusion_matrix=confusion_matrix, class_names=class_names, diff --git a/catalyst/contrib/callbacks/inference_callback.py b/catalyst/contrib/callbacks/inference_callback.py index 5ffcae3361..30c618b937 100644 --- 
a/catalyst/contrib/callbacks/inference_callback.py +++ b/catalyst/contrib/callbacks/inference_callback.py @@ -72,7 +72,7 @@ def on_loader_end(self, runner: "IRunner"): } if self.out_prefix is not None: for key, value in self.predictions.items(): - suffix = ".".join([runner.loader_name, key]) + suffix = ".".join([runner.loader_key, key]) np.save(f"{self.out_prefix}/{suffix}.npy", value) diff --git a/catalyst/contrib/callbacks/knn_metric.py b/catalyst/contrib/callbacks/knn_metric.py index 4b36bc5bb9..0f60a2d63d 100644 --- a/catalyst/contrib/callbacks/knn_metric.py +++ b/catalyst/contrib/callbacks/knn_metric.py @@ -203,7 +203,7 @@ def on_loader_end(self, runner: "IRunner") -> None: "labels": self.targets, } - self.sets[runner.loader_name] = s + self.sets[runner.loader_key] = s y_true, y_pred = self._knn(s) diff --git a/catalyst/contrib/callbacks/mask_inference.py b/catalyst/contrib/callbacks/mask_inference.py index 94137351d6..22c9294767 100644 --- a/catalyst/contrib/callbacks/mask_inference.py +++ b/catalyst/contrib/callbacks/mask_inference.py @@ -75,7 +75,7 @@ def on_loader_start(self, runner: "IRunner"): Args: runner: current runner """ - lm = runner.loader_name + lm = runner.loader_key os.makedirs(f"{self.out_prefix}/{lm}/", exist_ok=True) def on_batch_end(self, runner: "IRunner"): @@ -84,7 +84,7 @@ def on_batch_end(self, runner: "IRunner"): Args: runner: current runner """ - lm = runner.loader_name + lm = runner.loader_key names = runner.input.get(self.name_key, []) features = runner.input[self.input_key].detach().cpu() diff --git a/catalyst/contrib/callbacks/neptune_logger.py b/catalyst/contrib/callbacks/neptune_logger.py index 758b026cd8..84da438575 100644 --- a/catalyst/contrib/callbacks/neptune_logger.py +++ b/catalyst/contrib/callbacks/neptune_logger.py @@ -142,7 +142,7 @@ def _log_metrics( def on_batch_end(self, runner: "IRunner"): """Log batch metrics to Neptune.""" if self.log_on_batch_end: - mode = runner.loader_name + mode = runner.loader_key metrics = runner.batch_metrics self._log_metrics( metrics=metrics, @@ -154,7 +154,7 @@ def on_batch_end(self, runner: "IRunner"): def on_loader_end(self, runner: "IRunner"): """Translate epoch metrics to Neptune.""" if self.log_on_epoch_end: - mode = runner.loader_name + mode = runner.loader_key metrics = runner.loader_metrics self._log_metrics( metrics=metrics, diff --git a/catalyst/contrib/callbacks/telegram_logger.py b/catalyst/contrib/callbacks/telegram_logger.py index 0d60df9c35..7edbebe3d7 100644 --- a/catalyst/contrib/callbacks/telegram_logger.py +++ b/catalyst/contrib/callbacks/telegram_logger.py @@ -84,7 +84,7 @@ def on_loader_start(self, runner: "IRunner"): """Notify about starting running the new loader.""" if self.log_on_loader_start: text = ( - f"{runner.loader_name} {runner.global_epoch} epoch has started" + f"{runner.loader_key} {runner.global_epoch} epoch has started" ) self._send_text(text) @@ -100,7 +100,7 @@ def on_loader_end(self, runner: "IRunner"): metrics_to_log = self.metrics_to_log rows: List[str] = [ - f"{runner.loader_name} {runner.global_epoch}" + f"{runner.loader_key} {runner.global_epoch}" f" epoch was finished:" ] diff --git a/catalyst/contrib/callbacks/wandb_logger.py b/catalyst/contrib/callbacks/wandb_logger.py index d48c87bbfa..ac7b241b1c 100644 --- a/catalyst/contrib/callbacks/wandb_logger.py +++ b/catalyst/contrib/callbacks/wandb_logger.py @@ -155,7 +155,7 @@ def on_stage_end(self, runner: "IRunner"): def on_batch_end(self, runner: "IRunner"): """Translate batch metrics to Weights & Biases.""" if 
self.log_on_batch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.batch_metrics
             self._log_metrics(
                 metrics=metrics,
@@ -168,7 +168,7 @@ def on_batch_end(self, runner: "IRunner"):
     def on_loader_end(self, runner: "IRunner"):
         """Translate loader metrics to Weights & Biases."""
         if self.log_on_epoch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.loader_metrics
             self._log_metrics(
                 metrics=metrics,
diff --git a/catalyst/core/callback.py
index 3f06c6b92b..8186a976ae 100644
--- a/catalyst/core/callback.py
+++ b/catalyst/core/callback.py
@@ -11,6 +11,9 @@ def on_experiment_start(self, runner: "IRunner"):
 
         Args:
             runner: IRunner instance.
+
+        .. note::
+            This event works only on IRunner.
         """
         pass
 
@@ -83,6 +86,9 @@ def on_experiment_end(self, runner: "IRunner"):
 
         Args:
             runner: IRunner instance.
+
+        .. note::
+            This event works only on IRunner.
         """
         pass
 
@@ -202,11 +208,11 @@ class Callback(ICallback):
 
     Abstraction, please check out the implementations:
 
-    - :py:mod:`catalyst.core.callbacks.criterion.CriterionCallback`
-    - :py:mod:`catalyst.core.callbacks.optimizer.OptimizerCallback`
-    - :py:mod:`catalyst.core.callbacks.scheduler.SchedulerCallback`
-    - :py:mod:`catalyst.core.callbacks.logging.TensorboardLogger`
-    - :py:mod:`catalyst.core.callbacks.checkpoint.CheckpointCallback`
+    - :py:mod:`catalyst.callbacks.criterion.CriterionCallback`
+    - :py:mod:`catalyst.callbacks.optimizer.OptimizerCallback`
+    - :py:mod:`catalyst.callbacks.scheduler.SchedulerCallback`
+    - :py:mod:`catalyst.callbacks.logging.TensorboardLogger`
+    - :py:mod:`catalyst.callbacks.checkpoint.CheckpointCallback`
     """
 
     def __init__(
@@ -250,79 +256,79 @@ def __init__(self, base_callback: Callback, enable_callback: bool = True):
         self.callback = base_callback
         self._is_enabled = enable_callback
 
-    def on_loader_start(self, runner: "IRunner") -> None:
-        """
-        Check if current epoch should be skipped.
+    def on_stage_start(self, runner: "IRunner") -> None:
+        """Run base_callback (if possible)
 
         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_loader_start(runner)
+            self.callback.on_stage_start(runner)
 
-    def on_loader_end(self, runner: "IRunner") -> None:
-        """
-        Reset status of callback
+    def on_epoch_start(self, runner: "IRunner") -> None:
+        """Run base_callback (if possible)
 
         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_loader_end(runner)
+            self.callback.on_epoch_start(runner)
 
-    def on_stage_start(self, runner: "IRunner") -> None:
-        """Run base_callback (if possible)
+    def on_loader_start(self, runner: "IRunner") -> None:
+        """
+        Check if current epoch should be skipped.
Args: runner: current runner """ if self._is_enabled: - self.callback.on_stage_start(runner) + self.callback.on_loader_start(runner) - def on_stage_end(self, runner: "IRunner") -> None: + def on_batch_start(self, runner: "IRunner") -> None: """Run base_callback (if possible) Args: runner: current runner """ if self._is_enabled: - self.callback.on_stage_end(runner) + self.callback.on_batch_start(runner) - def on_epoch_start(self, runner: "IRunner") -> None: + def on_batch_end(self, runner: "IRunner") -> None: """Run base_callback (if possible) Args: runner: current runner """ if self._is_enabled: - self.callback.on_epoch_start(runner) + self.callback.on_batch_end(runner) - def on_epoch_end(self, runner: "IRunner") -> None: - """Run base_callback (if possible) + def on_loader_end(self, runner: "IRunner") -> None: + """ + Reset status of callback Args: runner: current runner """ if self._is_enabled: - self.callback.on_epoch_end(runner) + self.callback.on_loader_end(runner) - def on_batch_start(self, runner: "IRunner") -> None: + def on_epoch_end(self, runner: "IRunner") -> None: """Run base_callback (if possible) Args: runner: current runner """ if self._is_enabled: - self.callback.on_batch_start(runner) + self.callback.on_epoch_end(runner) - def on_batch_end(self, runner: "IRunner") -> None: + def on_stage_end(self, runner: "IRunner") -> None: """Run base_callback (if possible) Args: runner: current runner """ if self._is_enabled: - self.callback.on_batch_end(runner) + self.callback.on_stage_end(runner) def on_exception(self, runner: "IRunner") -> None: """Run base_callback (if possible) diff --git a/catalyst/core/legacy.py b/catalyst/core/legacy.py index 550d5f68af..58789b5bc0 100644 --- a/catalyst/core/legacy.py +++ b/catalyst/core/legacy.py @@ -72,6 +72,21 @@ def loader_step(self): ) return self.loader_batch_step + @property + def loader_name(self): + """Alias for `runner.loader_key`. + + .. warning:: + Deprecated, saved for backward compatibility. + Please use `runner.loader_key` instead. + """ + warnings.warn( + "`loader_name` was deprecated, " + "please use `loader_key` instead", + DeprecationWarning, + ) + return self.loader_key + @property def state(self): """Alias for `runner`. diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py index 881a33990b..642991184e 100644 --- a/catalyst/core/runner.py +++ b/catalyst/core/runner.py @@ -1,4 +1,4 @@ -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union +from typing import Any, Callable, Dict, Mapping, Optional, Tuple, Union from abc import ABC, abstractmethod from collections import defaultdict, OrderedDict from functools import lru_cache @@ -16,17 +16,13 @@ ) from catalyst.core.legacy import IRunnerLegacy from catalyst.settings import SETTINGS -from catalyst.tools.frozen_class import FrozenClass from catalyst.typing import ( - Criterion, Device, Model, - Optimizer, RunnerCriterion, RunnerModel, RunnerOptimizer, RunnerScheduler, - Scheduler, ) from catalyst.utils.components import process_components from catalyst.utils.distributed import get_rank @@ -52,7 +48,7 @@ def __init__(self, message: str): super().__init__(message) -class IRunner(ABC, ICallback, IRunnerLegacy, FrozenClass): +class IRunner(ABC, ICallback, IRunnerLegacy): """ An abstraction that knows how to run an experiment. 
It contains all the logic of **how** to run the experiment, @@ -382,11 +378,10 @@ def __init__( self.experiment = None self._experiment_fn = experiment_fn self._prepare_inner_state(model=model, device=device) - # self._freeze() def _prepare_inner_state( self, - stage: str = "infer", + stage: str = SETTINGS.stage_infer_prefix, device: Device = None, model: RunnerModel = None, criterion: RunnerCriterion = None, @@ -398,7 +393,7 @@ def _prepare_inner_state( num_epochs: int = 1, main_metric: str = "loss", minimize_metric: bool = True, - valid_loader: str = "valid", + valid_loader: str = SETTINGS.loader_valid_prefix, checkpoint_data: Dict = None, is_check_run: bool = False, verbose: bool = False, @@ -467,13 +462,15 @@ def _prepare_inner_state( # stage info self.num_epochs: int = num_epochs self.stage: str = stage - self.is_infer_stage: bool = self.stage.startswith("infer") + self.is_infer_stage: bool = self.stage.startswith( + SETTINGS.stage_infer_prefix + ) # epoch info self.epoch: int = 1 # loader info self.loader_sample_step: int = 0 self.loader_batch_step: int = 0 - self.loader_name: str = None + self.loader_key: str = None self.loader_len: int = 0 self.loader_batch_size = 0 self.is_train_loader: bool = False @@ -612,7 +609,7 @@ def on_loader_start(self, runner: "IRunner"): self.loader_len = len(self.loader) if self.loader_len == 0: raise RunnerException( - f"DataLoader with name {self.loader_name} is empty." + f"DataLoader with name {self.loader_key} is empty." ) self.loader_batch_size = ( self.loader.batch_sampler.batch_size @@ -621,9 +618,15 @@ def on_loader_start(self, runner: "IRunner"): ) self.loader_sample_step = 0 - self.is_train_loader = self.loader_name.startswith("train") - self.is_valid_loader = self.loader_name.startswith("valid") - self.is_infer_loader = self.loader_name.startswith("infer") + self.is_train_loader = self.loader_key.startswith( + SETTINGS.loader_train_prefix + ) + self.is_valid_loader = self.loader_key.startswith( + SETTINGS.loader_valid_prefix + ) + self.is_infer_loader = self.loader_key.startswith( + SETTINGS.loader_infer_prefix + ) maybe_recursive_call(self.model, "train", mode=self.is_train_loader) if isinstance(self.loader.sampler, DistributedSampler): @@ -689,7 +692,7 @@ def _run_event(self, event: str) -> None: if _is_substring(event, ("end",)): getattr(self, event)(self) - def _handle_batch_device(self, batch: Mapping[str, Any]): + def _handle_device(self, batch: Mapping[str, Any]): return any2device(batch, self.device) @abstractmethod @@ -705,7 +708,7 @@ def _handle_batch(self, batch: Mapping[str, Any]) -> None: pass def _run_batch(self) -> None: - self.input = self._handle_batch_device(batch=self.input) + self.input = self._handle_device(batch=self.input) self._run_event("on_batch_start") self._handle_batch(batch=self.input) self._run_event("on_batch_end") @@ -722,7 +725,7 @@ def _run_loader(self) -> None: def _run_epoch(self) -> None: self._run_event("on_epoch_start") - for self.loader_name, self.loader in self.loaders.items(): + for self.loader_key, self.loader in self.loaders.items(): self._run_loader() self._run_event("on_epoch_end") diff --git a/catalyst/runners/supervised.py b/catalyst/runners/supervised.py index 711fe42a55..6187ef8424 100644 --- a/catalyst/runners/supervised.py +++ b/catalyst/runners/supervised.py @@ -34,10 +34,6 @@ def __init__( super().__init__( model=model, device=device, experiment_fn=experiment_fn ) - self._device = None - self._model = None - self._experiment_fn = experiment_fn - self._prepare_inner_state(model=model, 
device=device) self.input_key = input_key self.output_key = output_key @@ -67,15 +63,6 @@ def __init__( else: raise NotImplementedError() - # self._freeze() - - def _handle_batch_device(self, batch: Mapping[str, Any]): - if isinstance(batch, (tuple, list)): - assert len(batch) == 2 - batch = {self.input_key: batch[0], self.target_key: batch[1]} - batch = super()._handle_batch_device(batch) - return batch - def _process_input_str(self, batch: Mapping[str, Any], **kwargs): output = self.model(batch[self.input_key], **kwargs) return output @@ -118,6 +105,13 @@ def forward(self, batch: Mapping[str, Any], **kwargs) -> Mapping[str, Any]: output = self._process_output(output) return output + def _handle_device(self, batch: Mapping[str, Any]): + if isinstance(batch, (tuple, list)): + assert len(batch) == 2 + batch = {self.input_key: batch[0], self.target_key: batch[1]} + batch = super()._handle_device(batch) + return batch + def _handle_batch(self, batch: Mapping[str, Any]) -> None: """ Inner method to handle specified data batch. @@ -148,7 +142,7 @@ def predict_batch( Returns: Mapping[str, Any]: model output dictionary """ - batch = self._handle_batch_device(batch) + batch = self._handle_device(batch) output = self.forward(batch, **kwargs) return output diff --git a/tests/_tests_scripts/core_batch_overfit_callback.py b/tests/_tests_scripts/core_batch_overfit_callback.py index 6a4753d78a..9c536dc462 100644 --- a/tests/_tests_scripts/core_batch_overfit_callback.py +++ b/tests/_tests_scripts/core_batch_overfit_callback.py @@ -27,7 +27,7 @@ def on_loader_start(self, runner): # 320 samples with 32 batch size # -> 1 batch size = 32 # -> 0.1 portion = 32 - assert len(runner.loaders[runner.loader_name]) == 32 + assert len(runner.loaders[runner.loader_key]) == 32 # model training From 9f12876c7f9072936266e1734507d24f745cd5fd Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Sun, 8 Nov 2020 11:48:25 +0300 Subject: [PATCH 6/9] renames --- catalyst/core/legacy.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/catalyst/core/legacy.py b/catalyst/core/legacy.py index 58789b5bc0..feae659c84 100644 --- a/catalyst/core/legacy.py +++ b/catalyst/core/legacy.py @@ -87,6 +87,21 @@ def loader_name(self): ) return self.loader_key + @property + def stage_name(self): + """Alias for `runner.stage`. + + .. warning:: + Deprecated, saved for backward compatibility. + Please use `runner.stage` instead. + """ + warnings.warn( + "`stage_name` was deprecated, " + "please use `stage` instead", + DeprecationWarning, + ) + return self.stage + @property def state(self): """Alias for `runner`. From 031cf128aceddc01ec7578ac1cdde73bb28fe84e Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Wed, 11 Nov 2020 20:35:54 +0300 Subject: [PATCH 7/9] codestyle --- catalyst/core/callback.py | 26 ++++----- catalyst/core/legacy.py | 9 ++-- catalyst/core/runner.py | 98 +++++++++++++++++++++++++++++----- catalyst/runners/runner.py | 18 ++++--- catalyst/runners/supervised.py | 3 ++ catalyst/utils/misc.py | 1 + 6 files changed, 118 insertions(+), 37 deletions(-) diff --git a/catalyst/core/callback.py b/catalyst/core/callback.py index 8186a976ae..511c51b51e 100644 --- a/catalyst/core/callback.py +++ b/catalyst/core/callback.py @@ -6,8 +6,8 @@ class ICallback: - def on_experiment_start(self, runner: "IRunner"): - """Event handler for stage start. + def on_experiment_start(self, runner: "IRunner") -> None: + """Event handler for experiment start. Args: runner: IRunner instance. 
@@ -17,7 +17,7 @@ def on_experiment_start(self, runner: "IRunner"): """ pass - def on_stage_start(self, runner: "IRunner"): + def on_stage_start(self, runner: "IRunner") -> None: """Event handler for stage start. Args: @@ -25,7 +25,7 @@ def on_stage_start(self, runner: "IRunner"): """ pass - def on_epoch_start(self, runner: "IRunner"): + def on_epoch_start(self, runner: "IRunner") -> None: """Event handler for epoch start. Args: @@ -33,7 +33,7 @@ def on_epoch_start(self, runner: "IRunner"): """ pass - def on_loader_start(self, runner: "IRunner"): + def on_loader_start(self, runner: "IRunner") -> None: """Event handler for loader start. Args: @@ -41,7 +41,7 @@ def on_loader_start(self, runner: "IRunner"): """ pass - def on_batch_start(self, runner: "IRunner"): + def on_batch_start(self, runner: "IRunner") -> None: """Event handler for batch start. Args: @@ -49,7 +49,7 @@ def on_batch_start(self, runner: "IRunner"): """ pass - def on_batch_end(self, runner: "IRunner"): + def on_batch_end(self, runner: "IRunner") -> None: """Event handler for batch end. Args: @@ -57,7 +57,7 @@ def on_batch_end(self, runner: "IRunner"): """ pass - def on_loader_end(self, runner: "IRunner"): + def on_loader_end(self, runner: "IRunner") -> None: """Event handler for loader end. Args: @@ -65,7 +65,7 @@ def on_loader_end(self, runner: "IRunner"): """ pass - def on_epoch_end(self, runner: "IRunner"): + def on_epoch_end(self, runner: "IRunner") -> None: """Event handler for epoch end. Args: @@ -73,7 +73,7 @@ def on_epoch_end(self, runner: "IRunner"): """ pass - def on_stage_end(self, runner: "IRunner"): + def on_stage_end(self, runner: "IRunner") -> None: """Event handler for stage end. Args: @@ -81,8 +81,8 @@ def on_stage_end(self, runner: "IRunner"): """ pass - def on_experiment_end(self, runner: "IRunner"): - """Event handler for stage start. + def on_experiment_end(self, runner: "IRunner") -> None: + """Event handler for experiment end. Args: runner: IRunner instance. @@ -92,7 +92,7 @@ def on_experiment_end(self, runner: "IRunner"): """ pass - def on_exception(self, runner: "IRunner"): + def on_exception(self, runner: "IRunner") -> None: """Event handler for exception case. Args: diff --git a/catalyst/core/legacy.py b/catalyst/core/legacy.py index feae659c84..a999daa138 100644 --- a/catalyst/core/legacy.py +++ b/catalyst/core/legacy.py @@ -1,4 +1,5 @@ # flake8: noqa +from typing import Any import warnings @@ -81,8 +82,7 @@ def loader_name(self): Please use `runner.loader_key` instead. """ warnings.warn( - "`loader_name` was deprecated, " - "please use `loader_key` instead", + "`loader_name` was deprecated, please use `loader_key` instead", DeprecationWarning, ) return self.loader_key @@ -96,8 +96,7 @@ def stage_name(self): Please use `runner.stage` instead. """ warnings.warn( - "`stage_name` was deprecated, " - "please use `stage` instead", + "`stage_name` was deprecated, please use `stage` instead", DeprecationWarning, ) return self.stage @@ -111,7 +110,7 @@ def state(self): Please use `runner` instead. 
""" warnings.warn( - "`runner.state` was deprecated, " "please use `runner` instead", + "`runner.state` was deprecated, please use `runner` instead", DeprecationWarning, ) return self diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py index 07610b7360..4e08936808 100644 --- a/catalyst/core/runner.py +++ b/catalyst/core/runner.py @@ -1,4 +1,4 @@ -from typing import Any, Callable, Dict, Mapping, Optional, Tuple, Union +from typing import Any, Dict, Mapping, Optional, Tuple, Union from abc import ABC, abstractmethod from collections import defaultdict, OrderedDict from functools import lru_cache @@ -362,10 +362,7 @@ class IRunner(ABC, ICallback, IRunnerLegacy): """ def __init__( - self, - model: RunnerModel = None, - device: Device = None, - experiment_fn: Callable = IExperiment, + self, model: RunnerModel = None, device: Device = None, ): """ Args: @@ -375,7 +372,6 @@ def __init__( self._device = None self._model = None self.experiment = None - self._experiment_fn = experiment_fn self._prepare_inner_state(model=model, device=device) def _prepare_inner_state( @@ -571,16 +567,37 @@ def device(self, value: Device): ) def on_experiment_start(self, runner: "IRunner"): + """Event handler for experiment start. + + Args: + runner: IRunner instance. + + .. note:: + This event work only on IRunner. + """ assert self.experiment is not None set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) def on_stage_start(self, runner: "IRunner"): + """Event handler for stage start. + + Args: + runner: IRunner instance. + """ assert self.stage is not None set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) def on_epoch_start(self, runner: "IRunner"): + """Event handler for epoch start. + + Args: + runner: IRunner instance. + + Raises: + RunnerException: if current DataLoader is empty. + """ assert self.loaders is not None for loader_name, loader in self.loaders.items(): @@ -603,6 +620,14 @@ def on_epoch_start(self, runner: "IRunner"): set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) def on_loader_start(self, runner: "IRunner"): + """Event handler for loader start. + + Args: + runner: IRunner instance. + + Raises: + RunnerException: if current DataLoader is empty. + """ assert self.loader is not None self.loader_len = len(self.loader) @@ -634,6 +659,11 @@ def on_loader_start(self, runner: "IRunner"): set_global_seed(self.experiment.initial_seed + self.global_epoch + 1) def on_batch_start(self, runner: "IRunner"): + """Event handler for batch start. + + Args: + runner: IRunner instance. + """ if isinstance(self.input, dict): self.batch_size = len(next(iter(self.input.values()))) else: @@ -645,22 +675,59 @@ def on_batch_start(self, runner: "IRunner"): self.loader_sample_step += self.batch_size def on_batch_end(self, runner: "IRunner"): + """Event handler for batch end. + + Args: + runner: IRunner instance. + """ pass def on_loader_end(self, runner: "IRunner"): + """Event handler for loader end. + + Args: + runner: IRunner instance. + """ pass def on_epoch_end(self, runner: "IRunner"): + """Event handler for epoch end. + + Args: + runner: IRunner instance. + """ self.global_epoch += 1 self.epoch += 1 def on_stage_end(self, runner: "IRunner"): + """Event handler for stage end. + + Args: + runner: IRunner instance. + """ pass def on_experiment_end(self, runner: "IRunner"): + """Event handler for experiment end. + + Args: + runner: IRunner instance. + + .. note:: + This event work only on IRunner. 
+ """ pass def on_exception(self, runner: "IRunner"): + """Event handler for exception case. + + Args: + runner: IRunner instance. + + Raises: + exception: if during pipeline exception, + no handler we found into callbacks + """ from catalyst.callbacks.exception import ExceptionCallback def _exception_handler_check(callbacks: Union[OrderedDict, Dict]): @@ -752,12 +819,6 @@ def run_experiment(self, experiment: IExperiment = None) -> "IRunner": Returns: self, `IRunner` instance after the experiment - - Raises: - Exception: if during pipeline exception, - no handler we found into callbacks - KeyboardInterrupt: if during pipeline exception, - no handler we found into callbacks """ self.experiment = experiment or self.experiment try: @@ -774,7 +835,18 @@ class IStageBasedRunner(IRunner): datasources per stage. """ - def on_stage_start(self, runner: "IRunner"): + def on_stage_start(self, runner: "IRunner") -> None: + """Event handler for stage start. + + For the `IStageBasedRunner` case: + + - prepares loaders - our datasources + - prepares model components - model, criterion, optimizer, scheduler + - prepares callbacks for the current stage + + Args: + runner: IRunner instance. + """ super().on_stage_start(runner) set_global_seed(self.experiment.initial_seed) diff --git a/catalyst/runners/runner.py b/catalyst/runners/runner.py index aea34a53db..52fe7362f7 100644 --- a/catalyst/runners/runner.py +++ b/catalyst/runners/runner.py @@ -31,9 +31,7 @@ class Runner(IStageBasedRunner): - """ - Deep Learning Runner for supervised, unsupervised, gan, etc runs. - """ + """Deep Learning Runner for supervised, unsupervised, gan, etc runs.""" def __init__( self, @@ -41,9 +39,17 @@ def __init__( device: Device = None, experiment_fn: Callable = Experiment, ): - super().__init__( - model=model, device=device, experiment_fn=experiment_fn - ) + """ + + Args: + model: Torch model object + device: Torch device + experiment_fn: callable function, + which defines default experiment type to use + during ``.train`` and ``.infer`` methods. + """ + super().__init__(model=model, device=device) + self._experiment_fn = experiment_fn def train( self, diff --git a/catalyst/runners/supervised.py b/catalyst/runners/supervised.py index 6187ef8424..6b6d050410 100644 --- a/catalyst/runners/supervised.py +++ b/catalyst/runners/supervised.py @@ -30,6 +30,9 @@ def __init__( output_key: Key in output dict model output will be stored under input_target_key: Key in batch dict mapping for target + experiment_fn: callable function, + which defines default experiment type to use + during ``.train`` and ``.infer`` methods. 
""" super().__init__( model=model, device=device, experiment_fn=experiment_fn diff --git a/catalyst/utils/misc.py b/catalyst/utils/misc.py index bf6058a81d..0fe9216fe8 100644 --- a/catalyst/utils/misc.py +++ b/catalyst/utils/misc.py @@ -220,6 +220,7 @@ def get_attr(obj: Any, key: str, inner_key: str = None) -> Any: runner.scheduler["adam"] Args: + obj: object of interest key: name for attribute of interest, like `criterion`, `optimizer`, `scheduler` inner_key: name of inner dictionary key From e15a4997b76370a2a9d54c7858489ee1ee21c224 Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Wed, 11 Nov 2020 21:02:13 +0300 Subject: [PATCH 8/9] tests --- catalyst/callbacks/checkpoint.py | 14 +++++++------- .../callbacks/tests/test_control_flow_callback.py | 8 ++++---- catalyst/callbacks/tests/test_early_stop.py | 2 +- catalyst/callbacks/tests/test_wrapper_callback.py | 4 ++-- catalyst/core/runner.py | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/catalyst/callbacks/checkpoint.py b/catalyst/callbacks/checkpoint.py index 84f5d4d518..526e077695 100644 --- a/catalyst/callbacks/checkpoint.py +++ b/catalyst/callbacks/checkpoint.py @@ -25,7 +25,7 @@ def _pack_runner(runner: "IRunner"): scheduler=runner.scheduler, epoch_metrics=dict(runner.epoch_metrics), valid_metrics=dict(runner.valid_metrics), - stage_name=runner.stage, + stage=runner.stage, epoch=runner.epoch, loader_name=runner.loader_key, loader_step=runner.loader_batch_step, @@ -66,7 +66,7 @@ def _load_checkpoint( checkpoint = load_checkpoint(filename) if not runner.stage.startswith("infer") and load_full: - runner.stage = checkpoint["stage_name"] + runner.stage = checkpoint["stage"] runner.epoch = checkpoint["epoch"] runner.global_epoch = checkpoint["global_epoch"] # @TODO: should we also load, @@ -86,7 +86,7 @@ def _load_checkpoint( f"loaded state checkpoint {filename} " f"(global epoch {checkpoint['global_epoch']}, " f"epoch {checkpoint['epoch']}, " - f"stage {checkpoint['stage_name']})" + f"stage {checkpoint['stage']})" ) else: unpack_checkpoint( @@ -373,12 +373,12 @@ def _get_checkpoint_suffix(self, checkpoint: dict) -> str: Args: checkpoint: checkpoint dict, - should contain ``stage_name`` and ``epoch`` keys. + should contain ``stage`` and ``epoch`` keys. Returns: str: checkpoint suffix """ - result = f"{checkpoint['stage_name']}.{checkpoint['epoch']}" + result = f"{checkpoint['stage']}.{checkpoint['epoch']}" return result def process_metrics(self, last_valid_metrics: Dict[str, float]) -> Dict: @@ -735,13 +735,13 @@ def _get_checkpoint_suffix(self, checkpoint: dict) -> str: Args: checkpoint: checkpoint dict, - should contain ``stage_name`` and ``epoch`` keys. + should contain ``stage`` and ``epoch`` keys. Returns: str: checkpoint suffix """ result = ( - f"{checkpoint['stage_name']}." + f"{checkpoint['stage']}." f"epoch.{checkpoint['epoch']}." 
f"iter.{self._iteration_counter}" ) diff --git a/catalyst/callbacks/tests/test_control_flow_callback.py b/catalyst/callbacks/tests/test_control_flow_callback.py index af985e409d..d878386b7e 100644 --- a/catalyst/callbacks/tests/test_control_flow_callback.py +++ b/catalyst/callbacks/tests/test_control_flow_callback.py @@ -12,8 +12,8 @@ class _Runner: - def __init__(self, stage_name, loader_name, global_epoch, epoch): - self.stage_name = stage_name + def __init__(self, stage, loader_name, global_epoch, epoch): + self.stage = stage self.loader_name = loader_name self.global_epoch = global_epoch self.epoch = epoch @@ -448,7 +448,7 @@ def test_ignore_foo_with_wrong_args(self): ) def test_filter_fn_with_wrong_args(self): - runner = Mock(stage_name="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_name="train", epoch=1) orders = ( CallbackOrder.Internal, CallbackOrder.Metric, @@ -504,7 +504,7 @@ def _raise_foo(stage: str, epoch: int, loader: str) -> bool: wrapper.__getattribute__(event)(runner) def test_filter_fn_with_eval(self): - runner = Mock(stage_name="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_name="train", epoch=1) orders = ( CallbackOrder.Internal, CallbackOrder.Metric, diff --git a/catalyst/callbacks/tests/test_early_stop.py b/catalyst/callbacks/tests/test_early_stop.py index 27229480c7..5eef1bc9e4 100644 --- a/catalyst/callbacks/tests/test_early_stop.py +++ b/catalyst/callbacks/tests/test_early_stop.py @@ -7,7 +7,7 @@ def test_patience1(): """Tests EarlyStoppingCallback.""" early_stop = EarlyStoppingCallback(1) runner = MagicMock() - type(runner).stage_name = PropertyMock(return_value="training") + type(runner).stage = PropertyMock(return_value="training") type(runner).valid_metrics = PropertyMock(return_value={"loss": 0.001}) stop_mock = PropertyMock(return_value=False) type(runner).need_early_stop = stop_mock diff --git a/catalyst/callbacks/tests/test_wrapper_callback.py b/catalyst/callbacks/tests/test_wrapper_callback.py index 02614d77e6..23c9938d7c 100644 --- a/catalyst/callbacks/tests/test_wrapper_callback.py +++ b/catalyst/callbacks/tests/test_wrapper_callback.py @@ -22,7 +22,7 @@ def __init__(self, order, method_to_raise: str): class TestWrapperCallback(unittest.TestCase): def test_enabled(self): - runner = Mock(stage_name="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_name="train", epoch=1) orders = ( CallbackOrder.Internal, @@ -56,7 +56,7 @@ def test_enabled(self): wrapper.__getattribute__(event)(runner) def test_disabled(self): - runner = Mock(stage_name="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_name="train", epoch=1) orders = ( CallbackOrder.Internal, diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py index 4e08936808..1461213101 100644 --- a/catalyst/core/runner.py +++ b/catalyst/core/runner.py @@ -283,11 +283,11 @@ class IRunner(ABC, ICallback, IRunnerLegacy): Stage info section - **runner.stage_name** - string, current stage name,\ + **runner.stage** - string, current stage name,\ for example, :: - runner.stage_name = "pretraining" / "training" / "finetuning" / etc + runner.stage = "pretraining" / "training" / "finetuning" / etc **runner.num_epochs** - int, maximum number of epochs, \ required for this stage From eb5b7cb45d2c38bfb696ada16ab6aafb006edaaf Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Wed, 11 Nov 2020 21:18:54 +0300 Subject: [PATCH 9/9] tests2 --- catalyst/callbacks/checkpoint.py | 4 ++-- 
catalyst/callbacks/tests/test_control_flow_callback.py | 8 ++++---- catalyst/callbacks/tests/test_wrapper_callback.py | 4 ++-- catalyst/core/runner.py | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/catalyst/callbacks/checkpoint.py b/catalyst/callbacks/checkpoint.py index 526e077695..940293067b 100644 --- a/catalyst/callbacks/checkpoint.py +++ b/catalyst/callbacks/checkpoint.py @@ -27,8 +27,8 @@ def _pack_runner(runner: "IRunner"): valid_metrics=dict(runner.valid_metrics), stage=runner.stage, epoch=runner.epoch, - loader_name=runner.loader_key, - loader_step=runner.loader_batch_step, + loader_key=runner.loader_key, + loader_batch_step=runner.loader_batch_step, global_epoch=runner.global_epoch, checkpoint_data=runner.checkpoint_data, main_metric=runner.main_metric, diff --git a/catalyst/callbacks/tests/test_control_flow_callback.py b/catalyst/callbacks/tests/test_control_flow_callback.py index d878386b7e..5ab70c9987 100644 --- a/catalyst/callbacks/tests/test_control_flow_callback.py +++ b/catalyst/callbacks/tests/test_control_flow_callback.py @@ -12,9 +12,9 @@ class _Runner: - def __init__(self, stage, loader_name, global_epoch, epoch): + def __init__(self, stage, loader_key, global_epoch, epoch): self.stage = stage - self.loader_name = loader_name + self.loader_key = loader_key self.global_epoch = global_epoch self.epoch = epoch @@ -448,7 +448,7 @@ def test_ignore_foo_with_wrong_args(self): ) def test_filter_fn_with_wrong_args(self): - runner = Mock(stage="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_key="train", epoch=1) orders = ( CallbackOrder.Internal, CallbackOrder.Metric, @@ -504,7 +504,7 @@ def _raise_foo(stage: str, epoch: int, loader: str) -> bool: wrapper.__getattribute__(event)(runner) def test_filter_fn_with_eval(self): - runner = Mock(stage="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_key="train", epoch=1) orders = ( CallbackOrder.Internal, CallbackOrder.Metric, diff --git a/catalyst/callbacks/tests/test_wrapper_callback.py b/catalyst/callbacks/tests/test_wrapper_callback.py index 23c9938d7c..ea10df1d03 100644 --- a/catalyst/callbacks/tests/test_wrapper_callback.py +++ b/catalyst/callbacks/tests/test_wrapper_callback.py @@ -22,7 +22,7 @@ def __init__(self, order, method_to_raise: str): class TestWrapperCallback(unittest.TestCase): def test_enabled(self): - runner = Mock(stage="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_key="train", epoch=1) orders = ( CallbackOrder.Internal, @@ -56,7 +56,7 @@ def test_enabled(self): wrapper.__getattribute__(event)(runner) def test_disabled(self): - runner = Mock(stage="stage1", loader_name="train", epoch=1) + runner = Mock(stage="stage1", loader_key="train", epoch=1) orders = ( CallbackOrder.Internal, diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py index 1461213101..6434da7f15 100644 --- a/catalyst/core/runner.py +++ b/catalyst/core/runner.py @@ -600,10 +600,10 @@ def on_epoch_start(self, runner: "IRunner"): """ assert self.loaders is not None - for loader_name, loader in self.loaders.items(): + for loader_key, loader in self.loaders.items(): if len(loader) == 0: raise RunnerException( - f"DataLoader with name {loader_name} is empty." + f"DataLoader with name {loader_key} is empty." ) if not self.is_infer_stage:
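For downstream code that still touches the old attribute names, here is a minimal, self-contained sketch of the deprecation-alias pattern that catalyst/core/legacy.py applies for `stage_name` -> `stage` and `loader_name` -> `loader_key`. The `RunnerAliasSketch` class and its attribute values are simplified stand-ins for illustration, not the actual Catalyst runner.

import warnings


class RunnerAliasSketch:
    """Simplified stand-in for the alias properties in catalyst/core/legacy.py."""

    def __init__(self):
        # New canonical attributes introduced by these patches.
        self.stage = "train"
        self.loader_key = "valid"

    @property
    def stage_name(self):
        """Deprecated alias for `stage`."""
        warnings.warn(
            "`stage_name` was deprecated, please use `stage` instead",
            DeprecationWarning,
        )
        return self.stage

    @property
    def loader_name(self):
        """Deprecated alias for `loader_key`."""
        warnings.warn(
            "`loader_name` was deprecated, please use `loader_key` instead",
            DeprecationWarning,
        )
        return self.loader_key


if __name__ == "__main__":
    runner = RunnerAliasSketch()
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # Old names still resolve, but each access emits a DeprecationWarning.
        assert runner.stage_name == runner.stage
        assert runner.loader_name == runner.loader_key
    assert len(caught) == 2
    assert all(w.category is DeprecationWarning for w in caught)

Running a test suite with `warnings.simplefilter("error", DeprecationWarning)` turns any remaining old-name access into a hard failure, which is a cheap way to audit a codebase before the aliases are removed.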