Skip to content

Commit

Permalink
MlflowLogger update (#1258)
Browse files Browse the repository at this point in the history
* updated loggers; especially mlflow

* prettified the code

* fixed typing
  • Loading branch information
y-ksenia committed Jul 27, 2021
1 parent ce79bbe commit 2ffaab9
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 21 deletions.
2 changes: 1 addition & 1 deletion catalyst/core/logger.py
Expand Up @@ -107,7 +107,7 @@ def flush_log(self) -> None:
"""Flushes the logger."""
pass

def close_log(self) -> None:
def close_log(self, scope: str = None) -> None:
"""Closes the logger."""
pass

Expand Down
7 changes: 4 additions & 3 deletions catalyst/core/runner.py
Expand Up @@ -578,10 +578,10 @@ def flush_log(self) -> None:
for logger in self.loggers.values():
logger.flush_log()

def close_log(self) -> None:
def close_log(self, *args, **kwargs) -> None:
"""Closes the loggers."""
for logger in self.loggers.values():
logger.close_log()
logger.close_log(*args, **kwargs)

def _setup_loaders(self) -> None:
set_global_seed(self.seed + self.engine.rank + self.global_epoch_step)
Expand Down Expand Up @@ -724,6 +724,7 @@ def on_stage_end(self, runner: "IRunner"):
del self.loaders
self.loaders = {}
self.engine.deinit_components(runner=self)
self.close_log(scope="stage")

# due to multiprocessing setup we have to close current loggers
# to prevent EOF-like errors
Expand All @@ -735,7 +736,7 @@ def on_stage_end(self, runner: "IRunner"):
def on_experiment_end(self, runner: "IRunner"):
"""Event handler."""
self.flush_log()
self.close_log()
self.close_log(scope="experiment")

def on_exception(self, runner: "IRunner"):
"""Event handler."""
Expand Down
7 changes: 4 additions & 3 deletions catalyst/loggers/csv.py
Expand Up @@ -143,10 +143,11 @@ def flush_log(self) -> None:
for logger in self.loggers.values():
logger.flush()

def close_log(self) -> None:
def close_log(self, scope: str = None) -> None:
"""@TODO: docs."""
for logger in self.loggers.values():
logger.close()
if scope is None or scope == "experiment":
for logger in self.loggers.values():
logger.close()


__all__ = ["CSVLogger"]
37 changes: 30 additions & 7 deletions catalyst/loggers/mlflow.py
Expand Up @@ -4,7 +4,7 @@

from catalyst.core.logger import ILogger
from catalyst.settings import SETTINGS
from catalyst.typing import Directory, File, Number
from catalyst.typing import Directory, File, Number, Union

if SETTINGS.mlflow_required:
import mlflow
Expand Down Expand Up @@ -61,8 +61,6 @@ def _mlflow_log_dict(dictionary: Dict[str, Any], prefix: str = "", log_type: Opt
mlflow.log_param(name, value)
except mlflow.exceptions.MlflowException:
continue
elif isinstance(value, (Directory, File)) or log_type == "artifact":
mlflow.log_artifact(value)
elif isinstance(value, Number):
mlflow.log_metric(name, value)
else:
Expand Down Expand Up @@ -263,11 +261,36 @@ def log_hparams(
exp_params = hparams.get(key, {})
_mlflow_log_dict(exp_params, log_type="param")

def close_log(self) -> None:
def log_artifact(
self,
tag: str,
artifact: Union[Directory, File] = None,
path_to_artifact: str = None,
scope: str = None,
# experiment info
run_key: str = None,
global_epoch_step: int = 0,
global_batch_step: int = 0,
global_sample_step: int = 0,
# stage info
stage_key: str = None,
stage_epoch_len: int = 0,
stage_epoch_step: int = 0,
stage_batch_step: int = 0,
stage_sample_step: int = 0,
# loader info
loader_key: str = None,
loader_batch_len: int = 0,
loader_sample_len: int = 0,
loader_batch_step: int = 0,
loader_sample_step: int = 0,
) -> None:
"""Logs a local file or directory as an artifact to the logger."""
mlflow.log_artifact(artifact, path_to_artifact)

def close_log(self, scope: str = None) -> None:
"""Finds all **running** runs and ends them."""
all_runs = mlflow.search_runs()
for _ in all_runs[all_runs.status == "RUNNING"]:
mlflow.end_run()
mlflow.end_run()


__all__ = ["MLflowLogger"]
5 changes: 3 additions & 2 deletions catalyst/loggers/neptune.py
Expand Up @@ -339,9 +339,10 @@ def flush_log(self) -> None:
"""Flushes the loggers."""
pass

def close_log(self) -> None:
def close_log(self, scope: str = None) -> None:
"""Closes the loggers."""
self.run.wait()
if scope is None or scope == "experiment":
self.run.wait()


__all__ = ["NeptuneLogger"]
7 changes: 4 additions & 3 deletions catalyst/loggers/tensorboard.py
Expand Up @@ -170,10 +170,11 @@ def flush_log(self) -> None:
for logger in self.loggers.values():
logger.flush()

def close_log(self) -> None:
def close_log(self, scope: str = None) -> None:
"""Closes the loggers."""
for logger in self.loggers.values():
logger.close()
if scope is None or scope == "experiment":
for logger in self.loggers.values():
logger.close()


__all__ = ["TensorboardLogger"]
5 changes: 3 additions & 2 deletions catalyst/loggers/wandb.py
Expand Up @@ -172,9 +172,10 @@ def flush_log(self) -> None:
"""Flushes the logger."""
pass

def close_log(self) -> None:
def close_log(self, scope: str = None) -> None:
"""Closes the logger."""
self.run.finish()
if scope is None or scope == "experiment":
self.run.finish()


__all__ = ["WandbLogger"]

0 comments on commit 2ffaab9

Please sign in to comment.