diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index bcce833f7e8..d1adfe1f7fe 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -36,6 +36,7 @@ py_library( ":data_provider", ":event_multiplexer", ":tag_types", + "//tensorboard/compat:tensorflow", "//tensorboard/data:ingester", "//tensorboard/plugins/audio:metadata", "//tensorboard/plugins/histogram:metadata", @@ -53,7 +54,9 @@ py_test( srcs_version = "PY3", deps = [ ":data_ingester", + "//tensorboard:expect_tensorflow_installed", "//tensorboard:test", + "//tensorboard/compat:tensorflow", ], ) diff --git a/tensorboard/backend/event_processing/data_ingester.py b/tensorboard/backend/event_processing/data_ingester.py index 3c74c770e48..2ff648713ee 100644 --- a/tensorboard/backend/event_processing/data_ingester.py +++ b/tensorboard/backend/event_processing/data_ingester.py @@ -30,6 +30,7 @@ from tensorboard.plugins.pr_curve import metadata as pr_curve_metadata from tensorboard.plugins.scalar import metadata as scalar_metadata from tensorboard.util import tb_logging +from tensorboard.compat import tf DEFAULT_SIZE_GUIDANCE = { @@ -79,6 +80,10 @@ def __init__(self, flags): else: self._path_to_run = _parse_event_files_spec(flags.logdir_spec) + # Conditionally import tensorflow_io. + if not getattr(tf, "__version__", "stub") == "stub": + _check_filesystem_support(self._path_to_run.keys()) + @property def data_provider(self): return self._data_provider @@ -196,3 +201,76 @@ def _parse_event_files_spec(logdir_spec): path = os.path.realpath(os.path.expanduser(path)) files[path] = run_name return files + + +def _get_filesystem_scheme(path): + """Extracts filesystem scheme from a given path. + + The filesystem scheme is usually separated by `://` from the local filesystem + path if given. For example, the scheme of `file://tmp/tf` is `file`. + + Args: + path: A strings representing an input log directory. + Returns: + Filesystem scheme, None if the path doesn't contain one. + """ + if "://" not in path: + return None + return path.split("://")[0] + + +def _check_filesystem_support(paths): + """Examines the list of filesystems user requested. + + If TF I/O schemes are requested, try to import tensorflow_io module. + + Args: + paths: A list of strings representing input log directories. + """ + get_registered_schemes = getattr( + tf.io.gfile, "get_registered_schemes", None + ) + registered_schemes = ( + None if get_registered_schemes is None else get_registered_schemes() + ) + + # Only need to check one path for each scheme. + scheme_to_path = {_get_filesystem_scheme(path): path for path in paths} + missing_scheme = None + for scheme, path in scheme_to_path.items(): + if scheme is None: + continue + # Use `tf.io.gfile.exists.get_registered_schemes` if possible. + if registered_schemes is not None: + if scheme not in registered_schemes: + missing_scheme = scheme + break + else: + # Fall back to `tf.io.gfile.exists`. + try: + tf.io.gfile.exists(path) + except tf.errors.UnimplementedError: + missing_scheme = scheme + break + except tf.errors.OpError: + # Swallow other errors; we aren't concerned about them at this point. + pass + + if missing_scheme: + try: + import tensorflow_io # noqa: F401 + except ImportError: + supported_schemes_msg = ( + " (supported schemes: {})".format(registered_schemes) + if registered_schemes + else "" + ) + raise tf.errors.UnimplementedError( + None, + None, + ( + "Error: Unsupported filename scheme '{}'{}. For additional" + + " filesystem support, consider installing TensorFlow I/O" + + " (https://www.tensorflow.org/io) via `pip install tensorflow-io`." + ).format(missing_scheme, supported_schemes_msg), + ) diff --git a/tensorboard/backend/event_processing/data_ingester_test.py b/tensorboard/backend/event_processing/data_ingester_test.py index 5413bd12c48..77b336a550a 100644 --- a/tensorboard/backend/event_processing/data_ingester_test.py +++ b/tensorboard/backend/event_processing/data_ingester_test.py @@ -22,6 +22,7 @@ from tensorboard import test as tb_test from tensorboard.backend.event_processing import data_ingester +from tensorboard.compat import tf class FakeFlags(object): @@ -251,5 +252,91 @@ def testSingleLetterGroup(self): ) +class FileSystemSupport(tb_test.TestCase): + def testCheckFilesystemSupport(self): + with mock.patch.object( + tf.io.gfile, + "get_registered_schemes", + autospec=True, + return_value=["g3", "s3"], + ) as mock_get_registered_schemes: + with mock.patch("builtins.__import__") as mock_import: + data_ingester._check_filesystem_support( + ["tmp/demo", "s3://bucket/123"] + ) + mock_import.assert_not_called() + mock_get_registered_schemes.assert_called_once() + + def testCheckFilesystemSupport_importTFIO(self): + with mock.patch.object( + tf.io.gfile, + "get_registered_schemes", + autospec=True, + return_value=["file", ""], + ) as mock_get_registered_schemes: + with mock.patch("builtins.__import__") as mock_import: + data_ingester._check_filesystem_support( + ["tmp/demo", "s3://bucket/123"] + ) + self.assertEqual("tensorflow_io", mock_import.call_args[0][0]) + mock_get_registered_schemes.assert_called_once() + + def testCheckFilesystemSupport_raiseError(self): + with mock.patch.object( + tf.io.gfile, + "get_registered_schemes", + autospec=True, + return_value=["file", "ram"], + ) as mock_get_registered_schemes: + with mock.patch( + "builtins.__import__", + side_effect=ImportError, + ) as mock_import: + err_msg = ( + "Error: Unsupported filename scheme 's3' (supported schemes: ['file', 'ram'])." + + " For additional filesystem support, consider installing TensorFlow I/O" + + " (https://www.tensorflow.org/io) via `pip install tensorflow-io`." + ) + with self.assertRaisesWithLiteralMatch( + tf.errors.UnimplementedError, err_msg + ): + data_ingester._check_filesystem_support( + ["tmp/demo", "s3://bucket/123"] + ) + self.assertEqual("tensorflow_io", mock_import.call_args[0][0]) + mock_get_registered_schemes.assert_called_once() + + def testCheckFilesystemSupport_fallback(self): + with mock.patch.object(tf.io, "gfile", autospec=True) as mock_gfile: + del mock_gfile.get_registered_schemes + with mock.patch("builtins.__import__") as mock_import: + mock_gfile.exists.return_value = True + data_ingester._check_filesystem_support(["gs://bucket/abc"]) + mock_import.assert_not_called() + mock_gfile.exists.assert_called_once_with("gs://bucket/abc") + + def testCheckFilesystemSupport_fallback_raiseError(self): + with mock.patch.object(tf.io, "gfile", autospec=True) as mock_gfile: + del mock_gfile.get_registered_schemes + with mock.patch( + "builtins.__import__", + side_effect=ImportError, + ) as mock_import: + mock_gfile.exists.side_effect = tf.errors.UnimplementedError( + None, None, "oops" + ) + err_msg = ( + "Error: Unsupported filename scheme 'gs'." + + " For additional filesystem support, consider installing TensorFlow I/O" + + " (https://www.tensorflow.org/io) via `pip install tensorflow-io`." + ) + with self.assertRaisesWithLiteralMatch( + tf.errors.UnimplementedError, err_msg + ): + data_ingester._check_filesystem_support(["gs://bucket/abc"]) + self.assertEqual("tensorflow_io", mock_import.call_args[0][0]) + mock_gfile.exists.assert_called_once_with("gs://bucket/abc") + + if __name__ == "__main__": tb_test.main()