Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gfile: conditionally import tensorflow_io #5491

Merged
merged 9 commits into from Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions tensorboard/backend/event_processing/BUILD
Expand Up @@ -36,6 +36,7 @@ py_library(
":data_provider",
":event_multiplexer",
":tag_types",
"//tensorboard/compat:tensorflow",
"//tensorboard/data:ingester",
"//tensorboard/plugins/audio:metadata",
"//tensorboard/plugins/histogram:metadata",
Expand All @@ -53,7 +54,9 @@ py_test(
srcs_version = "PY3",
deps = [
":data_ingester",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard:test",
"//tensorboard/compat:tensorflow",
],
)

Expand Down
78 changes: 78 additions & 0 deletions tensorboard/backend/event_processing/data_ingester.py
Expand Up @@ -30,6 +30,7 @@
from tensorboard.plugins.pr_curve import metadata as pr_curve_metadata
from tensorboard.plugins.scalar import metadata as scalar_metadata
from tensorboard.util import tb_logging
from tensorboard.compat import tf
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a "//tensorboard/compat:tensorflow" dep on the corresponding BUILD target for this.

Same goes for the test file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.



DEFAULT_SIZE_GUIDANCE = {
Expand Down Expand Up @@ -79,6 +80,10 @@ def __init__(self, flags):
else:
self._path_to_run = _parse_event_files_spec(flags.logdir_spec)

# Conditionally import tensorflow_io.
yatbear marked this conversation as resolved.
Show resolved Hide resolved
if not getattr(tf, "__version__", "stub") == "stub":
_check_filesystem_support(self._path_to_run.keys())

@property
def data_provider(self):
return self._data_provider
Expand Down Expand Up @@ -196,3 +201,76 @@ def _parse_event_files_spec(logdir_spec):
path = os.path.realpath(os.path.expanduser(path))
files[path] = run_name
return files


def _get_filesystem_scheme(path):
"""Extracts filesystem scheme from a given path.

The filesystem scheme is usually separated by `://` from the local filesystem
path if given. For example, the scheme of `file://tmp/tf` is `file`.

Args:
path: A strings representing an input log directory.
Returns:
Filesystem scheme, None if the path doesn't contain one.
"""
if "://" not in path:
return None
return path.split("://")[0]


def _check_filesystem_support(paths):
"""Examines the list of filesystems user requested.

If TF I/O schemes are requested, try to import tensorflow_io module.

Args:
paths: A list of strings representing input log directories.
"""
get_registered_schemes = getattr(
tf.io.gfile, "get_registered_schemes", None
)
registered_schemes = (
None if get_registered_schemes is None else get_registered_schemes()
)

# Only need to check one path for each scheme.
scheme_to_path = {_get_filesystem_scheme(path): path for path in paths}
missing_scheme = None
for scheme, path in scheme_to_path.items():
if scheme is None:
continue
# Use `tf.io.gfile.exists.get_registered_schemes` if possible.
if registered_schemes is not None:
if scheme not in registered_schemes:
missing_scheme = scheme
break
else:
# Fall back to `tf.io.gfile.exists`.
try:
tf.io.gfile.exists(path)
yatbear marked this conversation as resolved.
Show resolved Hide resolved
except tf.errors.UnimplementedError:
missing_scheme = scheme
break
except tf.errors.OpError:
# Swallow other errors; we aren't concerned about them at this point.
pass

if missing_scheme:
try:
import tensorflow_io # noqa: F401
except ImportError:
supported_schemes_msg = (
" (supported schemes: {})".format(registered_schemes)
if registered_schemes
else ""
)
raise tf.errors.UnimplementedError(
None,
None,
(
"Error: Unsupported filename scheme '{}'{}. For additional"
+ " filesystem support, consider installing TensorFlow I/O"
+ " (https://www.tensorflow.org/io) via `pip install tensorflow-io`."
).format(missing_scheme, supported_schemes_msg),
)
87 changes: 87 additions & 0 deletions tensorboard/backend/event_processing/data_ingester_test.py
Expand Up @@ -22,6 +22,7 @@

from tensorboard import test as tb_test
from tensorboard.backend.event_processing import data_ingester
from tensorboard.compat import tf


class FakeFlags(object):
Expand Down Expand Up @@ -251,5 +252,91 @@ def testSingleLetterGroup(self):
)


class FileSystemSupport(tb_test.TestCase):
def testCheckFilesystemSupport(self):
with mock.patch.object(
tf.io.gfile,
"get_registered_schemes",
autospec=True,
return_value=["g3", "s3"],
) as mock_get_registered_schemes:
with mock.patch("builtins.__import__") as mock_import:
data_ingester._check_filesystem_support(
["tmp/demo", "s3://bucket/123"]
)
mock_import.assert_not_called()
mock_get_registered_schemes.assert_called_once()

def testCheckFilesystemSupport_importTFIO(self):
with mock.patch.object(
tf.io.gfile,
"get_registered_schemes",
autospec=True,
return_value=["file", ""],
) as mock_get_registered_schemes:
with mock.patch("builtins.__import__") as mock_import:
data_ingester._check_filesystem_support(
["tmp/demo", "s3://bucket/123"]
)
self.assertEqual("tensorflow_io", mock_import.call_args[0][0])
mock_get_registered_schemes.assert_called_once()

def testCheckFilesystemSupport_raiseError(self):
with mock.patch.object(
tf.io.gfile,
"get_registered_schemes",
autospec=True,
return_value=["file", "ram"],
) as mock_get_registered_schemes:
with mock.patch(
"builtins.__import__",
side_effect=ImportError,
) as mock_import:
err_msg = (
"Error: Unsupported filename scheme 's3' (supported schemes: ['file', 'ram'])."
+ " For additional filesystem support, consider installing TensorFlow I/O"
+ " (https://www.tensorflow.org/io) via `pip install tensorflow-io`."
)
with self.assertRaisesWithLiteralMatch(
tf.errors.UnimplementedError, err_msg
):
data_ingester._check_filesystem_support(
["tmp/demo", "s3://bucket/123"]
)
self.assertEqual("tensorflow_io", mock_import.call_args[0][0])
mock_get_registered_schemes.assert_called_once()

def testCheckFilesystemSupport_fallback(self):
with mock.patch.object(tf.io, "gfile", autospec=True) as mock_gfile:
del mock_gfile.get_registered_schemes
with mock.patch("builtins.__import__") as mock_import:
mock_gfile.exists.return_value = True
data_ingester._check_filesystem_support(["gs://bucket/abc"])
mock_import.assert_not_called()
mock_gfile.exists.assert_called_once_with("gs://bucket/abc")

def testCheckFilesystemSupport_fallback_raiseError(self):
with mock.patch.object(tf.io, "gfile", autospec=True) as mock_gfile:
del mock_gfile.get_registered_schemes
with mock.patch(
"builtins.__import__",
side_effect=ImportError,
) as mock_import:
mock_gfile.exists.side_effect = tf.errors.UnimplementedError(
None, None, "oops"
)
err_msg = (
"Error: Unsupported filename scheme 'gs'."
+ " For additional filesystem support, consider installing TensorFlow I/O"
+ " (https://www.tensorflow.org/io) via `pip install tensorflow-io`."
)
with self.assertRaisesWithLiteralMatch(
tf.errors.UnimplementedError, err_msg
):
data_ingester._check_filesystem_support(["gs://bucket/abc"])
self.assertEqual("tensorflow_io", mock_import.call_args[0][0])
mock_gfile.exists.assert_called_once_with("gs://bucket/abc")


if __name__ == "__main__":
tb_test.main()