diff --git a/catalyst/callbacks/checkpoint.py b/catalyst/callbacks/checkpoint.py
index 46a17fe2dc..940293067b 100644
--- a/catalyst/callbacks/checkpoint.py
+++ b/catalyst/callbacks/checkpoint.py
@@ -25,10 +25,10 @@ def _pack_runner(runner: "IRunner"):
         scheduler=runner.scheduler,
         epoch_metrics=dict(runner.epoch_metrics),
         valid_metrics=dict(runner.valid_metrics),
-        stage_name=runner.stage_name,
+        stage=runner.stage,
         epoch=runner.epoch,
-        loader_name=runner.loader_name,
-        loader_step=runner.loader_batch_step,
+        loader_key=runner.loader_key,
+        loader_batch_step=runner.loader_batch_step,
         global_epoch=runner.global_epoch,
         checkpoint_data=runner.checkpoint_data,
         main_metric=runner.main_metric,
@@ -65,8 +65,8 @@ def _load_checkpoint(
     print(f"=> Loading checkpoint {filename}")
     checkpoint = load_checkpoint(filename)

-    if not runner.stage_name.startswith("infer") and load_full:
-        runner.stage_name = checkpoint["stage_name"]
+    if not runner.stage.startswith("infer") and load_full:
+        runner.stage = checkpoint["stage"]
         runner.epoch = checkpoint["epoch"]
         runner.global_epoch = checkpoint["global_epoch"]
         # @TODO: should we also load,
@@ -86,7 +86,7 @@ def _load_checkpoint(
             f"loaded state checkpoint {filename} "
             f"(global epoch {checkpoint['global_epoch']}, "
             f"epoch {checkpoint['epoch']}, "
-            f"stage {checkpoint['stage_name']})"
+            f"stage {checkpoint['stage']})"
         )
     else:
         unpack_checkpoint(
@@ -373,12 +373,12 @@ def _get_checkpoint_suffix(self, checkpoint: dict) -> str:

         Args:
             checkpoint: checkpoint dict,
-                should contain ``stage_name`` and ``epoch`` keys.
+                should contain ``stage`` and ``epoch`` keys.

         Returns:
             str: checkpoint suffix
         """
-        result = f"{checkpoint['stage_name']}.{checkpoint['epoch']}"
+        result = f"{checkpoint['stage']}.{checkpoint['epoch']}"
         return result

     def process_metrics(self, last_valid_metrics: Dict[str, float]) -> Dict:
@@ -620,10 +620,7 @@ def on_epoch_end(self, runner: "IRunner") -> None:
         Args:
             runner: current runner
         """
-        if (
-            runner.stage_name.startswith("infer")
-            or runner.is_distributed_worker
-        ):
+        if runner.stage.startswith("infer") or runner.is_distributed_worker:
             return

         if self.save_n_best > 0:
@@ -644,10 +641,7 @@ def on_stage_end(self, runner: "IRunner") -> None:
         Args:
             runner: current runner
         """
-        if (
-            runner.stage_name.startswith("infer")
-            or runner.is_distributed_worker
-        ):
+        if runner.stage.startswith("infer") or runner.is_distributed_worker:
             return
         log_message = "Top best models:\n"
         # store latest state
@@ -741,13 +735,13 @@ def _get_checkpoint_suffix(self, checkpoint: dict) -> str:

         Args:
             checkpoint: checkpoint dict,
-                should contain ``stage_name`` and ``epoch`` keys.
+                should contain ``stage`` and ``epoch`` keys.

         Returns:
             str: checkpoint suffix
         """
         result = (
-            f"{checkpoint['stage_name']}."
+            f"{checkpoint['stage']}."
             f"epoch.{checkpoint['epoch']}."
             f"iter.{self._iteration_counter}"
         )
diff --git a/catalyst/callbacks/control_flow.py b/catalyst/callbacks/control_flow.py
index 2909df0bf0..c35ed35caa 100644
--- a/catalyst/callbacks/control_flow.py
+++ b/catalyst/callbacks/control_flow.py
@@ -378,8 +378,8 @@ def on_loader_start(self, runner: "IRunner") -> None:
         Args:
             runner: current runner
         """
-        stage = runner.stage_name
-        loader = runner.loader_name
+        stage = runner.stage
+        loader = runner.loader_key
         epoch = runner.global_epoch if self.use_global_epochs else runner.epoch

         if self.filter_fn is not None:
diff --git a/catalyst/callbacks/criterion.py b/catalyst/callbacks/criterion.py
index 0014d50360..1814fc7fcc 100644
--- a/catalyst/callbacks/criterion.py
+++ b/catalyst/callbacks/criterion.py
@@ -1,6 +1,7 @@
 from typing import Dict, List, TYPE_CHECKING, Union

 from catalyst.callbacks.metric import IBatchMetricCallback
+from catalyst.utils.misc import get_attr

 if TYPE_CHECKING:
     from catalyst.core.runner import IRunner
@@ -55,8 +56,8 @@ def on_stage_start(self, runner: "IRunner"):
         Args:
             runner: current runner
         """
-        criterion = runner.get_attr(
-            key="criterion", inner_key=self.criterion_key
+        criterion = get_attr(
+            runner, key="criterion", inner_key=self.criterion_key
         )
         assert criterion is not None
         self._criterion = criterion
diff --git a/catalyst/callbacks/early_stop.py b/catalyst/callbacks/early_stop.py
index 9cf1b112ee..789b89714d 100644
--- a/catalyst/callbacks/early_stop.py
+++ b/catalyst/callbacks/early_stop.py
@@ -177,7 +177,7 @@ def on_epoch_end(self, runner: "IRunner") -> None:
         Args:
             runner: current runner
         """
-        if runner.stage_name.startswith("infer"):
+        if runner.stage.startswith("infer"):
             return

         score = runner.valid_metrics[self.metric]
diff --git a/catalyst/callbacks/logging.py b/catalyst/callbacks/logging.py
index 6e2a24b2d1..3b4972677c 100644
--- a/catalyst/callbacks/logging.py
+++ b/catalyst/callbacks/logging.py
@@ -66,7 +66,7 @@ def on_loader_start(self, runner: "IRunner"):
         self.tqdm = tqdm(
             total=runner.loader_len,
             desc=f"{runner.epoch}/{runner.num_epochs}"
-            f" * Epoch ({runner.loader_name})",
+            f" * Epoch ({runner.loader_key})",
             leave=True,
             ncols=0,
             file=sys.stdout,
@@ -222,9 +222,9 @@ def on_stage_start(self, runner: "IRunner") -> None:

     def on_loader_start(self, runner: "IRunner"):
         """Prepare tensorboard writers for the current stage."""
-        if runner.loader_name not in self.loggers:
-            log_dir = os.path.join(runner.logdir, f"{runner.loader_name}_log")
-            self.loggers[runner.loader_name] = SummaryWriter(log_dir)
+        if runner.loader_key not in self.loggers:
+            log_dir = os.path.join(runner.logdir, f"{runner.loader_key}_log")
+            self.loggers[runner.loader_key] = SummaryWriter(log_dir)

     def on_batch_end(self, runner: "IRunner"):
         """Translate batch metrics to tensorboard."""
@@ -232,7 +232,7 @@ def on_batch_end(self, runner: "IRunner"):
             return

         if self.log_on_batch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.batch_metrics
             self._log_metrics(
                 metrics=metrics,
diff --git a/catalyst/callbacks/metric.py b/catalyst/callbacks/metric.py
index 43ef3a9e6e..96280358f3 100644
--- a/catalyst/callbacks/metric.py
+++ b/catalyst/callbacks/metric.py
@@ -498,7 +498,7 @@ def on_loader_end(self, runner: "IRunner") -> None:
                 value = value.mean
             runner.loader_metrics[key] = value
         for key, value in runner.loader_metrics.items():
-            runner.epoch_metrics[f"{runner.loader_name}_{key}"] = value
+            runner.epoch_metrics[f"{runner.loader_key}_{key}"] = value


 # backward compatibility
diff --git a/catalyst/callbacks/metrics/cmc_score.py b/catalyst/callbacks/metrics/cmc_score.py
index c913b1c71e..c0c09f8ec2 100644
--- a/catalyst/callbacks/metrics/cmc_score.py
+++ b/catalyst/callbacks/metrics/cmc_score.py
@@ -121,7 +121,7 @@ def on_batch_end(self, runner: "IRunner"):

     def on_loader_start(self, runner: "IRunner"):
         """On loader start action"""
-        dataset = runner.loaders[runner.loader_name].dataset
+        dataset = runner.loaders[runner.loader_key].dataset
         assert isinstance(dataset, QueryGalleryDataset)
         self._query_size = dataset.query_size
         self._gallery_size = dataset.gallery_size
diff --git a/catalyst/callbacks/optimizer.py b/catalyst/callbacks/optimizer.py
index bcc90c65ec..c75e7a0c35 100644
--- a/catalyst/callbacks/optimizer.py
+++ b/catalyst/callbacks/optimizer.py
@@ -7,7 +7,7 @@
 from catalyst import registry
 from catalyst.core.callback import Callback, CallbackNode, CallbackOrder
 from catalyst.typing import Optimizer
-from catalyst.utils.misc import maybe_recursive_call
+from catalyst.utils.misc import get_attr, maybe_recursive_call
 from catalyst.utils.torch import get_optimizer_momentum

 if TYPE_CHECKING:
@@ -151,8 +151,8 @@ def on_stage_start(self, runner: "IRunner") -> None:
         Args:
             runner(IRunner): current runner
         """
-        self._optimizer = runner.get_attr(
-            key="optimizer", inner_key=self.optimizer_key
+        self._optimizer = get_attr(
+            runner, key="optimizer", inner_key=self.optimizer_key
         )
         # device based optimization step
         if runner.device.type == "xla":
@@ -328,8 +328,8 @@ def on_stage_start(self, runner: "IRunner") -> None:
         """
         from torch.cuda.amp import GradScaler

-        self._optimizer = runner.get_attr(
-            key="optimizer", inner_key=self.optimizer_key
+        self._optimizer = get_attr(
+            runner, key="optimizer", inner_key=self.optimizer_key
         )
         self.scaler = GradScaler()
         assert self._optimizer is not None
diff --git a/catalyst/callbacks/scheduler.py b/catalyst/callbacks/scheduler.py
index d820ba965d..256112e7b1 100644
--- a/catalyst/callbacks/scheduler.py
+++ b/catalyst/callbacks/scheduler.py
@@ -5,6 +5,7 @@
 from catalyst.contrib.nn.schedulers import BatchScheduler, OneCycleLRWithWarmup
 from catalyst.core.callback import Callback, CallbackNode, CallbackOrder
+from catalyst.utils.misc import get_attr
 from catalyst.utils.torch import get_optimizer_momentum

 if TYPE_CHECKING:
@@ -173,8 +174,8 @@ def on_stage_start(self, runner: "IRunner") -> None:
         """
         self.reduced_metric = self.reduced_metric or runner.main_metric

-        scheduler = runner.get_attr(
-            key="scheduler", inner_key=self.scheduler_key
+        scheduler = get_attr(
+            runner, key="scheduler", inner_key=self.scheduler_key
         )
         assert scheduler is not None
         self._scheduler = scheduler
@@ -297,8 +298,8 @@ def on_stage_start(self, runner: "IRunner") -> None:
         Args:
             runner: current runner
         """
-        optimizer = runner.get_attr(
-            key="optimizer", inner_key=self.optimizer_key
+        optimizer = get_attr(
+            runner, key="optimizer", inner_key=self.optimizer_key
         )
         assert optimizer is not None
         self._optimizer = optimizer
diff --git a/catalyst/callbacks/tests/test_control_flow_callback.py b/catalyst/callbacks/tests/test_control_flow_callback.py
index af985e409d..5ab70c9987 100644
--- a/catalyst/callbacks/tests/test_control_flow_callback.py
+++ b/catalyst/callbacks/tests/test_control_flow_callback.py
@@ -12,9 +12,9 @@

 class _Runner:
-    def __init__(self, stage_name, loader_name, global_epoch, epoch):
-        self.stage_name = stage_name
-        self.loader_name = loader_name
+    def __init__(self, stage, loader_key, global_epoch, epoch):
+        self.stage = stage
+        self.loader_key = loader_key
         self.global_epoch = global_epoch
         self.epoch = epoch
@@ -448,7 +448,7 @@ def test_ignore_foo_with_wrong_args(self):
         )

     def test_filter_fn_with_wrong_args(self):
-        runner = Mock(stage_name="stage1", loader_name="train", epoch=1)
+        runner = Mock(stage="stage1", loader_key="train", epoch=1)
         orders = (
             CallbackOrder.Internal,
             CallbackOrder.Metric,
@@ -504,7 +504,7 @@ def _raise_foo(stage: str, epoch: int, loader: str) -> bool:
             wrapper.__getattribute__(event)(runner)

     def test_filter_fn_with_eval(self):
-        runner = Mock(stage_name="stage1", loader_name="train", epoch=1)
+        runner = Mock(stage="stage1", loader_key="train", epoch=1)
         orders = (
             CallbackOrder.Internal,
             CallbackOrder.Metric,
diff --git a/catalyst/callbacks/tests/test_early_stop.py b/catalyst/callbacks/tests/test_early_stop.py
index 27229480c7..5eef1bc9e4 100644
--- a/catalyst/callbacks/tests/test_early_stop.py
+++ b/catalyst/callbacks/tests/test_early_stop.py
@@ -7,7 +7,7 @@ def test_patience1():
     """Tests EarlyStoppingCallback."""
     early_stop = EarlyStoppingCallback(1)
     runner = MagicMock()
-    type(runner).stage_name = PropertyMock(return_value="training")
+    type(runner).stage = PropertyMock(return_value="training")
     type(runner).valid_metrics = PropertyMock(return_value={"loss": 0.001})
     stop_mock = PropertyMock(return_value=False)
     type(runner).need_early_stop = stop_mock
diff --git a/catalyst/callbacks/tests/test_periodic_loader_callback.py b/catalyst/callbacks/tests/test_periodic_loader_callback.py
index be1190d7c4..5aaf181000 100644
--- a/catalyst/callbacks/tests/test_periodic_loader_callback.py
+++ b/catalyst/callbacks/tests/test_periodic_loader_callback.py
@@ -746,7 +746,7 @@ def __init__(self, values):
             self.values = values

         def on_loader_end(self, runner: "IRunner") -> None:
-            score = self.values[runner.loader_name][runner.epoch]
+            score = self.values[runner.loader_key][runner.epoch]
             runner.loader_metrics["metric"] = score

     old_stdout = sys.stdout
diff --git a/catalyst/callbacks/tests/test_wrapper_callback.py b/catalyst/callbacks/tests/test_wrapper_callback.py
index 02614d77e6..ea10df1d03 100644
--- a/catalyst/callbacks/tests/test_wrapper_callback.py
+++ b/catalyst/callbacks/tests/test_wrapper_callback.py
@@ -22,7 +22,7 @@ def __init__(self, order, method_to_raise: str):

 class TestWrapperCallback(unittest.TestCase):
     def test_enabled(self):
-        runner = Mock(stage_name="stage1", loader_name="train", epoch=1)
+        runner = Mock(stage="stage1", loader_key="train", epoch=1)

         orders = (
             CallbackOrder.Internal,
@@ -56,7 +56,7 @@ def test_enabled(self):
             wrapper.__getattribute__(event)(runner)

     def test_disabled(self):
-        runner = Mock(stage_name="stage1", loader_name="train", epoch=1)
+        runner = Mock(stage="stage1", loader_key="train", epoch=1)

         orders = (
             CallbackOrder.Internal,
diff --git a/catalyst/callbacks/validation.py b/catalyst/callbacks/validation.py
index 48a7724706..f88dc6d50a 100644
--- a/catalyst/callbacks/validation.py
+++ b/catalyst/callbacks/validation.py
@@ -33,7 +33,7 @@ def on_epoch_end(self, runner: "IRunner") -> None:
         Args:
             runner: current runner
         """
-        if runner.stage_name.startswith("infer"):
+        if runner.stage.startswith("infer"):
             return

         runner.valid_metrics = {
diff --git a/catalyst/contrib/callbacks/alchemy_logger.py b/catalyst/contrib/callbacks/alchemy_logger.py
index 32272a83f1..0e720bd7d6 100644
--- a/catalyst/contrib/callbacks/alchemy_logger.py
+++ b/catalyst/contrib/callbacks/alchemy_logger.py
@@ -108,7 +108,7 @@ def _log_metrics(
     def on_batch_end(self, runner: "IRunner"):
         """Translate batch metrics to Alchemy."""
         if self.log_on_batch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.batch_metrics
             self._log_metrics(
                 metrics=metrics,
@@ -120,7 +120,7 @@ def on_batch_end(self, runner: "IRunner"):
     def on_loader_end(self, runner: "IRunner"):
         """Translate loader metrics to Alchemy."""
         if self.log_on_epoch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.loader_metrics
             self._log_metrics(
                 metrics=metrics,
diff --git a/catalyst/contrib/callbacks/confusion_matrix_logger.py b/catalyst/contrib/callbacks/confusion_matrix_logger.py
index a8f6465811..34f35e19f5 100644
--- a/catalyst/contrib/callbacks/confusion_matrix_logger.py
+++ b/catalyst/contrib/callbacks/confusion_matrix_logger.py
@@ -146,7 +146,7 @@ def on_loader_end(self, runner: "IRunner"):
         if runner.distributed_rank <= 0:
             tb_callback = runner.callbacks[self.tensorboard_callback_name]
             self._plot_confusion_matrix(
-                logger=tb_callback.loggers[runner.loader_name],
+                logger=tb_callback.loggers[runner.loader_key],
                 epoch=runner.global_epoch,
                 confusion_matrix=confusion_matrix,
                 class_names=class_names,
diff --git a/catalyst/contrib/callbacks/inference_callback.py b/catalyst/contrib/callbacks/inference_callback.py
index 5ffcae3361..30c618b937 100644
--- a/catalyst/contrib/callbacks/inference_callback.py
+++ b/catalyst/contrib/callbacks/inference_callback.py
@@ -72,7 +72,7 @@ def on_loader_end(self, runner: "IRunner"):
         }
         if self.out_prefix is not None:
             for key, value in self.predictions.items():
-                suffix = ".".join([runner.loader_name, key])
+                suffix = ".".join([runner.loader_key, key])
                 np.save(f"{self.out_prefix}/{suffix}.npy", value)
diff --git a/catalyst/contrib/callbacks/knn_metric.py b/catalyst/contrib/callbacks/knn_metric.py
index 4b36bc5bb9..0f60a2d63d 100644
--- a/catalyst/contrib/callbacks/knn_metric.py
+++ b/catalyst/contrib/callbacks/knn_metric.py
@@ -203,7 +203,7 @@ def on_loader_end(self, runner: "IRunner") -> None:
             "labels": self.targets,
         }

-        self.sets[runner.loader_name] = s
+        self.sets[runner.loader_key] = s

         y_true, y_pred = self._knn(s)
diff --git a/catalyst/contrib/callbacks/mask_inference.py b/catalyst/contrib/callbacks/mask_inference.py
index 94137351d6..22c9294767 100644
--- a/catalyst/contrib/callbacks/mask_inference.py
+++ b/catalyst/contrib/callbacks/mask_inference.py
@@ -75,7 +75,7 @@ def on_loader_start(self, runner: "IRunner"):
         Args:
             runner: current runner
         """
-        lm = runner.loader_name
+        lm = runner.loader_key
         os.makedirs(f"{self.out_prefix}/{lm}/", exist_ok=True)

     def on_batch_end(self, runner: "IRunner"):
@@ -84,7 +84,7 @@ def on_batch_end(self, runner: "IRunner"):
         Args:
             runner: current runner
         """
-        lm = runner.loader_name
+        lm = runner.loader_key
         names = runner.input.get(self.name_key, [])
         features = runner.input[self.input_key].detach().cpu()
diff --git a/catalyst/contrib/callbacks/neptune_logger.py b/catalyst/contrib/callbacks/neptune_logger.py
index 758b026cd8..84da438575 100644
--- a/catalyst/contrib/callbacks/neptune_logger.py
+++ b/catalyst/contrib/callbacks/neptune_logger.py
@@ -142,7 +142,7 @@ def _log_metrics(
     def on_batch_end(self, runner: "IRunner"):
         """Log batch metrics to Neptune."""
         if self.log_on_batch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.batch_metrics
             self._log_metrics(
                 metrics=metrics,
@@ -154,7 +154,7 @@ def on_batch_end(self, runner: "IRunner"):
     def on_loader_end(self, runner: "IRunner"):
         """Translate epoch metrics to Neptune."""
         if self.log_on_epoch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.loader_metrics
             self._log_metrics(
                 metrics=metrics,
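For downstream callback authors, the migration above is mechanical: every `runner.loader_name` becomes `runner.loader_key` and every `runner.stage_name` becomes `runner.stage`. A minimal sketch of a user callback updated for the new names (hypothetical example, not part of this patch; assumes the patch is applied):

import catalyst  # noqa: F401  (requires a catalyst build with this patch)
from catalyst.core.callback import Callback, CallbackOrder

class LoaderBannerCallback(Callback):
    """Hypothetical callback migrated to the renamed runner attributes."""

    def __init__(self):
        super().__init__(order=CallbackOrder.External)

    def on_loader_start(self, runner):
        # old: runner.stage_name / runner.loader_name
        print(f"stage={runner.stage}, loader={runner.loader_key}")

The deprecated properties added in catalyst/core/legacy.py below keep the old spellings working for one release cycle, so this change can land without breaking user code immediately.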
diff --git a/catalyst/contrib/callbacks/telegram_logger.py b/catalyst/contrib/callbacks/telegram_logger.py
index 5a3cf85ecb..7edbebe3d7 100644
--- a/catalyst/contrib/callbacks/telegram_logger.py
+++ b/catalyst/contrib/callbacks/telegram_logger.py
@@ -76,7 +76,7 @@ def _send_text(self, text: str):
     def on_stage_start(self, runner: "IRunner"):
         """Notify about starting a new stage."""
         if self.log_on_stage_start:
-            text = f"{runner.stage_name} stage was started"
+            text = f"{runner.stage} stage was started"

             self._send_text(text)

@@ -84,7 +84,7 @@ def on_loader_start(self, runner: "IRunner"):
         """Notify about starting running the new loader."""
         if self.log_on_loader_start:
             text = (
-                f"{runner.loader_name} {runner.global_epoch} epoch has started"
+                f"{runner.loader_key} {runner.global_epoch} epoch has started"
             )

             self._send_text(text)
@@ -100,7 +100,7 @@ def on_loader_end(self, runner: "IRunner"):
             metrics_to_log = self.metrics_to_log

         rows: List[str] = [
-            f"{runner.loader_name} {runner.global_epoch}"
+            f"{runner.loader_key} {runner.global_epoch}"
             f" epoch was finished:"
         ]

@@ -115,7 +115,7 @@ def on_loader_end(self, runner: "IRunner"):
     def on_stage_end(self, runner: "IRunner"):
         """Notify about finishing a stage."""
         if self.log_on_stage_end:
-            text = f"{runner.stage_name} stage was finished"
+            text = f"{runner.stage} stage was finished"
             self._send_text(text)
diff --git a/catalyst/contrib/callbacks/wandb_logger.py b/catalyst/contrib/callbacks/wandb_logger.py
index d48c87bbfa..ac7b241b1c 100644
--- a/catalyst/contrib/callbacks/wandb_logger.py
+++ b/catalyst/contrib/callbacks/wandb_logger.py
@@ -155,7 +155,7 @@ def on_stage_end(self, runner: "IRunner"):
     def on_batch_end(self, runner: "IRunner"):
         """Translate batch metrics to Weights & Biases."""
         if self.log_on_batch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.batch_metrics
             self._log_metrics(
                 metrics=metrics,
@@ -168,7 +168,7 @@ def on_batch_end(self, runner: "IRunner"):
     def on_loader_end(self, runner: "IRunner"):
         """Translate loader metrics to Weights & Biases."""
         if self.log_on_epoch_end:
-            mode = runner.loader_name
+            mode = runner.loader_key
             metrics = runner.loader_metrics
             self._log_metrics(
                 metrics=metrics,
diff --git a/catalyst/core/callback.py b/catalyst/core/callback.py
index b5793ddf82..511c51b51e 100644
--- a/catalyst/core/callback.py
+++ b/catalyst/core/callback.py
@@ -5,6 +5,102 @@
     from catalyst.core.runner import IRunner


+class ICallback:
+    def on_experiment_start(self, runner: "IRunner") -> None:
+        """Event handler for experiment start.
+
+        Args:
+            runner: IRunner instance.
+
+        .. note::
+            This event works only on IRunner.
+        """
+        pass
+
+    def on_stage_start(self, runner: "IRunner") -> None:
+        """Event handler for stage start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_epoch_start(self, runner: "IRunner") -> None:
+        """Event handler for epoch start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_loader_start(self, runner: "IRunner") -> None:
+        """Event handler for loader start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_batch_start(self, runner: "IRunner") -> None:
+        """Event handler for batch start.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_batch_end(self, runner: "IRunner") -> None:
+        """Event handler for batch end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_loader_end(self, runner: "IRunner") -> None:
+        """Event handler for loader end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_epoch_end(self, runner: "IRunner") -> None:
+        """Event handler for epoch end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_stage_end(self, runner: "IRunner") -> None:
+        """Event handler for stage end.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+    def on_experiment_end(self, runner: "IRunner") -> None:
+        """Event handler for experiment end.
+
+        Args:
+            runner: IRunner instance.
+
+        .. note::
+            This event works only on IRunner.
+        """
+        pass
+
+    def on_exception(self, runner: "IRunner") -> None:
+        """Event handler for exception case.
+
+        Args:
+            runner: IRunner instance.
+        """
+        pass
+
+
 class CallbackNode(IntFlag):
     """Callback node usage flag during distributed training.
@@ -78,7 +174,7 @@ class CallbackScope(IntFlag):
     Experiment = experiment = 1  # noqa: WPS115


-class Callback:
+class Callback(ICallback):
     """
     An abstraction that lets you customize your experiment run logic.
     To give users maximum flexibility and extensibility Catalyst supports
@@ -112,11 +208,11 @@ class Callback:
     Abstraction, please check out the implementations:

-    - :py:mod:`catalyst.core.callbacks.criterion.CriterionCallback`
-    - :py:mod:`catalyst.core.callbacks.optimizer.OptimizerCallback`
-    - :py:mod:`catalyst.core.callbacks.scheduler.SchedulerCallback`
-    - :py:mod:`catalyst.core.callbacks.logging.TensorboardLogger`
-    - :py:mod:`catalyst.core.callbacks.checkpoint.CheckpointCallback`
+    - :py:mod:`catalyst.callbacks.criterion.CriterionCallback`
+    - :py:mod:`catalyst.callbacks.optimizer.OptimizerCallback`
+    - :py:mod:`catalyst.callbacks.scheduler.SchedulerCallback`
+    - :py:mod:`catalyst.callbacks.logging.TensorboardLogger`
+    - :py:mod:`catalyst.callbacks.checkpoint.CheckpointCallback`
     """

     def __init__(
@@ -136,78 +232,6 @@ def __init__(
         self.order = order
         self.scope = scope

-    def on_stage_start(self, runner: "IRunner"):
-        """Event handler for stage start.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_stage_end(self, runner: "IRunner"):
-        """Event handler for stage end.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_epoch_start(self, runner: "IRunner"):
-        """Event handler for epoch start.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_epoch_end(self, runner: "IRunner"):
-        """Event handler for epoch end.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_loader_start(self, runner: "IRunner"):
-        """Event handler for loader start.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_loader_end(self, runner: "IRunner"):
-        """Event handler for loader end.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_batch_start(self, runner: "IRunner"):
-        """Event handler for batch start.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_batch_end(self, runner: "IRunner"):
-        """Event handler for batch end.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-
-    def on_exception(self, runner: "IRunner"):
-        """Event handler for exception case.
-
-        Args:
-            runner: IRunner instance.
-        """
-        pass
-

 class CallbackWrapper(Callback):
     """Enable/disable callback execution."""
@@ -232,79 +256,79 @@ def __init__(self, base_callback: Callback, enable_callback: bool = True):
         self.callback = base_callback
         self._is_enabled = enable_callback

-    def on_loader_start(self, runner: "IRunner") -> None:
-        """
-        Check if current epoch should be skipped.
+    def on_stage_start(self, runner: "IRunner") -> None:
+        """Run base_callback (if possible)

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_loader_start(runner)
+            self.callback.on_stage_start(runner)

-    def on_loader_end(self, runner: "IRunner") -> None:
-        """
-        Reset status of callback
+    def on_epoch_start(self, runner: "IRunner") -> None:
+        """Run base_callback (if possible)

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_loader_end(runner)
+            self.callback.on_epoch_start(runner)

-    def on_stage_start(self, runner: "IRunner") -> None:
-        """Run base_callback (if possible)
+    def on_loader_start(self, runner: "IRunner") -> None:
+        """
+        Check if current epoch should be skipped.

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_stage_start(runner)
+            self.callback.on_loader_start(runner)

-    def on_stage_end(self, runner: "IRunner") -> None:
+    def on_batch_start(self, runner: "IRunner") -> None:
         """Run base_callback (if possible)

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_stage_end(runner)
+            self.callback.on_batch_start(runner)

-    def on_epoch_start(self, runner: "IRunner") -> None:
+    def on_batch_end(self, runner: "IRunner") -> None:
         """Run base_callback (if possible)

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_epoch_start(runner)
+            self.callback.on_batch_end(runner)

-    def on_epoch_end(self, runner: "IRunner") -> None:
-        """Run base_callback (if possible)
+    def on_loader_end(self, runner: "IRunner") -> None:
+        """
+        Reset status of callback

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_epoch_end(runner)
+            self.callback.on_loader_end(runner)

-    def on_batch_start(self, runner: "IRunner") -> None:
+    def on_epoch_end(self, runner: "IRunner") -> None:
         """Run base_callback (if possible)

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_batch_start(runner)
+            self.callback.on_epoch_end(runner)

-    def on_batch_end(self, runner: "IRunner") -> None:
+    def on_stage_end(self, runner: "IRunner") -> None:
         """Run base_callback (if possible)

         Args:
             runner: current runner
         """
         if self._is_enabled:
-            self.callback.on_batch_end(runner)
+            self.callback.on_stage_end(runner)

     def on_exception(self, runner: "IRunner") -> None:
         """Run base_callback (if possible)
diff --git a/catalyst/core/legacy.py b/catalyst/core/legacy.py
index 550d5f68af..a999daa138 100644
--- a/catalyst/core/legacy.py
+++ b/catalyst/core/legacy.py
@@ -1,4 +1,5 @@
 # flake8: noqa
+from typing import Any
 import warnings

@@ -72,6 +73,34 @@ def loader_step(self):
         )
         return self.loader_batch_step

+    @property
+    def loader_name(self):
+        """Alias for `runner.loader_key`.
+
+        .. warning::
+            Deprecated, saved for backward compatibility.
+            Please use `runner.loader_key` instead.
+        """
+        warnings.warn(
+            "`loader_name` was deprecated, please use `loader_key` instead",
+            DeprecationWarning,
+        )
+        return self.loader_key
+
+    @property
+    def stage_name(self):
+        """Alias for `runner.stage`.
+
+        .. warning::
+            Deprecated, saved for backward compatibility.
+            Please use `runner.stage` instead.
+        """
+        warnings.warn(
+            "`stage_name` was deprecated, please use `stage` instead",
+            DeprecationWarning,
+        )
+        return self.stage
+
     @property
     def state(self):
         """Alias for `runner`.
@@ -81,7 +110,7 @@ def state(self):
             Please use `runner` instead.
         """
         warnings.warn(
-            "`runner.state` was deprecated, " "please use `runner` instead",
+            "`runner.state` was deprecated, please use `runner` instead",
             DeprecationWarning,
         )
         return self
diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py
index 825f419d4f..6434da7f15 100644
--- a/catalyst/core/runner.py
+++ b/catalyst/core/runner.py
@@ -1,13 +1,14 @@
-from typing import Any, Callable, Dict, Mapping, Optional, Tuple, Union
+from typing import Any, Dict, Mapping, Optional, Tuple, Union
 from abc import ABC, abstractmethod
 from collections import defaultdict, OrderedDict
+from functools import lru_cache
 from pathlib import Path

 import torch
 from torch import nn
 from torch.utils.data import DataLoader, DistributedSampler

-from catalyst.core.callback import Callback, CallbackScope
+from catalyst.core.callback import Callback, CallbackScope, ICallback
 from catalyst.core.experiment import IExperiment
 from catalyst.core.functional import (
     filter_callbacks_by_node,
@@ -15,17 +16,13 @@
 )
 from catalyst.core.legacy import IRunnerLegacy
 from catalyst.settings import SETTINGS
-from catalyst.tools.frozen_class import FrozenClass
 from catalyst.typing import (
-    Criterion,
     Device,
     Model,
-    Optimizer,
     RunnerCriterion,
     RunnerModel,
     RunnerOptimizer,
     RunnerScheduler,
-    Scheduler,
 )
 from catalyst.utils.components import process_components
 from catalyst.utils.distributed import get_rank
@@ -34,6 +31,11 @@
 from catalyst.utils.torch import any2device


+@lru_cache(maxsize=42)
+def _is_substring(origin_string: str, strings: Tuple):
+    return any(x in origin_string for x in strings)
+
+
 class RunnerException(Exception):
     """Exception class for all runner errors."""

@@ -45,7 +47,7 @@ def __init__(self, message: str):
         super().__init__(message)


-class IRunner(ABC, IRunnerLegacy, FrozenClass):
+class IRunner(ABC, ICallback, IRunnerLegacy):
     """
     An abstraction that knows how to run an experiment.
     It contains all the logic of **how** to run the experiment,
@@ -281,11 +283,11 @@ class IRunner(ABC, IRunnerLegacy, FrozenClass):

         Stage info section

-        **runner.stage_name** - string, current stage name,\
+        **runner.stage** - string, current stage name,\
         for example,
         ::

-            runner.stage_name = "pretraining" / "training" / "finetuning" / etc
+            runner.stage = "pretraining" / "training" / "finetuning" / etc

         **runner.num_epochs** - int, maximum number of epochs, \
         required for this stage
@@ -359,10 +361,8 @@ class IRunner(ABC, IRunnerLegacy, FrozenClass):

     """

-    _experiment_fn: Callable = IExperiment
-
     def __init__(
-        self, model: RunnerModel = None, device: Device = None, **kwargs,
+        self, model: RunnerModel = None, device: Device = None,
     ):
         """
         Args:
@@ -371,10 +371,8 @@ def __init__(
         """
         self._device = None
         self._model = None
+        self.experiment = None
         self._prepare_inner_state(model=model, device=device)
-        self._unfreeze()
-        self._init(**kwargs)
-        self._freeze()

     def _prepare_inner_state(
         self,
@@ -396,13 +394,13 @@ def _prepare_inner_state(
         verbose: bool = False,
         **kwargs,
     ):
-        self._unfreeze()
-
+        # @TODO: move/split this method to callbacks group
+        # here should be only a small part of it
         # main runner components: model and device to run
         self.device: Device = device
         self.model: RunnerModel = model

-        # extra experiment components,
+        # experiment components,
         # use `catalyst.core.IExperiment` to setup them
         self.criterion: RunnerCriterion = criterion
         self.optimizer: RunnerOptimizer = optimizer
@@ -411,6 +409,7 @@ def _prepare_inner_state(
         self.callbacks: Dict[str, "Callback"] = callbacks or {}

         # the data
+        self.loader = None
         self.loaders: OrderedDict[str, DataLoader] = loaders
         # and the dataflow - model input, model output
         self.input = None
@@ -443,7 +442,7 @@ def _prepare_inner_state(
         self.is_best_valid: bool = False
         self.best_valid_metrics: Dict = defaultdict(None)

-        # distributed info
+        # distributed info (@TODO: move to Engine?)
         self.distributed_rank: int = get_rank()
         self.is_distributed_master: bool = ~(self.distributed_rank > 0)
         self.is_distributed_worker: bool = self.distributed_rank > 0
@@ -457,8 +456,8 @@ def _prepare_inner_state(
         self.need_exception_reraise: bool = True
         # stage info
         self.num_epochs: int = num_epochs
-        self.stage_name: str = stage
-        self.is_infer_stage: bool = self.stage_name.startswith(
+        self.stage: str = stage
+        self.is_infer_stage: bool = self.stage.startswith(
             SETTINGS.stage_infer_prefix
         )
         # epoch info
@@ -466,7 +465,7 @@ def _prepare_inner_state(
         # loader info
         self.loader_sample_step: int = 0
         self.loader_batch_step: int = 0
-        self.loader_name: str = None
+        self.loader_key: str = None
         self.loader_len: int = 0
         self.loader_batch_size = 0
         self.is_train_loader: bool = False
@@ -488,15 +487,6 @@ def _prepare_inner_state(
         for key, value in kwargs.items():
             setattr(self, key, value)

-        self._freeze()
-
-    def _init(self, **kwargs) -> None:
-        """
-        Inner method for children's classes
-        to specify type for Runners' Experiment.
-        """
-        self.experiment: IExperiment = None
-
     @property
     def model(self) -> Model:
         """Returns the runner's model instance."""
@@ -576,322 +566,250 @@ def device(self, value: Device):
             self._model, "to", device=self._device or "cpu"
         )

-    @staticmethod
-    def _get_experiment_components(
-        experiment: IExperiment, stage: str = None, device: Device = None,
-    ) -> Tuple[Model, Criterion, Optimizer, Scheduler, Device]:
-        """
-        Inner method for `Experiment` components preparation.
-
-        Check available torch device, takes model from the experiment
-        and creates stage-specified criterion, optimizer, scheduler for it.
+    def on_experiment_start(self, runner: "IRunner"):
+        """Event handler for experiment start.

         Args:
-            stage: experiment stage name of interest
-                like "pretrain" / "train" / "finetune" / etc
+            runner: IRunner instance.

-        Returns:
-            tuple: model, criterion, optimizer,
-            scheduler and device for a given stage and model
+        .. note::
+            This event works only on IRunner.
         """
-        model = experiment.get_model(stage)
-        criterion = experiment.get_criterion(stage)
-        optimizer = experiment.get_optimizer(stage, model)
-        scheduler = experiment.get_scheduler(stage, optimizer)
-        model, criterion, optimizer, scheduler, device = process_components(
-            model=model,
-            criterion=criterion,
-            optimizer=optimizer,
-            scheduler=scheduler,
-            distributed_params=experiment.distributed_params,
-            device=device,
-        )
-        return model, criterion, optimizer, scheduler, device
+        assert self.experiment is not None

-    @staticmethod
-    def _get_experiment_callbacks(
-        experiment: IExperiment, stage: str,
-    ) -> Dict[str, Callback]:
-        """Inner method for `Callbacks` preparation.
+        set_global_seed(self.experiment.initial_seed + self.global_epoch + 1)

-        Takes callbacks from the Experiment
-        and filters them for distributed master/worker cases.
+    def on_stage_start(self, runner: "IRunner"):
+        """Event handler for stage start.

         Args:
-            stage: stage name of interest,
-                like "pretrain" / "train" / "finetune" / etc
-
-        Returns:
-            OrderedDict[str, Callback]: Ordered dictionary
-            with callbacks for current experiment stage.
+            runner: IRunner instance.
         """
-        callbacks = experiment.get_callbacks(stage)
-        callbacks = filter_callbacks_by_node(callbacks)
-        callbacks = sort_callbacks_by_order(callbacks)
-        return callbacks
+        assert self.stage is not None

-    def get_attr(self, key: str, inner_key: str = None) -> Any:
-        """
-        Alias for python `getattr` method. Useful for Callbacks preparation
-        and cases with multi-criterion, multi-optimizer setup.
-        For example, when you would like to train multi-task classification.
-
-        Used to get a named attribute from a `IRunner` by `key` keyword;
-        for example\
-        ::
-
-            # example 1
-            runner.get_attr("criterion")
-            # is equivalent to
-            runner.criterion
-
-            # example 2
-            runner.get_attr("optimizer")
-            # is equivalent to
-            runner.optimizer
-
-            # example 3
-            runner.get_attr("scheduler")
-            # is equivalent to
-            runner.scheduler
-
-        With `inner_key` usage, it suppose to find a dictionary under `key`\
-        and would get `inner_key` from this dict; for example,
-        ::
-
-            # example 1
-            runner.get_attr("criterion", "bce")
-            # is equivalent to
-            runner.criterion["bce"]
-
-            # example 2
-            runner.get_attr("optimizer", "adam")
-            # is equivalent to
-            runner.optimizer["adam"]
-
-            # example 3
-            runner.get_attr("scheduler", "adam")
-            # is equivalent to
-            runner.scheduler["adam"]
+        set_global_seed(self.experiment.initial_seed + self.global_epoch + 1)
+
+    def on_epoch_start(self, runner: "IRunner"):
+        """Event handler for epoch start.

         Args:
-            key: name for attribute of interest,
-                like `criterion`, `optimizer`, `scheduler`
-            inner_key: name of inner dictionary key
+            runner: IRunner instance.

-        Returns:
-            inner attribute
+        Raises:
+            RunnerException: if current DataLoader is empty.
         """
-        if inner_key is None:
-            return getattr(self, key)
-        else:
-            return getattr(self, key)[inner_key]
+        assert self.loaders is not None

-    def _prepare_for_stage(self, stage: str) -> None:
-        """
-        Inner method to prepare `Runner` for the specified stage.
+        for loader_key, loader in self.loaders.items():
+            if len(loader) == 0:
+                raise RunnerException(
+                    f"DataLoader with name {loader_key} is empty."
+                )

-        Args:
-            stage: stage name of interest,
-                like "pretrain" / "train" / "finetune" / etc
-        """
-        pass
+        if not self.is_infer_stage:
+            assert self.valid_loader in self.loaders.keys(), (
+                f"'{self.valid_loader}' "
+                f"should be in provided loaders: {list(self.loaders.keys())}"
+            )
+        else:
+            assert not any(
+                x.startswith(SETTINGS.loader_train_prefix)
+                for x in self.loaders.keys()
+            ), "for inference no train loader should be passed"

-    def _prepare_for_epoch(self, stage: str, epoch: int) -> None:
-        """
-        Inner method to prepare `Runner` for the specified stage and epoch.
+        set_global_seed(self.experiment.initial_seed + self.global_epoch + 1)
+
+    def on_loader_start(self, runner: "IRunner"):
+        """Event handler for loader start.

         Args:
-            stage: stage name of interest,
-                like "pretrain" / "train" / "finetune" / etc
-            epoch: epoch index
+            runner: IRunner instance.
+
+        Raises:
+            RunnerException: if current DataLoader is empty.
         """
-        pass
+        assert self.loader is not None

-    def _run_event(self, event: str) -> None:
-        """Inner method to run specified event on Runners' callbacks.
+        self.loader_len = len(self.loader)
+        if self.loader_len == 0:
+            raise RunnerException(
+                f"DataLoader with name {self.loader_key} is empty."
+            )
+        self.loader_batch_size = (
+            self.loader.batch_sampler.batch_size
+            if self.loader.batch_sampler is not None
+            else self.loader.batch_size
+        )
+        self.loader_sample_step = 0

-        Args:
-            event(str): event name to run on callbacks.
+        self.is_train_loader = self.loader_key.startswith(
+            SETTINGS.loader_train_prefix
+        )
+        self.is_valid_loader = self.loader_key.startswith(
+            SETTINGS.loader_valid_prefix
+        )
+        self.is_infer_loader = self.loader_key.startswith(
+            SETTINGS.loader_infer_prefix
+        )
+        maybe_recursive_call(self.model, "train", mode=self.is_train_loader)

-        .. note::
-            To learn more about Catalyst Callbacks mechanism, please follow
-            :py:mod:`catalyst.core.callback.Callback` documentation.
+        if isinstance(self.loader.sampler, DistributedSampler):
+            self.loader.sampler.set_epoch(self.epoch)

-        """
-        for callback in self.callbacks.values():
-            getattr(callback, event)(self)
+        set_global_seed(self.experiment.initial_seed + self.global_epoch + 1)

-    def _batch2device(
-        self, batch: Mapping[str, Any], device: Device,
-    ) -> Mapping[str, Any]:
-        """
-        Inner method to transfer incoming data batches to Runners' device.
+    def on_batch_start(self, runner: "IRunner"):
+        """Event handler for batch start.

         Args:
-            batch (Mapping[str, Any]): dictionary with data batches
-                from DataLoader.
-            device: torch device
-
-        Returns:
-            Mapping[str, Any]: same structure as value,
-            but all tensors and np.arrays moved to device
+            runner: IRunner instance.
         """
-        output = any2device(batch, device)
-        return output
+        if isinstance(self.input, dict):
+            self.batch_size = len(next(iter(self.input.values())))
+        else:
+            self.batch_size = len(self.input[0])

-    @abstractmethod
-    def _handle_batch(self, batch: Mapping[str, Any]) -> None:
-        """
-        Inner method to handle specified data batch.
-        Used to make a train/valid/infer stage during Experiment run.
+        self.global_batch_step += 1
+        # self.loader_batch_step += 1
+        self.global_sample_step += self.batch_size
+        self.loader_sample_step += self.batch_size
+
+    def on_batch_end(self, runner: "IRunner"):
+        """Event handler for batch end.

         Args:
-            batch (Mapping[str, Any]): dictionary with data batches
-                from DataLoader.
+            runner: IRunner instance.
         """
         pass

-    def _run_batch(self, batch: Mapping[str, Any]) -> None:
-        """
-        Inner method to run train step on specified data batch,
-        with batch callbacks events.
+    def on_loader_end(self, runner: "IRunner"):
+        """Event handler for loader end.

         Args:
-            batch (Mapping[str, Any]): dictionary with data batches
-                from DataLoader.
+            runner: IRunner instance.
         """
-        if isinstance(batch, dict):
-            self.batch_size = len(next(iter(batch.values())))
-        else:
-            self.batch_size = len(batch[0])
-        self.global_sample_step += self.batch_size
-        self.loader_sample_step += self.batch_size
-        batch = self._batch2device(batch, self.device)
-        self.input = batch
+        pass

-        self._run_event("on_batch_start")
-        self._handle_batch(batch=batch)
-        self._run_event("on_batch_end")
+    def on_epoch_end(self, runner: "IRunner"):
+        """Event handler for epoch end.

-    def _run_loader(self, loader: DataLoader) -> None:
+        Args:
+            runner: IRunner instance.
         """
-        Inner method to pass whole DataLoader through Runner,
-        with loader callbacks events.
+        self.global_epoch += 1
+        self.epoch += 1
+
+    def on_stage_end(self, runner: "IRunner"):
+        """Event handler for stage end.

         Args:
-            loader: dataloader to iterate
+            runner: IRunner instance.
         """
-        if len(loader) == 0:
-            raise RunnerException(
-                f"DataLoader with name {self.loader_name} is empty."
-            )
+        pass

-        self.loader_batch_size = (
-            loader.batch_sampler.batch_size
-            if loader.batch_sampler is not None
-            else loader.batch_size
-        )
+    def on_experiment_end(self, runner: "IRunner"):
+        """Event handler for experiment end.

-        self.loader_sample_step = 0
-        for i, batch in enumerate(loader):
-            self.global_batch_step += 1
-            self.loader_batch_step = i + 1
-            self._run_batch(batch)
-            if self.need_early_stop:
-                self.need_early_stop = False
-                break
+        Args:
+            runner: IRunner instance.

-    def _run_epoch(self, stage: str, epoch: int) -> None:
+        .. note::
+            This event works only on IRunner.
         """
-        Inner method to run epoch on Runner,
-        with epoch callbacks events.
+        pass
+
+    def on_exception(self, runner: "IRunner"):
+        """Event handler for exception case.

         Args:
-            stage: stage name of interest,
-                like "pretrain" / "train" / "finetune" / etc
-            epoch: epoch index
-        """
-        self._prepare_for_epoch(stage=stage, epoch=epoch)
-        assert self.loaders is not None
+            runner: IRunner instance.

-        for loader_name, loader in self.loaders.items():
-            if len(loader) == 0:
-                raise RunnerException(
-                    f"DataLoader with name {loader_name} is empty."
-                )
+        Raises:
+            exception: re-raised if no exception handler
+                was found among the callbacks
+        """
+        from catalyst.callbacks.exception import ExceptionCallback

-        self.is_infer_stage = self.stage_name.startswith("infer")
-        if not self.is_infer_stage:
-            assert self.valid_loader in self.loaders.keys(), (
-                f"'{self.valid_loader}' "
-                f"should be in provided loaders: {list(self.loaders.keys())}"
+        def _exception_handler_check(callbacks: Union[OrderedDict, Dict]):
+            return callbacks is not None and any(
+                issubclass(x.__class__, ExceptionCallback)
+                for x in callbacks.values()
             )
-        else:
-            assert not any(
-                x.startswith(SETTINGS.loader_train_prefix)
-                for x in self.loaders.keys()
-            ), "for inference no train loader should be passed"
+
+        if not _exception_handler_check(getattr(self, "callbacks", None)):
+            raise self.exception

-        for loader_name, loader in self.loaders.items():
-            self.loader_name = loader_name
-            self.loader_len = len(loader)
-            self.is_train_loader = loader_name.startswith(
-                SETTINGS.loader_train_prefix
-            )
-            self.is_valid_loader = loader_name.startswith(
-                SETTINGS.loader_valid_prefix
-            )
-            self.is_infer_loader = loader_name.startswith(
-                SETTINGS.loader_infer_prefix
-            )
-            maybe_recursive_call(
-                self.model, "train", mode=self.is_train_loader,
-            )
+    def _run_event(self, event: str) -> None:
+        """Inner method to run specified event on Runners' callbacks.

-            if (
-                isinstance(loader.sampler, DistributedSampler)
-                and not self.is_infer_stage
-            ):
-                loader.sampler.set_epoch(self.epoch)
+        Args:
+            event(str): event name to run on callbacks.
+
+        .. note::
+            To learn more about Catalyst Callbacks mechanism, please follow
+            :py:mod:`catalyst.core.callback.Callback` documentation.

-            set_global_seed(
-                self.experiment.initial_seed + self.global_epoch + 1
-            )
-            self._run_event("on_loader_start")
-            with torch.set_grad_enabled(self.is_train_loader):
-                self._run_loader(loader)
-            self._run_event("on_loader_end")

-    def _run_stage(self, stage: str) -> None:
         """
-        Inner method to run stage on Runner,
-        with stage callbacks events.
+        # @TODO: how to remove self duplication? and does it really matter?
+        if _is_substring(event, ("start", "exception")):
+            getattr(self, event)(self)
+        for callback in self.callbacks.values():
+            getattr(callback, event)(self)
+        if _is_substring(event, ("end",)):
+            getattr(self, event)(self)

-        Args:
-            stage: stage name of interest,
-                like "pretrain" / "train" / "finetune" / etc
+    def _handle_device(self, batch: Mapping[str, Any]):
+        return any2device(batch, self.device)

+    @abstractmethod
+    def _handle_batch(self, batch: Mapping[str, Any]) -> None:
         """
-        self._prepare_for_stage(stage)
+        Inner method to handle specified data batch.
+        Used to make a train/valid/infer stage during Experiment run.

+        Args:
+            batch (Mapping[str, Any]): dictionary with data batches
+                from DataLoader.
+        """
+        pass
+
+    def _run_batch(self) -> None:
+        self.input = self._handle_device(batch=self.input)
+        self._run_event("on_batch_start")
+        self._handle_batch(batch=self.input)
+        self._run_event("on_batch_end")
+
+    def _run_loader(self) -> None:
+        self._run_event("on_loader_start")
+        with torch.set_grad_enabled(self.is_train_loader):
+            for self.loader_batch_step, self.input in enumerate(self.loader):
+                self._run_batch()
+                if self.need_early_stop:
+                    self.need_early_stop = False
+                    break
+        self._run_event("on_loader_end")
+
+    def _run_epoch(self) -> None:
+        self._run_event("on_epoch_start")
+        for self.loader_key, self.loader in self.loaders.items():
+            self._run_loader()
+        self._run_event("on_epoch_end")
+
+    def _run_stage(self) -> None:
         self._run_event("on_stage_start")
         while self.epoch < self.num_epochs + 1:
-            set_global_seed(
-                self.experiment.initial_seed + self.global_epoch + 1
-            )
-            self._run_event("on_epoch_start")
-            self._run_epoch(stage=stage, epoch=self.epoch)
-            self._run_event("on_epoch_end")
-
+            self._run_epoch()
             if self.need_early_stop:
                 self.need_early_stop = False
                 break
-
-            self.global_epoch += 1
-            self.epoch += 1
         self._run_event("on_stage_end")

+    def _run_experiment(self) -> None:
+        self._run_event("on_experiment_start")
+        for self.stage in self.experiment.stages:
+            self._run_stage()
+        self._run_event("on_experiment_end")
+
     def run_experiment(self, experiment: IExperiment = None) -> "IRunner":
         """
         Starts the experiment.
@@ -901,34 +819,13 @@ def run_experiment(self, experiment: IExperiment = None) -> "IRunner":

         Returns:
             self, `IRunner` instance after the experiment
-
-        Raises:
-            Exception: if during pipeline exception,
-                no handler we found into callbacks
-            KeyboardInterrupt: if during pipeline exception,
-                no handler we found into callbacks
         """
         self.experiment = experiment or self.experiment
-        assert self.experiment is not None
-
         try:
-            for stage in self.experiment.stages:
-                self._run_stage(stage)
+            self._run_experiment()
         except (Exception, KeyboardInterrupt) as ex:
-            from catalyst.callbacks.exception import ExceptionCallback
-
-            def _exception_handler_check(callbacks: Union[OrderedDict, Dict]):
-                return callbacks is not None and any(
-                    issubclass(x.__class__, ExceptionCallback)
-                    for x in callbacks.values()
-                )
-
-            if _exception_handler_check(getattr(self, "callbacks", None)):
-                self.exception = ex
-                self._run_event("on_exception")
-            else:
-                raise ex
-
+            self.exception = ex
+            self._run_event("on_exception")
         return self
@@ -938,41 +835,45 @@ class IStageBasedRunner(IRunner):
     datasources per stage.
     """

-    def _prepare_for_stage(self, stage: str):
-        """Inner method to prepare `Runner` for the specified stage.
+    def on_stage_start(self, runner: "IRunner") -> None:
+        """Event handler for stage start.
+
+        For the `IStageBasedRunner` case:

-        Sets `Experiment` initial seed.
-        Prepares experiment components with `self._get_experiment_components`.
-        Prepares callbacks with `self._get_experiment_callbacks`.
-        Prepares inner state with `self._prepare_inner_state`
-        Additionally sets `Experiment` datasources for specified stage.
+        - prepares loaders - our datasources
+        - prepares model components - model, criterion, optimizer, scheduler
+        - prepares callbacks for the current stage

         Args:
-            stage: stage name of interest,
-                like "pretrain" / "train" / "finetune" / etc
+            runner: IRunner instance.
         """
+        super().on_stage_start(runner)
+
         set_global_seed(self.experiment.initial_seed)
-        loaders = self.experiment.get_loaders(stage=stage)
+        loaders = self.experiment.get_loaders(stage=self.stage)
         loaders = validate_loaders(loaders)
-        self.loaders = loaders
+        # self.loaders = loaders

         set_global_seed(self.experiment.initial_seed)
-        (
-            model,
-            criterion,
-            optimizer,
-            scheduler,
-            device,
-        ) = self._get_experiment_components(
-            experiment=self.experiment, stage=stage, device=self.device
+        model = self.experiment.get_model(self.stage)
+        criterion = self.experiment.get_criterion(self.stage)
+        optimizer = self.experiment.get_optimizer(self.stage, model)
+        scheduler = self.experiment.get_scheduler(self.stage, optimizer)
+        model, criterion, optimizer, scheduler, device = process_components(
+            model=model,
+            criterion=criterion,
+            optimizer=optimizer,
+            scheduler=scheduler,
+            distributed_params=self.experiment.distributed_params,
+            device=self.device,
         )

         set_global_seed(self.experiment.initial_seed)
-        callbacks = self._get_experiment_callbacks(
-            experiment=self.experiment, stage=stage
-        )
+        callbacks = self.experiment.get_callbacks(self.stage)
+        callbacks = filter_callbacks_by_node(callbacks)
+        callbacks = sort_callbacks_by_order(callbacks)

-        migrating_params = dict(**self.experiment.get_stage_params(stage))
+        migrating_params = dict(**self.experiment.get_stage_params(self.stage))
         migrate_from_previous_stage = migrating_params.get(
             "migrate_from_previous_stage", True
         )
@@ -999,14 +900,14 @@ def _prepare_for_stage(self, stage: str):
         )

         self._prepare_inner_state(
-            stage=stage,
+            stage=self.stage,
             model=model,
             device=device,
             criterion=criterion,
             optimizer=optimizer,
             scheduler=scheduler,
             callbacks=callbacks,
-            loaders=getattr(self, "loaders", None),
+            loaders=loaders,
             **migrating_params,
         )
diff --git a/catalyst/runners/runner.py b/catalyst/runners/runner.py
index 115b810fd7..52fe7362f7 100644
--- a/catalyst/runners/runner.py
+++ b/catalyst/runners/runner.py
@@ -10,7 +10,14 @@
 from catalyst.core.functional import sort_callbacks_by_order
 from catalyst.core.runner import IStageBasedRunner
 from catalyst.experiments.experiment import Experiment
-from catalyst.typing import Criterion, Device, Model, Optimizer, Scheduler
+from catalyst.typing import (
+    Criterion,
+    Device,
+    Model,
+    Optimizer,
+    RunnerModel,
+    Scheduler,
+)
 from catalyst.utils.checkpoint import load_checkpoint, unpack_checkpoint
 from catalyst.utils.components import process_components
 from catalyst.utils.misc import maybe_recursive_call, set_global_seed
@@ -24,14 +31,25 @@


 class Runner(IStageBasedRunner):
-    """
-    Deep Learning Runner for supervised, unsupervised, gan, etc runs.
-    """
+    """Deep Learning Runner for supervised, unsupervised, gan, etc runs."""

-    _experiment_fn: Callable = Experiment
+    def __init__(
+        self,
+        model: RunnerModel = None,
+        device: Device = None,
+        experiment_fn: Callable = Experiment,
+    ):
+        """
 
-    def _init(self, **kwargs):
-        self.experiment: Experiment = None
+        Args:
+            model: Torch model object
+            device: Torch device
+            experiment_fn: callable function,
+                which defines default experiment type to use
+                during ``.train`` and ``.infer`` methods.
+        """
+        super().__init__(model=model, device=device)
+        self._experiment_fn = experiment_fn

     def train(
         self,
diff --git a/catalyst/runners/supervised.py b/catalyst/runners/supervised.py
index 1a1e9d9763..6b6d050410 100644
--- a/catalyst/runners/supervised.py
+++ b/catalyst/runners/supervised.py
@@ -13,8 +13,6 @@
 class SupervisedRunner(Runner):
     """Runner for experiments with supervised model."""

-    _experiment_fn: Callable = SupervisedExperiment
-
     def __init__(
         self,
         model: RunnerModel = None,
@@ -22,6 +20,7 @@ def __init__(
         input_key: Any = "features",
         output_key: Any = "logits",
         input_target_key: str = "targets",
+        experiment_fn: Callable = SupervisedExperiment,
     ):
         """
         Args:
@@ -31,30 +30,14 @@ def __init__(
             output_key: Key in output dict model output
                 will be stored under
             input_target_key: Key in batch dict mapping for target
+            experiment_fn: callable function,
+                which defines default experiment type to use
+                during ``.train`` and ``.infer`` methods.
         """
         super().__init__(
-            model=model,
-            device=device,
-            input_key=input_key,
-            output_key=output_key,
-            input_target_key=input_target_key,
+            model=model, device=device, experiment_fn=experiment_fn
         )

-    def _init(
-        self,
-        input_key: Any = "features",
-        output_key: Any = "logits",
-        input_target_key: str = "targets",
-    ):
-        """
-        Args:
-            input_key: Key in batch dict mapping for model input
-            output_key: Key in output dict model output
-                will be stored under
-            input_target_key: Key in batch dict mapping for target
-        """
-        self.experiment: SupervisedExperiment = None
-
         self.input_key = input_key
         self.output_key = output_key
         self.target_key = input_target_key
@@ -83,13 +66,6 @@ def _init(
         else:
             raise NotImplementedError()

-    def _batch2device(self, batch: Mapping[str, Any], device: Device):
-        if isinstance(batch, (tuple, list)):
-            assert len(batch) == 2
-            batch = {self.input_key: batch[0], self.target_key: batch[1]}
-        batch = super()._batch2device(batch, device)
-        return batch
-
     def _process_input_str(self, batch: Mapping[str, Any], **kwargs):
         output = self.model(batch[self.input_key], **kwargs)
         return output
@@ -132,6 +108,13 @@ def forward(self, batch: Mapping[str, Any], **kwargs) -> Mapping[str, Any]:
         output = self._process_output(output)
         return output

+    def _handle_device(self, batch: Mapping[str, Any]):
+        if isinstance(batch, (tuple, list)):
+            assert len(batch) == 2
+            batch = {self.input_key: batch[0], self.target_key: batch[1]}
+        batch = super()._handle_device(batch)
+        return batch
+
     def _handle_batch(self, batch: Mapping[str, Any]) -> None:
         """
         Inner method to handle specified data batch.
@@ -162,7 +145,7 @@ def predict_batch(
         Returns:
             Mapping[str, Any]: model output dictionary
         """
-        batch = self._batch2device(batch, self.device)
+        batch = self._handle_device(batch)
         output = self.forward(batch, **kwargs)
         return output
diff --git a/catalyst/utils/__init__.py b/catalyst/utils/__init__.py
index 23961197af..c1def44f07 100644
--- a/catalyst/utils/__init__.py
+++ b/catalyst/utils/__init__.py
@@ -62,6 +62,7 @@
     get_utcnow_time,
     is_exception,
     maybe_recursive_call,
+    get_attr,
     set_global_seed,
 )
 from catalyst.utils.numpy import get_one_hot
diff --git a/catalyst/utils/misc.py b/catalyst/utils/misc.py
index acca7cda14..0fe9216fe8 100644
--- a/catalyst/utils/misc.py
+++ b/catalyst/utils/misc.py
@@ -175,6 +175,65 @@ def get_fn_argsnames(fn: Callable[..., Any], exclude: List[str] = None):
     return params


+def get_attr(obj: Any, key: str, inner_key: str = None) -> Any:
+    """
+    Alias for Python's `getattr` method. Useful for Callbacks preparation
+    and cases with multi-criterion, multi-optimizer setup.
+    For example, when you would like to train multi-task classification.
+
+    Used to get a named attribute from an object (e.g. `IRunner`)
+    by the `key` keyword; for example\
+    ::
+
+        # example 1
+        get_attr(runner, "criterion")
+        # is equivalent to
+        runner.criterion
+
+        # example 2
+        get_attr(runner, "optimizer")
+        # is equivalent to
+        runner.optimizer
+
+        # example 3
+        get_attr(runner, "scheduler")
+        # is equivalent to
+        runner.scheduler
+
+    With `inner_key` usage, it is supposed to find a dictionary under `key`\
+    and will get `inner_key` from this dict; for example,
+    ::
+
+        # example 1
+        get_attr(runner, "criterion", "bce")
+        # is equivalent to
+        runner.criterion["bce"]
+
+        # example 2
+        get_attr(runner, "optimizer", "adam")
+        # is equivalent to
+        runner.optimizer["adam"]
+
+        # example 3
+        get_attr(runner, "scheduler", "adam")
+        # is equivalent to
+        runner.scheduler["adam"]
+
+    Args:
+        obj: object of interest
+        key: name for attribute of interest,
+            like `criterion`, `optimizer`, `scheduler`
+        inner_key: name of inner dictionary key
+
+    Returns:
+        inner attribute
+    """
+    if inner_key is None:
+        return getattr(obj, key)
+    else:
+        return getattr(obj, key)[inner_key]
+
+
 __all__ = [
     "copy_directory",
     "format_metric",
@@ -183,5 +242,6 @@ def get_fn_argsnames(fn: Callable[..., Any], exclude: List[str] = None):
     "get_utcnow_time",
     "is_exception",
     "maybe_recursive_call",
+    "get_attr",
     "set_global_seed",
 ]
diff --git a/tests/_tests_scripts/core_batch_overfit_callback.py b/tests/_tests_scripts/core_batch_overfit_callback.py
index 6a4753d78a..9c536dc462 100644
--- a/tests/_tests_scripts/core_batch_overfit_callback.py
+++ b/tests/_tests_scripts/core_batch_overfit_callback.py
@@ -27,7 +27,7 @@ def on_loader_start(self, runner):
         # 320 samples with 32 batch size
         # -> 1 batch size = 32
         # -> 0.1 portion = 32
-        assert len(runner.loaders[runner.loader_name]) == 32
+        assert len(runner.loaders[runner.loader_key]) == 32


 # model training
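Since `get_attr` is now a standalone utility in `catalyst.utils.misc` rather than an `IRunner` method, callback code migrates by passing the runner as the first argument, as the criterion/optimizer/scheduler callbacks above do. A minimal sketch of the new call style (hypothetical stand-in object, not a real runner; assumes this patch is applied):

from types import SimpleNamespace

from catalyst.utils.misc import get_attr

# hypothetical stand-in for a runner with a multi-criterion setup
runner = SimpleNamespace(criterion={"bce": "loss_a", "iou": "loss_b"})

assert get_attr(runner, "criterion", "bce") == "loss_a"  # runner.criterion["bce"]
assert get_attr(runner, "criterion") is runner.criterion  # plain getattr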