diff --git a/catalyst/core/logger.py b/catalyst/core/logger.py index c18ccdd86d..ed8d65d340 100644 --- a/catalyst/core/logger.py +++ b/catalyst/core/logger.py @@ -107,7 +107,7 @@ def flush_log(self) -> None: """Flushes the logger.""" pass - def close_log(self) -> None: + def close_log(self, scope: str = None) -> None: """Closes the logger.""" pass diff --git a/catalyst/core/runner.py b/catalyst/core/runner.py index 5e20aeff64..384a506822 100644 --- a/catalyst/core/runner.py +++ b/catalyst/core/runner.py @@ -578,10 +578,10 @@ def flush_log(self) -> None: for logger in self.loggers.values(): logger.flush_log() - def close_log(self) -> None: + def close_log(self, *args, **kwargs) -> None: """Closes the loggers.""" for logger in self.loggers.values(): - logger.close_log() + logger.close_log(*args, **kwargs) def _setup_loaders(self) -> None: set_global_seed(self.seed + self.engine.rank + self.global_epoch_step) @@ -724,6 +724,7 @@ def on_stage_end(self, runner: "IRunner"): del self.loaders self.loaders = {} self.engine.deinit_components(runner=self) + self.close_log(scope="stage") # due to multiprocessing setup we have to close current loggers # to prevent EOF-like errors @@ -735,7 +736,7 @@ def on_stage_end(self, runner: "IRunner"): def on_experiment_end(self, runner: "IRunner"): """Event handler.""" self.flush_log() - self.close_log() + self.close_log(scope="experiment") def on_exception(self, runner: "IRunner"): """Event handler.""" diff --git a/catalyst/loggers/csv.py b/catalyst/loggers/csv.py index 47cf6770ae..e12fdcb465 100644 --- a/catalyst/loggers/csv.py +++ b/catalyst/loggers/csv.py @@ -143,10 +143,11 @@ def flush_log(self) -> None: for logger in self.loggers.values(): logger.flush() - def close_log(self) -> None: + def close_log(self, scope: str = None) -> None: """@TODO: docs.""" - for logger in self.loggers.values(): - logger.close() + if scope is None or scope == "experiment": + for logger in self.loggers.values(): + logger.close() __all__ = ["CSVLogger"] diff --git a/catalyst/loggers/mlflow.py b/catalyst/loggers/mlflow.py index 000d65b07e..259a03c49f 100644 --- a/catalyst/loggers/mlflow.py +++ b/catalyst/loggers/mlflow.py @@ -4,7 +4,7 @@ from catalyst.core.logger import ILogger from catalyst.settings import SETTINGS -from catalyst.typing import Directory, File, Number +from catalyst.typing import Directory, File, Number, Union if SETTINGS.mlflow_required: import mlflow @@ -61,8 +61,6 @@ def _mlflow_log_dict(dictionary: Dict[str, Any], prefix: str = "", log_type: Opt mlflow.log_param(name, value) except mlflow.exceptions.MlflowException: continue - elif isinstance(value, (Directory, File)) or log_type == "artifact": - mlflow.log_artifact(value) elif isinstance(value, Number): mlflow.log_metric(name, value) else: @@ -263,11 +261,36 @@ def log_hparams( exp_params = hparams.get(key, {}) _mlflow_log_dict(exp_params, log_type="param") - def close_log(self) -> None: + def log_artifact( + self, + tag: str, + artifact: Union[Directory, File] = None, + path_to_artifact: str = None, + scope: str = None, + # experiment info + run_key: str = None, + global_epoch_step: int = 0, + global_batch_step: int = 0, + global_sample_step: int = 0, + # stage info + stage_key: str = None, + stage_epoch_len: int = 0, + stage_epoch_step: int = 0, + stage_batch_step: int = 0, + stage_sample_step: int = 0, + # loader info + loader_key: str = None, + loader_batch_len: int = 0, + loader_sample_len: int = 0, + loader_batch_step: int = 0, + loader_sample_step: int = 0, + ) -> None: + """Logs a local file or directory as an artifact to the logger.""" + mlflow.log_artifact(artifact, path_to_artifact) + + def close_log(self, scope: str = None) -> None: """Finds all **running** runs and ends them.""" - all_runs = mlflow.search_runs() - for _ in all_runs[all_runs.status == "RUNNING"]: - mlflow.end_run() + mlflow.end_run() __all__ = ["MLflowLogger"] diff --git a/catalyst/loggers/neptune.py b/catalyst/loggers/neptune.py index 112a3e0915..4f87082b88 100644 --- a/catalyst/loggers/neptune.py +++ b/catalyst/loggers/neptune.py @@ -339,9 +339,10 @@ def flush_log(self) -> None: """Flushes the loggers.""" pass - def close_log(self) -> None: + def close_log(self, scope: str = None) -> None: """Closes the loggers.""" - self.run.wait() + if scope is None or scope == "experiment": + self.run.wait() __all__ = ["NeptuneLogger"] diff --git a/catalyst/loggers/tensorboard.py b/catalyst/loggers/tensorboard.py index 4feb30148b..cf567669c4 100644 --- a/catalyst/loggers/tensorboard.py +++ b/catalyst/loggers/tensorboard.py @@ -170,10 +170,11 @@ def flush_log(self) -> None: for logger in self.loggers.values(): logger.flush() - def close_log(self) -> None: + def close_log(self, scope: str = None) -> None: """Closes the loggers.""" - for logger in self.loggers.values(): - logger.close() + if scope is None or scope == "experiment": + for logger in self.loggers.values(): + logger.close() __all__ = ["TensorboardLogger"] diff --git a/catalyst/loggers/wandb.py b/catalyst/loggers/wandb.py index edbd3978e8..960f62ccc4 100644 --- a/catalyst/loggers/wandb.py +++ b/catalyst/loggers/wandb.py @@ -172,9 +172,10 @@ def flush_log(self) -> None: """Flushes the logger.""" pass - def close_log(self) -> None: + def close_log(self, scope: str = None) -> None: """Closes the logger.""" - self.run.finish() + if scope is None or scope == "experiment": + self.run.finish() __all__ = ["WandbLogger"]