From 2750460b959e2e924b33355ba18315da1e8044c4 Mon Sep 17 00:00:00 2001
From: Aaron Pham <29749331+aarnphm@users.noreply.github.com>
Date: Sun, 6 Nov 2022 23:51:35 -0800
Subject: [PATCH] chore: cleanup deadcode (#3196)

---
 .../_internal/bento/build_dev_bentoml_whl.py |   6 +
 .../_internal/bento/local_py_modules.py      | 277 ------------
 src/bentoml/_internal/bento/pip_pkg.py       | 408 ------------------
 src/bentoml/_internal/runner/container.py    |  64 ---
 src/bentoml/_internal/types.py               |   3 -
 src/bentoml/_internal/utils/__init__.py      |  29 --
 src/bentoml/_internal/utils/csv.py           |  77 ----
 src/bentoml/_internal/utils/dataclasses.py   |  69 ---
 src/bentoml/_internal/utils/dataframe.py     | 274 ------------
 9 files changed, 6 insertions(+), 1201 deletions(-)
 delete mode 100644 src/bentoml/_internal/bento/local_py_modules.py
 delete mode 100644 src/bentoml/_internal/bento/pip_pkg.py
 delete mode 100644 src/bentoml/_internal/utils/csv.py
 delete mode 100644 src/bentoml/_internal/utils/dataclasses.py
 delete mode 100644 src/bentoml/_internal/utils/dataframe.py

diff --git a/src/bentoml/_internal/bento/build_dev_bentoml_whl.py b/src/bentoml/_internal/bento/build_dev_bentoml_whl.py
index 0658362b58b..dce0c6107d0 100644
--- a/src/bentoml/_internal/bento/build_dev_bentoml_whl.py
+++ b/src/bentoml/_internal/bento/build_dev_bentoml_whl.py
@@ -28,9 +28,15 @@ def build_bentoml_editable_wheel(target_path: str) -> None:
         return

     try:
+        # NOTE: build.env is a standalone library, different from build.
+        # However, isort sometimes re-orders these two imports incorrectly,
+        # so fence them off from import sorting.
+        # isort: off
        from build.env import IsolatedEnvBuilder
        from build import ProjectBuilder
+
+        # isort: on
     except ModuleNotFoundError as e:
         raise MissingDependencyException(
             f"Environment variable '{BENTOML_DEV_BUILD}=True', which requires the 'pypa/build' package ({e}). Install development dependencies with 'pip install -r requirements/dev-requirements.txt' and try again."
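[Note: the hunk above relies on isort's action comments: `# isort: off` disables import sorting until `# isort: on` re-enables it, preserving a deliberate import order that the sorter would otherwise rewrite. A minimal standalone sketch, with the module names taken from the patch itself:]

# Sketch: keep 'build.env' imported before 'build' even when isort runs.
# isort: off
from build.env import IsolatedEnvBuilder  # must come before the 'build' import
from build import ProjectBuilder
# isort: on

# Imports outside the off/on fence are sorted as usual.
import os

[A per-line alternative is appending `# isort: skip` to the individual import; the fence form used here keeps the two related imports grouped.]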
diff --git a/src/bentoml/_internal/bento/local_py_modules.py b/src/bentoml/_internal/bento/local_py_modules.py deleted file mode 100644 index 53542d853be..00000000000 --- a/src/bentoml/_internal/bento/local_py_modules.py +++ /dev/null @@ -1,277 +0,0 @@ -import os -import re -import sys -import inspect -import logging -import importlib -import modulefinder -from typing import List -from typing import Tuple -from unittest.mock import patch - -from ..types import PathType -from .pip_pkg import get_all_pip_installed_modules -from ...exceptions import BentoMLException - -logger = logging.getLogger(__name__) - - -def _get_module_src_file(module): - """ - Return module.__file__, change extension to '.py' if __file__ is ending with '.pyc' - """ - return module.__file__[:-1] if module.__file__.endswith(".pyc") else module.__file__ - - -def _is_valid_py_identifier(s): - """ - Return true if string is in a valid python identifier format: - - https://docs.python.org/2/reference/lexical_analysis.html#identifiers - """ - return re.fullmatch(r"[A-Za-z_][A-Za-z_0-9]*", s) is not None - - -def _get_module_relative_file_path(module_name, module_file): - - if not os.path.isabs(module_file): - # For modules within current top level package, module_file here should - # already be a relative path to the src file - relative_path = module_file - - elif os.path.split(module_file)[1] == "__init__.py": - # for module a.b.c in 'some_path/a/b/c/__init__.py', copy file to - # 'destination/a/b/c/__init__.py' - relative_path = os.path.join(module_name.replace(".", os.sep), "__init__.py") - - else: - # for module a.b.c in 'some_path/a/b/c.py', copy file to 'destination/a/b/c.py' - relative_path = os.path.join(module_name.replace(".", os.sep) + ".py") - - return relative_path - - -def _get_module(target_module): - # When target_module is a string, try import it - if isinstance(target_module, str): - try: - target_module = importlib.import_module(target_module) - except ImportError: - pass - return inspect.getmodule(target_module) - - -def _import_module_from_file(path): - module_name = path.replace(os.sep, ".")[:-3] - spec = importlib.util.spec_from_file_location(module_name, path) - m = importlib.util.module_from_spec(spec) - return m - - -# TODO: change this to find_local_py_modules_used(svc: Service) -def find_local_py_modules_used(target_module_file: PathType) -> List[Tuple[str, str]]: - """Find all local python module dependencies of target_module, and list all the - local module python files to the destination directory while maintaining the module - structure unchanged to ensure all imports in target_module still works when loading - from the destination directory again - - Args: - path (`Union[str, bytes, os.PathLike]`): - Path to a python source file - - Returns: - list of (source file path, target file path) pairs - """ - - target_module = _import_module_from_file(target_module_file) - - try: - target_module_name = target_module.__spec__.name - except AttributeError: - target_module_name = target_module.__name__ - - # Find all non pip installed modules must be packaged for target module to run - exclude_modules = ["bentoml"] + get_all_pip_installed_modules() - finder = modulefinder.ModuleFinder(excludes=exclude_modules) - - try: - logger.debug( - "Searching for local dependant modules of %s:%s", - target_module_name, - target_module_file, - ) - if sys.version_info[0] == 3 and sys.version_info[1] >= 8: - _find_module = modulefinder._find_module - _PKG_DIRECTORY = modulefinder._PKG_DIRECTORY - - def 
_patch_find_module(name, path=None): - """ref issue: https://bugs.python.org/issue40350""" - - importlib.machinery.PathFinder.invalidate_caches() - - spec = importlib.machinery.PathFinder.find_spec(name, path) - - if spec is not None and spec.loader is None: - return None, None, ("", "", _PKG_DIRECTORY) - - return _find_module(name, path) - - with patch.object(modulefinder, "_find_module", _patch_find_module): - finder.run_script(target_module_file) - else: - finder.run_script(target_module_file) - except SyntaxError: - # For package with conditional import that may only work with py2 - # or py3, ModuleFinder#run_script will try to compile the source - # with current python version. And that may result in SyntaxError. - pass - - if finder.badmodules: - logger.debug( - "Find bad module imports that can not be parsed properly: %s", - finder.badmodules.keys(), - ) - - # Look for dependencies that are not distributed python package, but users' - # local python code, all other dependencies must be defined with @env - # decorator when creating a new BentoService class - user_packages_and_modules = {} - for name, module in finder.modules.items(): - if hasattr(module, "__file__") and module.__file__ is not None: - user_packages_and_modules[name] = module - - # Lastly, add target module itself - user_packages_and_modules[target_module_name] = target_module - - file_list = [] - for module_name, module in user_packages_and_modules.items(): - module_file = _get_module_src_file(module) - relative_path = _get_module_relative_file_path(module_name, module_file) - file_list.append((module_file, relative_path)) - - return file_list - - -def copy_local_py_modules(target_module, destination): - """Find all local python module dependencies of target_module, and copy all the - local module python files to the destination directory while maintaining the module - structure unchanged to ensure all imports in target_module still works when loading - from the destination directory again - """ - target_module = _get_module(target_module) - - # When target module is defined in interactive session, we can not easily - # get the class definition into a python module file and distribute it - if target_module.__name__ == "__main__" and not hasattr(target_module, "__file__"): - raise BentoMLException( - "Custom BentoModel class can not be defined in Python interactive REPL, try" - " writing the class definition to a file and import it." 
- ) - - try: - target_module_name = target_module.__spec__.name - except AttributeError: - target_module_name = target_module.__name__ - - target_module_file = _get_module_src_file(target_module) - logger.debug( - "copy_local_py_modules target_module_name: %s, target_module_file: %s", - target_module_name, - target_module_file, - ) - - if target_module_name == "__main__": - # Assuming no relative import in this case - target_module_file_name = os.path.split(target_module_file)[1] - target_module_name = target_module_file_name[:-3] # remove '.py' - logger.debug( - "Updating for __main__ module, target_module_name: %s, " - "target_module_file: %s", - target_module_name, - target_module_file, - ) - - # Find all non pip installed modules must be packaged for target module to run - # exclude_modules = ['bentoml'] + get_all_pip_installed_modules() - # finder = modulefinder.ModuleFinder(excludes=exclude_modules) - # - # try: - # logger.debug( - # "Searching for local dependant modules of %s:%s", - # target_module_name, - # target_module_file, - # ) - # if sys.version_info[0] == 3 and sys.version_info[1] >= 8: - # _find_module = modulefinder._find_module - # _PKG_DIRECTORY = modulefinder._PKG_DIRECTORY - # - # def _patch_find_module(name, path=None): - # """ref issue: https://bugs.python.org/issue40350""" - # - # importlib.machinery.PathFinder.invalidate_caches() - # - # spec = importlib.machinery.PathFinder.find_spec(name, path) - # - # if spec is not None and spec.loader is None: - # return None, None, ("", "", _PKG_DIRECTORY) - # - # return _find_module(name, path) - # - # with patch.object(modulefinder, '_find_module', _patch_find_module): - # finder.run_script(target_module_file) - # else: - # finder.run_script(target_module_file) - # except SyntaxError: - # # For package with conditional import that may only work with py2 - # # or py3, ModuleFinder#run_script will try to compile the source - # # with current python version. And that may result in SyntaxError. 
- # pass - # - # if finder.badmodules: - # logger.debug( - # "Find bad module imports that can not be parsed properly: %s", - # finder.badmodules.keys(), - # ) - # - # # Look for dependencies that are not distributed python package, but users' - # # local python code, all other dependencies must be defined with @env - # # decorator when creating a new BentoService class - # user_packages_and_modules = {} - # for name, module in finder.modules.items(): - # if hasattr(module, "__file__") and module.__file__ is not None: - # user_packages_and_modules[name] = module - - # # Remove "__main__" module, if target module is loaded as __main__, it should - # # be in module_files as (module_name, module_file) in current context - # if "__main__" in user_packages_and_modules: - # del user_packages_and_modules["__main__"] - # - # # Lastly, add target module itself - # user_packages_and_modules[target_module_name] = target_module - # logger.debug( - # "Copying user local python dependencies: %s", user_packages_and_modules - # ) - # - # for module_name, module in user_packages_and_modules.items(): - # module_file = _get_module_src_file(module) - # relative_path = _get_module_relative_file_path(module_name, module_file) - # target_file = os.path.join(destination, relative_path) - # - # # Create target directory if not exist - # Path(os.path.dirname(target_file)).mkdir(parents=True, exist_ok=True) - # - # # Copy module file to BentoArchive for distribution - # logger.debug("Copying local python module '%s'", module_file) - # copyfile(module_file, target_file) - # - # for root, _, files in os.walk(destination): - # if "__init__.py" not in files: - # logger.debug("Creating empty __init__.py under folder:'%s'", root) - # Path(os.path.join(root, "__init__.py")).touch() - # - # target_module_relative_path = _get_module_relative_file_path( - # target_module_name, target_module_file - # ) - # logger.debug("Done copying local python dependant modules") - # - # return target_module_name, target_module_relative_path diff --git a/src/bentoml/_internal/bento/pip_pkg.py b/src/bentoml/_internal/bento/pip_pkg.py deleted file mode 100644 index 01f2f904c9d..00000000000 --- a/src/bentoml/_internal/bento/pip_pkg.py +++ /dev/null @@ -1,408 +0,0 @@ -import os -import ast -import sys -import typing as t -import logging -import pkgutil -import zipfile -import zipimport -from typing import TYPE_CHECKING -from collections import defaultdict - -try: - import importlib.metadata as importlib_metadata -except ImportError: - import importlib_metadata -from packaging.requirements import Requirement - -if TYPE_CHECKING: - from ..service import Service - -EPP_NO_ERROR = 0 -EPP_PKG_NOT_EXIST = 1 -EPP_PKG_VERSION_MISMATCH = 2 - -__mm = None - -logger = logging.getLogger(__name__) - - -def split_requirement(requirement: str) -> t.Tuple[str, str]: - """ - Split requirements. 'bentoml>=1.0.0' -> ['bentoml', '>=1.0.0'] - """ - req = Requirement(requirement) - name = req.name.replace("-", "_") - return name, str(req.specifier) - - -def packages_distributions() -> t.Dict[str, t.List[str]]: - """Return a mapping of top-level packages to their distributions. We're - inlining this helper from the importlib_metadata "backport" here, since - it's not available in the builtin importlib.metadata. 
- """ - pkg_to_dist = defaultdict(list) - for dist in importlib_metadata.distributions(): - for pkg in (dist.read_text("top_level.txt") or "").split(): - pkg_to_dist[pkg].append(dist.metadata["Name"]) - return dict(pkg_to_dist) - - -def parse_requirement_string(rs): - name, _, version = rs.partition("==") - return name, version - - -def verify_pkg(pkg_req): - global __mm # pylint: disable=global-statement - if __mm is None: - __mm = ModuleManager() - return __mm.verify_pkg(pkg_req) - - -def seek_pip_packages(target_py_file_path): - global __mm # pylint: disable=global-statement - if __mm is None: - __mm = ModuleManager() - return __mm.seek_pip_packages(target_py_file_path) - - -def get_pkg_version(pkg_name): - global __mm # pylint: disable=global-statement - if __mm is None: - __mm = ModuleManager() - return __mm.pip_pkg_map.get(pkg_name, None) - - -def get_zipmodules(): - global __mm # pylint: disable=global-statement - if __mm is None: - __mm = ModuleManager() - return __mm.zip_modules - - -def get_all_pip_installed_modules(): - global __mm # pylint: disable=global-statement - if __mm is None: - __mm = ModuleManager() - - installed_modules = list( - # local modules are the ones imported from current directory, either from a - # module.py file or a module directory that contains a `__init__.py` file - filter(lambda m: not m.is_local, __mm.searched_modules.values()) - ) - return list(map(lambda m: m.name, installed_modules)) - - -class ModuleInfo(object): - def __init__(self, name, path, is_local, is_pkg): - super(ModuleInfo, self).__init__() - self.name = name - self.path = path - self.is_local = is_local - self.is_pkg = is_pkg - - -class ModuleManager(object): - def __init__(self): - super(ModuleManager, self).__init__() - self.pip_pkg_map = {} - self.pip_module_map = {} - self.setuptools_module_set = set() - self.nonlocal_package_path = set() - - import pkg_resources - - for dist in pkg_resources.working_set: # pylint: disable=not-an-iterable - module_path = dist.module_path or dist.location - if not module_path: - # Skip if no module path was found for pkg distribution - continue - - if os.path.realpath(module_path) != os.getcwd(): - # add to nonlocal_package path only if it's not current directory - self.nonlocal_package_path.add(module_path) - - self.pip_pkg_map[dist._key] = dist._version - for mn in dist._get_metadata("top_level.txt"): - if dist._key != "setuptools": - self.pip_module_map.setdefault(mn, []).append( - (dist._key, dist._version) - ) - else: - self.setuptools_module_set.add(mn) - - self.searched_modules = {} - self.zip_modules: t.Dict[str, zipimport.zipimporter] = {} - for m in pkgutil.iter_modules(): - if m.name not in self.searched_modules: - if isinstance(m.module_finder, zipimport.zipimporter): - logger.info(f"Detected zipimporter {m.module_finder}") - path = m.module_finder.archive - self.zip_modules[path] = m.module_finder - else: - path = m.module_finder.path - is_local = self.is_local_path(path) - self.searched_modules[m.name] = ModuleInfo( - m.name, path, is_local, m.ispkg - ) - - def verify_pkg(self, pkg_req): - if pkg_req.name not in self.pip_pkg_map: - # package does not exist in the current python session - return EPP_PKG_NOT_EXIST - - if self.pip_pkg_map[pkg_req.name] not in pkg_req.specifier: - # package version being used in the current python session does not meet - # the specified package version requirement - return EPP_PKG_VERSION_MISMATCH - - return EPP_NO_ERROR - - def seek_pip_packages(self, target_py_file_path): - logger.debug("target py file 
path: %s", target_py_file_path) - work = DepSeekWork(self, target_py_file_path) - work.do() - requirements = {} - for _, pkg_info_list in work.dependencies.items(): - for pkg_name, pkg_version in pkg_info_list: - requirements[pkg_name] = pkg_version - - return requirements, work.unknown_module_set - - def is_local_path(self, path): - if path in self.nonlocal_package_path: - return False - - dir_name = os.path.split(path)[1] - - if ( - "site-packages" in path - or "anaconda" in path - or path.endswith("packages") - or dir_name == "bin" - or dir_name.startswith("lib") - or dir_name.startswith("python") - or dir_name.startswith("plat") - ): - self.nonlocal_package_path.add(path) - return False - - return True - - -class DepSeekWork(object): - def __init__(self, module_manager, target_py_file_path): - super(DepSeekWork, self).__init__() - self.module_manager = module_manager - self.target_py_file_path = target_py_file_path - - self.dependencies = {} - self.unknown_module_set = set() - self.parsed_module_set = set() - - def do(self): - self.seek_in_file(self.target_py_file_path) - - def seek_in_file(self, file_path): - try: - with open(file_path) as f: - content = f.read() - except UnicodeDecodeError: - with open(file_path, encoding="utf-8") as f: - content = f.read() - self.seek_in_source(content) - - def seek_in_source(self, content): - # Extract all dependency modules by searching through the trees of the Python - # abstract syntax grammar with Python's built-in ast module - tree = ast.parse(content) - import_set = set() - for node in ast.walk(tree): - if isinstance(node, ast.Import): - for name in node.names: - import_set.add(name.name.partition(".")[0]) - elif isinstance(node, ast.ImportFrom): - if node.module is not None and node.level == 0: - import_set.add(node.module.partition(".")[0]) - logger.debug("import set: %s", import_set) - for module_name in import_set: - # Avoid parsing BentoML when BentoML is imported from local source code repo - if module_name == "bentoml": - continue - if module_name in self.parsed_module_set: - continue - self.parsed_module_set.add(module_name) - - if module_name in self.module_manager.searched_modules: - m = self.module_manager.searched_modules[module_name] - if m.is_local: - # Recursively search dependencies in sub-modules - if m.path in self.module_manager.zip_modules: - self.seek_in_zip(m.path) - elif m.is_pkg: - self.seek_in_dir(os.path.join(m.path, m.name)) - else: - self.seek_in_file(os.path.join(m.path, "{}.py".format(m.name))) - else: - # check if the package has already been added to the list - if ( - module_name in self.module_manager.pip_module_map - and module_name not in self.dependencies - and module_name not in self.module_manager.setuptools_module_set - ): - self.dependencies[ - module_name - ] = self.module_manager.pip_module_map[module_name] - else: - if module_name in self.module_manager.pip_module_map: - if module_name not in self.dependencies: - # In some special cases, the pip-installed module can not - # be located in the searched_modules - self.dependencies[ - module_name - ] = self.module_manager.pip_module_map[module_name] - else: - if module_name not in sys.builtin_module_names: - self.unknown_module_set.add(module_name) - - def seek_in_dir(self, dir_path): - for path, dir_list, file_list in os.walk(dir_path): - for file_name in file_list: - if not file_name.endswith(".py"): - continue - self.seek_in_file(os.path.join(path, file_name)) - for dir_name in dir_list: - if dir_name in ["__pycache__", ".ipynb_checkpoints"]: - 
continue - self.seek_in_dir(os.path.join(path, dir_name)) - - def seek_in_zip(self, zip_path): - with zipfile.ZipFile(zip_path) as zf: - for module_path in zf.infolist(): - filename = module_path.filename - if filename.endswith(".py"): - logger.debug("Seeking modules in zip %s", filename) - content = self.module_manager.zip_modules[zip_path].get_source( - filename.replace(".py", "") - ) - self.seek_in_source(content) - - -def lock_pypi_versions(package_list: t.List[str]) -> t.List[str]: - """ - Lock versions of pypi packages in current virtualenv - - Args: - package_list List[str]: - List contains package names - - Raises: - ValueError: if one package in `package_list` is not - available in current virtualenv - - Returns: - - list of lines for requirements.txt - - Example Results: - - * ['numpy==1.20.3', 'pandas==1.2.4', 'scipy==1.4.1'] - """ - pkgs_with_version = [] - - for pkg in package_list: - version = get_pkg_version(pkg) - print(pkg, version) - if version: - pkg_line = f"{pkg}=={version}" - pkgs_with_version.append(pkg_line) - else: - # better warning or throw an exception? - raise ValueError(f"package {pkg} is not available in current virtualenv") - - return pkgs_with_version - - -def with_pip_install_options( - package_lines: t.List[str], - index_url: t.Optional[str] = None, - extra_index_url: t.Optional[str] = None, - find_links: t.Optional[str] = None, -) -> t.List[str]: - """ - Lock versions of pypi packages in current virtualenv - - Args: - package_lines List[str]: - List contains items each representing one line of requirements.txt - - index_url Optional[str]: - value of --index-url - - extra_index_url Optional[str]: - value of --extra_index-url - - find_links Optional[str]: - value of --find-links - - Returns: - - list of lines for requirements.txt - - Example Results: - - * ['pandas==1.2.4 --index-url=https://mirror.baidu.com/pypi/simple', - 'numpy==1.20.3 --index-url=https://mirror.baidu.com/pypi/simple'] - """ - - options = [] - if index_url: - options.append(f"--index-url={index_url}") - if extra_index_url: - options.append(f"--extra-index-url={extra_index_url}") - if find_links: - options.append(f"--find-links={find_links}") - - if not options: - return package_lines - - option_str = " ".join(options) - pkgs_with_options = [pkg + " " + option_str for pkg in package_lines] - return pkgs_with_options - - -def find_required_pypi_packages( - svc: "Service", lock_versions: bool = True -) -> t.List[str]: - """ - Find required pypi packages in a python source file - - Args: - path (`Union[str, bytes, os.PathLike]`): - Path to a python source file - - lock_versions bool: - if the versions of packages should be locked - - Returns: - - list of lines for requirements.txt - - Example Results: - - * ['numpy==1.20.3', 'pandas==1.2.4'] - * ['numpy', 'pandas'] - """ - module_name = svc.__module__ - module = sys.modules[module_name] - reqs, unknown_modules = seek_pip_packages(module.__file__) - for module_name in unknown_modules: - logger.warning("unknown package dependency for module: %s", module_name) - - if lock_versions: - pkg_lines = ["%s==%s" % pkg for pkg in reqs.items()] - else: - pkg_lines = list(reqs.keys()) - - return pkg_lines diff --git a/src/bentoml/_internal/runner/container.py b/src/bentoml/_internal/runner/container.py index 3be63d3cda9..3be81a82f72 100644 --- a/src/bentoml/_internal/runner/container.py +++ b/src/bentoml/_internal/runner/container.py @@ -178,70 +178,6 @@ def from_batch_payloads( return cls.batches_to_batch(batches, batch_dim) -class DMatrixContainer( - 
DataContainer[ - "ext.DMatrix", - "ext.DMatrix", - ] -): - @classmethod - def batches_to_batch( - cls, - batches: t.Sequence[ext.DMatrix], - batch_dim: int = 0, - ) -> tuple[ext.DMatrix, list[int]]: - raise NotImplementedError - - @classmethod - def batch_to_batches( - cls, - batch: ext.DMatrix, - indices: t.Sequence[int], - batch_dim: int = 0, - ) -> list[ext.DMatrix]: - raise NotImplementedError - - @classmethod - @inject - def to_payload( - cls, - batch: ext.DMatrix, - batch_dim: int, - plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db], - ) -> Payload: - raise NotImplementedError - - @classmethod - @inject - def from_payload( - cls, - payload: Payload, - plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db], - ) -> ext.DMatrix: - raise NotImplementedError - - @classmethod - @inject - def batch_to_payloads( - cls, - batch: ext.DMatrix, - indices: t.Sequence[int], - batch_dim: int = 0, - plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db], - ) -> list[Payload]: - raise NotImplementedError - - @classmethod - @inject - def from_batch_payloads( - cls, - payloads: t.Sequence[Payload], - batch_dim: int = 0, - plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db], - ) -> tuple[ext.DMatrix, list[int]]: - raise NotImplementedError - - class PandasDataFrameContainer( DataContainer[t.Union["ext.PdDataFrame", "ext.PdSeries"], "ext.PdDataFrame"] ): diff --git a/src/bentoml/_internal/types.py b/src/bentoml/_internal/types.py index 2b3dce7adc5..534120efb33 100644 --- a/src/bentoml/_internal/types.py +++ b/src/bentoml/_internal/types.py @@ -13,8 +13,6 @@ from datetime import timedelta from dataclasses import dataclass -from .utils.dataclasses import json_serializer - if sys.version_info < (3, 8): import collections @@ -284,7 +282,6 @@ def is_compatible_type(t1: AnyType, t2: AnyType) -> bool: return True -@json_serializer(fields=["uri", "name"], compat=True) @dataclass(frozen=False) class FileLike(t.Generic[t.AnyStr], io.IOBase): """ diff --git a/src/bentoml/_internal/utils/__init__.py b/src/bentoml/_internal/utils/__init__.py index 1b99c4b1e60..f89ffb1b366 100644 --- a/src/bentoml/_internal/utils/__init__.py +++ b/src/bentoml/_internal/utils/__init__.py @@ -57,7 +57,6 @@ "cached_property", "cached_contextmanager", "reserve_free_port", - "catch_exceptions", "LazyLoader", "validate_or_create_dir", "display_path_under_home", @@ -177,34 +176,6 @@ def human_readable_size(size: t.Union[int, float], decimal_places: int = 2) -> s return f"{size:.{decimal_places}f} {unit}" -class catch_exceptions(t.Generic[_T_co], object): - def __init__( - self, - catch_exc: t.Union[t.Type[BaseException], t.Tuple[t.Type[BaseException], ...]], - throw_exc: t.Callable[[str], BaseException], - msg: str = "", - fallback: t.Optional[_T_co] = None, - raises: t.Optional[bool] = True, - ) -> None: - self._catch_exc = catch_exc - self._throw_exc = throw_exc - self._msg = msg - self._fallback = fallback - self._raises = raises - - def __call__(self, func: t.Callable[P, _T_co]) -> t.Callable[P, t.Optional[_T_co]]: - @functools.wraps(func) - def _(*args: P.args, **kwargs: P.kwargs) -> t.Optional[_T_co]: - try: - return func(*args, **kwargs) - except self._catch_exc: - if self._raises: - raise self._throw_exc(self._msg) - return self._fallback - - return _ - - def split_with_quotes( s: str, sep: str = ",", diff --git a/src/bentoml/_internal/utils/csv.py b/src/bentoml/_internal/utils/csv.py deleted file mode 100644 index 5a8115bf7d9..00000000000 --- 
a/src/bentoml/_internal/utils/csv.py +++ /dev/null @@ -1,77 +0,0 @@ -# CSV utils following https://tools.ietf.org/html/rfc4180 -import typing as t - - -def csv_splitlines(string: str) -> t.Iterator[str]: - if '"' in string: - - def _iter_line(line: str) -> t.Iterator[str]: - quoted = False - last_cur = 0 - for i, c in enumerate(line): - if c == '"': - quoted = not quoted - if not quoted and string[i : i + 1] == "\n": - if i == 0 or string[i - 1] != "\r": - yield line[last_cur:i] - last_cur = i + 1 - else: - yield line[last_cur : i - 1] - last_cur = i + 1 - yield line[last_cur:] - - return _iter_line(string) - - return iter(string.splitlines()) - - -def csv_split(string: str, delimiter: str) -> t.Iterator[str]: - if '"' in string: - d_len = len(delimiter) - - def _iter_line(line: str) -> t.Iterator[str]: - quoted = False - last_cur = 0 - for i, c in enumerate(line): - if c == '"': - quoted = not quoted - if not quoted and string[i : i + d_len] == delimiter: - yield line[last_cur:i] - last_cur = i + d_len - yield line[last_cur:] - - return _iter_line(string) - else: - return iter(string.split(delimiter)) - - -def csv_row(tds: t.Iterable) -> str: - return ",".join(csv_quote(td) for td in tds) - - -def csv_unquote(string: str) -> str: - if '"' in string: - string = string.strip() - assert string[0] == '"' and string[-1] == '"' - return string[1:-1].replace('""', '"') - return string - - -def csv_quote(td: t.Union[int, str]) -> str: - """ - >>> csv_quote(1) - '1' - >>> csv_quote('string') - 'string' - >>> csv_quote('a,b"c') - '"a,b""c"' - >>> csv_quote(' ') - '" "' - """ - if td is None: - td = "" - elif not isinstance(td, str): - td = str(td) - if "\n" in td or '"' in td or "," in td or not td.strip(): - return td.replace('"', '""').join('""') - return td diff --git a/src/bentoml/_internal/utils/dataclasses.py b/src/bentoml/_internal/utils/dataclasses.py deleted file mode 100644 index 90fc49713c0..00000000000 --- a/src/bentoml/_internal/utils/dataclasses.py +++ /dev/null @@ -1,69 +0,0 @@ -import json -import typing as t -from dataclasses import asdict -from dataclasses import fields as get_fields -from dataclasses import is_dataclass - - -class DataclassJsonEncoder(json.JSONEncoder): - """Special json encoder for numpy types""" - - def default(self, o: t.Any): # pylint: disable=method-hidden - if is_dataclass(o): - if hasattr(o, "to_json"): - return o.to_json() - else: - return asdict(o) - return super().default(o) - - -class json_serializer: - def __init__(self, fields: t.Optional[t.List[str]] = None, compat: bool = False): - self.fields = fields - self.compat = compat - - @staticmethod - def _extract_nested(obj: t.Any): - if hasattr(obj, "to_json"): - return obj.to_json() - return obj - - T = t.TypeVar( - "T", - ) - - def __call__(self, klass: t.Type[T]) -> t.Type[T]: - if not is_dataclass(klass): - raise TypeError( - f"{self.__class__.__name__} only accepts dataclasses, " - f"got {klass.__name__}" - ) - default_map = { - f.name: f.default_factory() if callable(f.default_factory) else f.default - for f in get_fields(klass) - } - - if self.fields is None: - _fields = tuple(k for k in default_map.keys() if not k.startswith("_")) - else: - _fields = self.fields - - if self.compat: - - def to_json_compat(data_obj: t.Any): - return { - k: self._extract_nested(getattr(data_obj, k)) - for k in _fields - if default_map[k] != getattr(data_obj, k) - } - - klass.to_json = to_json_compat # type: ignore - - else: - - def to_json(data_obj: t.Any): - return {k: self._extract_nested(getattr(data_obj, k)) 
for k in _fields} - - klass.to_json = to_json # type: ignore - - return klass diff --git a/src/bentoml/_internal/utils/dataframe.py b/src/bentoml/_internal/utils/dataframe.py deleted file mode 100644 index 51667dc1dc6..00000000000 --- a/src/bentoml/_internal/utils/dataframe.py +++ /dev/null @@ -1,274 +0,0 @@ -import io -import json -import typing as t -import itertools -from typing import TYPE_CHECKING - -from . import catch_exceptions -from .csv import csv_row -from .csv import csv_quote -from .csv import csv_split -from .csv import csv_unquote -from .csv import csv_splitlines -from .lazy_loader import LazyLoader -from ...exceptions import BadInput -from ...exceptions import BentoMLException - -if TYPE_CHECKING: - import pandas as pd -else: - pd = LazyLoader("pd", globals(), "pandas") - - -def check_dataframe_column_contains( - required_column_names: str, df: "pd.DataFrame" -) -> None: - df_columns = set(map(str, df.columns)) - for col in required_column_names: - if col not in df_columns: - raise BadInput( - f"Missing columns: {','.join(set(required_column_names) - df_columns)}, required_column:{df_columns}" # noqa: E501 - ) - - -@catch_exceptions(Exception, BentoMLException, fallback=None) -def guess_orient( - table: t.Union[t.List[t.Mapping[str, t.Any]], t.Dict[str, t.Any]], - strict: bool = False, -) -> t.Optional[t.Set[str]]: - if isinstance(table, list): - if not table: - if strict: - return {"records", "values"} - else: - return {"records"} - if isinstance(table[0], dict): - return {"records"} - else: - return {"values"} - elif isinstance(table, dict): - if set(table) == {"columns", "index", "data"}: - return {"split"} - if set(table) == {"schema", "data"} and "primaryKey" in table["schema"]: - return {"table"} - if strict: - return {"columns", "index"} - else: - return {"columns"} - else: - return None - - -class _DataFrameState(object): - # fmt: off - @t.overload - def __init__(self, columns: t.Optional[t.Dict[str, int]]): ... # noqa: F811,E704 - - @t.overload # noqa: F811 - def __init__(self, columns: t.Optional[t.Tuple[str, ...]]): ... 
# noqa: F811,E704 - # fmt: on - - def __init__( # noqa: F811 - self, - columns: t.Optional[t.Union[t.Mapping[str, int], t.Tuple[str, ...]]] = None, - ): - self.columns = columns - - -def _from_json_records(state: _DataFrameState, table: list) -> t.Iterator[str]: - if state.columns is None: # make header - state.columns = {k: i for i, k in enumerate(table[0].keys())} - for tr in table: - yield csv_row(tr[c] for c in state.columns) - - -def _from_json_values(_: _DataFrameState, table: list) -> t.Iterator[str]: - for tr in table: - yield csv_row(tr) - - -def _from_json_columns(state: _DataFrameState, table: dict) -> t.Iterator[str]: - if state.columns is None: # make header - state.columns = {k: i for i, k in enumerate(table.keys())} - for row in next(iter(table.values())): - yield csv_row(table[col][row] for col in state.columns) - - -def _from_json_index(state: _DataFrameState, table: dict) -> t.Iterator[str]: - if state.columns is None: # make header - state.columns = {k: i for i, k in enumerate(next(iter(table.values())).keys())} - for row in table.keys(): - yield csv_row(td for td in table[row].values()) - else: - for row in table.keys(): - yield csv_row(table[row][col] for col in state.columns) - - -def _from_json_split(state: _DataFrameState, table: dict) -> t.Iterator[str]: - table_columns = {k: i for i, k in enumerate(table["columns"])} - - if state.columns is None: # make header - state.columns = table_columns - for row in table["data"]: - yield csv_row(row) - else: - idxs = [state.columns[k] for k in table_columns] - for row in table["data"]: - yield csv_row(row[idx] for idx in idxs) - - -def _from_csv_without_index( - state: _DataFrameState, table: t.Iterator[str] -) -> t.Iterator[str]: - row_str = next(table) # skip column names - table_columns = tuple(csv_unquote(s) for s in csv_split(row_str, ",")) - - if state.columns is None: - state.columns = table_columns - for row_str in table: - if not row_str: # skip blank line - continue - if not row_str.strip(): - yield csv_quote(row_str) - else: - yield row_str - elif not all( - c1 == c2 for c1, c2 in itertools.zip_longest(state.columns, table_columns) - ): - # TODO: check type hint for this case. 
Right now nothing breaks so :) - idxs = [state.columns[k] for k in table_columns] # type: ignore[call-overload] - for row_str in table: - if not row_str: # skip blank line - continue - if not row_str.strip(): - yield csv_quote(row_str) - else: - tr = tuple(s for s in csv_split(row_str, ",")) - yield csv_row(tr[i] for i in idxs) - else: - for row_str in table: - if not row_str: # skip blank line - continue - if not row_str.strip(): - yield csv_quote(row_str) - else: - yield row_str - - -_ORIENT_MAP: t.Dict[str, t.Callable[["_DataFrameState", str], t.Iterator[str]]] = { - "records": _from_json_records, - "columns": _from_json_columns, - "values": _from_json_values, - "split": _from_json_split, - "index": _from_json_index, - # 'table': _from_json_table, -} - -PANDAS_DATAFRAME_TO_JSON_ORIENT_OPTIONS = {k for k in _ORIENT_MAP} - - -def _dataframe_csv_from_input( - table: str, - fmt: str, - orient: t.Optional[str], - state: _DataFrameState, -) -> t.Optional[t.Tuple[str, ...]]: - try: - if not fmt or fmt == "json": - table = json.loads(table) - if not orient: - orient = guess_orient(table, strict=False).pop() - else: - # TODO: this can be either a set or a string - guessed_orient = guess_orient(table, strict=True) # type: t.Set[str] - if set(orient) != guessed_orient and orient not in guessed_orient: - return None - if orient not in _ORIENT_MAP: - return None - _from_json = _ORIENT_MAP[orient] - try: - return tuple(_from_json(state, table)) - except (TypeError, AttributeError, KeyError, IndexError): - return None - elif fmt == "csv": - _table = csv_splitlines(table) - return tuple(_from_csv_without_index(state, _table)) - else: - return None - except json.JSONDecodeError: - return None - - -def from_json_or_csv( - data: t.Iterable[str], - formats: t.Iterable[str], - orient: t.Optional[str] = None, - columns: t.Optional[t.List[str]] = None, - dtype: t.Optional[t.Union[bool, t.Dict[str, t.Any]]] = None, -) -> t.Tuple[t.Optional["pd.DataFrame"], t.Tuple[int, ...]]: - """ - Load DataFrames from multiple raw data sources in JSON or CSV format, efficiently - - Background: Each call of `pandas.read_csv()` or `pandas.read_json` takes about - 100ms, no matter how many lines the read data contains. This function concats - the ragged_tensor/csv before running `read_json`/`read_csv` to improve performance. - - Args: - data (`Iterable[str]`): - Data in either JSON or CSV format - formats (`Iterable[str]`): - List of formats, which are either `json` or `csv` - orient (:code:`str`, `optional`, default `"records"`): - Indication of expected JSON string format. Compatible JSON strings can be - produced by `pandas.io.json.to_json()` with a corresponding orient value. - Possible orients are: - - `split` - :code:`Dict[str, Any]`: {idx -> [idx], columns -> [columns], data - -> [values]} - - `records` - `List[Any]`: [{column -> value}, ..., {column -> value}] - - `index` - :code:`Dict[str, Any]`: {idx -> {column -> value}} - - `columns` - :code:`Dict[str, Any]`: {column -> {index -> value}} - - `values` - :code:`Dict[str, Any]`: Values arrays - columns (`List[str]`, `optional`, default `None`): - List of column names that users wish to update - dtype (:code:`Union[bool, Dict[str, Any]]`, `optional`, default `None`): - Data type to inputs/outputs to. If it is a boolean, then pandas will infer - data types. Otherwise, if it is a dictionary of column to data type, then - applies those to incoming dataframes. If False, then don't infer data types - at all (only applies to the data). 
This is not applicable when - `orient='table'`. - - Returns: - A tuple containing a `pandas.DataFrame` and a tuple containing the length of all - series in the returned DataFrame. - - Raises: - pandas.errors.EmptyDataError: - When data is not found or is empty. - """ - state = _DataFrameState( - columns={k: i for i, k in enumerate(columns)} if columns else None - ) - trs_list = tuple( - _dataframe_csv_from_input(_t, _fmt, orient, state) - for _t, _fmt in zip(data, formats) - ) - header = ",".join(csv_quote(td) for td in state.columns) if state.columns else None - lens = tuple(len(trs) if trs else 0 for trs in trs_list) - table = "\n".join(tr for trs in trs_list if trs is not None for tr in trs) - try: - if not header: - df = pd.read_csv( - io.StringIO(table), - dtype=dtype, - index_col=None, - header=None, - ) - else: - df = pd.read_csv( - io.StringIO("\n".join((header, table))), - dtype=dtype, - index_col=None, - ) - return df, lens - except pd.errors.EmptyDataError: - return None, lens
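[Note: the from_json_or_csv docstring above captures the key design choice in the deleted dataframe.py: pandas parser start-up cost (~100ms per read_csv/read_json call) dominates small payloads, so fragments are concatenated first and parsed once. A minimal sketch of that idea, not the deleted implementation itself; the fragment data is illustrative:]

import io

import pandas as pd

# Three CSV payloads, e.g. one per request in a micro-batch.
fragments = ["1,2", "3,4", "5,6"]
# Track how many rows each fragment contributes, mirroring the 'lens' return value.
lens = tuple(fragment.count("\n") + 1 for fragment in fragments)
# Concatenate once, then pay the pandas parsing overhead a single time.
merged = "\n".join(["a,b", *fragments])
df = pd.read_csv(io.StringIO(merged), index_col=None)
assert len(df) == sum(lens)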
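[Note: the deleted csv.py (see the utils/csv.py hunk earlier in this patch) implemented RFC 4180 field quoting with the compact idiom td.replace('"', '""').join('""'). An equivalent, more explicit sketch, verified against the doctest cases from the removed module:]

def csv_quote(td) -> str:
    # Quote a field when it contains a delimiter, a quote, a newline,
    # or nothing but whitespace; embedded quotes are doubled.
    td = "" if td is None else str(td)
    if "\n" in td or '"' in td or "," in td or not td.strip():
        return '"' + td.replace('"', '""') + '"'
    return td

assert csv_quote(1) == "1"
assert csv_quote('a,b"c') == '"a,b""c"'
assert csv_quote(" ") == '" "'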