diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3f3e7e535eb..b33c88b9f67 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -66,7 +66,7 @@ jobs: - name: "windows-py37-pluggy" python: "3.7" os: windows-latest - tox_env: "py37-pluggymain-xdist" + tox_env: "py37-pluggymain-pylib-xdist" - name: "windows-py38" python: "3.8" os: windows-latest @@ -93,7 +93,7 @@ jobs: - name: "ubuntu-py37-pluggy" python: "3.7" os: ubuntu-latest - tox_env: "py37-pluggymain-xdist" + tox_env: "py37-pluggymain-pylib-xdist" - name: "ubuntu-py37-freeze" python: "3.7" os: ubuntu-latest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3cc1b1de718..de612d96988 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -65,7 +65,6 @@ repos: args: [] additional_dependencies: - iniconfig>=1.1.0 - - py>=1.8.2 - attrs>=19.2.0 - packaging - tomli diff --git a/changelog/10396.deprecation.rst b/changelog/10396.deprecation.rst new file mode 100644 index 00000000000..84461e82ede --- /dev/null +++ b/changelog/10396.deprecation.rst @@ -0,0 +1 @@ +pytest no longer depends on the ``py`` library. ``pytest`` provides a vendored copy of ``py.error`` and ``py.path`` modules but will use the ``py`` library if it is installed. If you need other ``py.*`` modules, continue to install the deprecated ``py`` library separately, otherwise it can usually be removed as a dependency. diff --git a/setup.cfg b/setup.cfg index 38f50556c93..39ade4dff4c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,16 +36,17 @@ packages = _pytest _pytest._code _pytest._io + _pytest._py _pytest.assertion _pytest.config _pytest.mark pytest +py_modules = py install_requires = attrs>=19.2.0 iniconfig packaging pluggy>=0.12,<2.0 - py>=1.8.2 colorama;sys_platform=="win32" exceptiongroup>=1.0.0rc8;python_version<"3.11" importlib-metadata>=0.12;python_version<"3.8" diff --git a/src/_pytest/_py/__init__.py b/src/_pytest/_py/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/_pytest/_py/error.py b/src/_pytest/_py/error.py new file mode 100644 index 00000000000..0b8f2d535ef --- /dev/null +++ b/src/_pytest/_py/error.py @@ -0,0 +1,109 @@ +"""create errno-specific classes for IO or os calls.""" +from __future__ import annotations + +import errno +import os +import sys +from typing import Callable +from typing import TYPE_CHECKING +from typing import TypeVar + +if TYPE_CHECKING: + from typing_extensions import ParamSpec + + P = ParamSpec("P") + +R = TypeVar("R") + + +class Error(EnvironmentError): + def __repr__(self) -> str: + return "{}.{} {!r}: {} ".format( + self.__class__.__module__, + self.__class__.__name__, + self.__class__.__doc__, + " ".join(map(str, self.args)), + # repr(self.args) + ) + + def __str__(self) -> str: + s = "[{}]: {}".format( + self.__class__.__doc__, + " ".join(map(str, self.args)), + ) + return s + + +_winerrnomap = { + 2: errno.ENOENT, + 3: errno.ENOENT, + 17: errno.EEXIST, + 18: errno.EXDEV, + 13: errno.EBUSY, # empty cd drive, but ENOMEDIUM seems unavailiable + 22: errno.ENOTDIR, + 20: errno.ENOTDIR, + 267: errno.ENOTDIR, + 5: errno.EACCES, # anything better? +} + + +class ErrorMaker: + """lazily provides Exception classes for each possible POSIX errno + (as defined per the 'errno' module). All such instances + subclass EnvironmentError. + """ + + _errno2class: dict[int, type[Error]] = {} + + def __getattr__(self, name: str) -> type[Error]: + if name[0] == "_": + raise AttributeError(name) + eno = getattr(errno, name) + cls = self._geterrnoclass(eno) + setattr(self, name, cls) + return cls + + def _geterrnoclass(self, eno: int) -> type[Error]: + try: + return self._errno2class[eno] + except KeyError: + clsname = errno.errorcode.get(eno, "UnknownErrno%d" % (eno,)) + errorcls = type( + clsname, + (Error,), + {"__module__": "py.error", "__doc__": os.strerror(eno)}, + ) + self._errno2class[eno] = errorcls + return errorcls + + def checked_call( + self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs + ) -> R: + """Call a function and raise an errno-exception if applicable.""" + __tracebackhide__ = True + try: + return func(*args, **kwargs) + except Error: + raise + except OSError as value: + if not hasattr(value, "errno"): + raise + errno = value.errno + if sys.platform == "win32": + try: + cls = self._geterrnoclass(_winerrnomap[errno]) + except KeyError: + raise value + else: + # we are not on Windows, or we got a proper OSError + cls = self._geterrnoclass(errno) + + raise cls(f"{func.__name__}{args!r}") + + +_error_maker = ErrorMaker() +checked_call = _error_maker.checked_call + + +def __getattr__(attr: str) -> type[Error]: + return getattr(_error_maker, attr) # type: ignore[no-any-return] diff --git a/src/_pytest/_py/path.py b/src/_pytest/_py/path.py new file mode 100644 index 00000000000..00f1515238b --- /dev/null +++ b/src/_pytest/_py/path.py @@ -0,0 +1,1474 @@ +"""local path implementation.""" +from __future__ import annotations + +import atexit +import fnmatch +import importlib.util +import io +import os +import posixpath +import sys +import uuid +import warnings +from contextlib import contextmanager +from os.path import abspath +from os.path import dirname +from os.path import exists +from os.path import isabs +from os.path import isdir +from os.path import isfile +from os.path import islink +from os.path import normpath +from stat import S_ISDIR +from stat import S_ISLNK +from stat import S_ISREG +from typing import Any +from typing import Callable +from typing import overload +from typing import TYPE_CHECKING + +from . import error + +if TYPE_CHECKING: + from typing import Literal + +# Moved from local.py. +iswin32 = sys.platform == "win32" or (getattr(os, "_name", False) == "nt") + + +class Checkers: + _depend_on_existence = "exists", "link", "dir", "file" + + def __init__(self, path): + self.path = path + + def dotfile(self): + return self.path.basename.startswith(".") + + def ext(self, arg): + if not arg.startswith("."): + arg = "." + arg + return self.path.ext == arg + + def basename(self, arg): + return self.path.basename == arg + + def basestarts(self, arg): + return self.path.basename.startswith(arg) + + def relto(self, arg): + return self.path.relto(arg) + + def fnmatch(self, arg): + return self.path.fnmatch(arg) + + def endswith(self, arg): + return str(self.path).endswith(arg) + + def _evaluate(self, kw): + from .._code.source import getrawcode + + for name, value in kw.items(): + invert = False + meth = None + try: + meth = getattr(self, name) + except AttributeError: + if name[:3] == "not": + invert = True + try: + meth = getattr(self, name[3:]) + except AttributeError: + pass + if meth is None: + raise TypeError(f"no {name!r} checker available for {self.path!r}") + try: + if getrawcode(meth).co_argcount > 1: + if (not meth(value)) ^ invert: + return False + else: + if bool(value) ^ bool(meth()) ^ invert: + return False + except (error.ENOENT, error.ENOTDIR, error.EBUSY): + # EBUSY feels not entirely correct, + # but its kind of necessary since ENOMEDIUM + # is not accessible in python + for name in self._depend_on_existence: + if name in kw: + if kw.get(name): + return False + name = "not" + name + if name in kw: + if not kw.get(name): + return False + return True + + _statcache: Stat + + def _stat(self) -> Stat: + try: + return self._statcache + except AttributeError: + try: + self._statcache = self.path.stat() + except error.ELOOP: + self._statcache = self.path.lstat() + return self._statcache + + def dir(self): + return S_ISDIR(self._stat().mode) + + def file(self): + return S_ISREG(self._stat().mode) + + def exists(self): + return self._stat() + + def link(self): + st = self.path.lstat() + return S_ISLNK(st.mode) + + +class NeverRaised(Exception): + pass + + +class Visitor: + def __init__(self, fil, rec, ignore, bf, sort): + if isinstance(fil, str): + fil = FNMatcher(fil) + if isinstance(rec, str): + self.rec: Callable[[LocalPath], bool] = FNMatcher(rec) + elif not hasattr(rec, "__call__") and rec: + self.rec = lambda path: True + else: + self.rec = rec + self.fil = fil + self.ignore = ignore + self.breadthfirst = bf + self.optsort = sort and sorted or (lambda x: x) + + def gen(self, path): + try: + entries = path.listdir() + except self.ignore: + return + rec = self.rec + dirs = self.optsort( + [p for p in entries if p.check(dir=1) and (rec is None or rec(p))] + ) + if not self.breadthfirst: + for subdir in dirs: + for p in self.gen(subdir): + yield p + for p in self.optsort(entries): + if self.fil is None or self.fil(p): + yield p + if self.breadthfirst: + for subdir in dirs: + for p in self.gen(subdir): + yield p + + +class FNMatcher: + def __init__(self, pattern): + self.pattern = pattern + + def __call__(self, path): + pattern = self.pattern + + if ( + pattern.find(path.sep) == -1 + and iswin32 + and pattern.find(posixpath.sep) != -1 + ): + # Running on Windows, the pattern has no Windows path separators, + # and the pattern has one or more Posix path separators. Replace + # the Posix path separators with the Windows path separator. + pattern = pattern.replace(posixpath.sep, path.sep) + + if pattern.find(path.sep) == -1: + name = path.basename + else: + name = str(path) # path.strpath # XXX svn? + if not os.path.isabs(pattern): + pattern = "*" + path.sep + pattern + return fnmatch.fnmatch(name, pattern) + + +def map_as_list(func, iter): + return list(map(func, iter)) + + +class Stat: + if TYPE_CHECKING: + + @property + def size(self) -> int: + ... + + @property + def mtime(self) -> float: + ... + + def __getattr__(self, name: str) -> Any: + return getattr(self._osstatresult, "st_" + name) + + def __init__(self, path, osstatresult): + self.path = path + self._osstatresult = osstatresult + + @property + def owner(self): + if iswin32: + raise NotImplementedError("XXX win32") + import pwd + + entry = error.checked_call(pwd.getpwuid, self.uid) + return entry[0] + + @property + def group(self): + """Return group name of file.""" + if iswin32: + raise NotImplementedError("XXX win32") + import grp + + entry = error.checked_call(grp.getgrgid, self.gid) + return entry[0] + + def isdir(self): + return S_ISDIR(self._osstatresult.st_mode) + + def isfile(self): + return S_ISREG(self._osstatresult.st_mode) + + def islink(self): + self.path.lstat() + return S_ISLNK(self._osstatresult.st_mode) + + +def getuserid(user): + import pwd + + if not isinstance(user, int): + user = pwd.getpwnam(user)[2] + return user + + +def getgroupid(group): + import grp + + if not isinstance(group, int): + group = grp.getgrnam(group)[2] + return group + + +class LocalPath: + """Object oriented interface to os.path and other local filesystem + related information. + """ + + class ImportMismatchError(ImportError): + """raised on pyimport() if there is a mismatch of __file__'s""" + + sep = os.sep + + def __init__(self, path=None, expanduser=False): + """Initialize and return a local Path instance. + + Path can be relative to the current directory. + If path is None it defaults to the current working directory. + If expanduser is True, tilde-expansion is performed. + Note that Path instances always carry an absolute path. + Note also that passing in a local path object will simply return + the exact same path object. Use new() to get a new copy. + """ + if path is None: + self.strpath = error.checked_call(os.getcwd) + else: + try: + path = os.fspath(path) + except TypeError: + raise ValueError( + "can only pass None, Path instances " + "or non-empty strings to LocalPath" + ) + if expanduser: + path = os.path.expanduser(path) + self.strpath = abspath(path) + + if sys.platform != "win32": + + def chown(self, user, group, rec=0): + """Change ownership to the given user and group. + user and group may be specified by a number or + by a name. if rec is True change ownership + recursively. + """ + uid = getuserid(user) + gid = getgroupid(group) + if rec: + for x in self.visit(rec=lambda x: x.check(link=0)): + if x.check(link=0): + error.checked_call(os.chown, str(x), uid, gid) + error.checked_call(os.chown, str(self), uid, gid) + + def readlink(self) -> str: + """Return value of a symbolic link.""" + # https://github.com/python/mypy/issues/12278 + return error.checked_call(os.readlink, self.strpath) # type: ignore[arg-type,return-value] + + def mklinkto(self, oldname): + """Posix style hard link to another name.""" + error.checked_call(os.link, str(oldname), str(self)) + + def mksymlinkto(self, value, absolute=1): + """Create a symbolic link with the given value (pointing to another name).""" + if absolute: + error.checked_call(os.symlink, str(value), self.strpath) + else: + base = self.common(value) + # with posix local paths '/' is always a common base + relsource = self.__class__(value).relto(base) + reldest = self.relto(base) + n = reldest.count(self.sep) + target = self.sep.join(("..",) * n + (relsource,)) + error.checked_call(os.symlink, target, self.strpath) + + def __div__(self, other): + return self.join(os.fspath(other)) + + __truediv__ = __div__ # py3k + + @property + def basename(self): + """Basename part of path.""" + return self._getbyspec("basename")[0] + + @property + def dirname(self): + """Dirname part of path.""" + return self._getbyspec("dirname")[0] + + @property + def purebasename(self): + """Pure base name of the path.""" + return self._getbyspec("purebasename")[0] + + @property + def ext(self): + """Extension of the path (including the '.').""" + return self._getbyspec("ext")[0] + + def read_binary(self): + """Read and return a bytestring from reading the path.""" + with self.open("rb") as f: + return f.read() + + def read_text(self, encoding): + """Read and return a Unicode string from reading the path.""" + with self.open("r", encoding=encoding) as f: + return f.read() + + def read(self, mode="r"): + """Read and return a bytestring from reading the path.""" + with self.open(mode) as f: + return f.read() + + def readlines(self, cr=1): + """Read and return a list of lines from the path. if cr is False, the + newline will be removed from the end of each line.""" + mode = "r" + + if not cr: + content = self.read(mode) + return content.split("\n") + else: + f = self.open(mode) + try: + return f.readlines() + finally: + f.close() + + def load(self): + """(deprecated) return object unpickled from self.read()""" + f = self.open("rb") + try: + import pickle + + return error.checked_call(pickle.load, f) + finally: + f.close() + + def move(self, target): + """Move this path to target.""" + if target.relto(self): + raise error.EINVAL(target, "cannot move path into a subdirectory of itself") + try: + self.rename(target) + except error.EXDEV: # invalid cross-device link + self.copy(target) + self.remove() + + def fnmatch(self, pattern): + """Return true if the basename/fullname matches the glob-'pattern'. + + valid pattern characters:: + + * matches everything + ? matches any single character + [seq] matches any character in seq + [!seq] matches any char not in seq + + If the pattern contains a path-separator then the full path + is used for pattern matching and a '*' is prepended to the + pattern. + + if the pattern doesn't contain a path-separator the pattern + is only matched against the basename. + """ + return FNMatcher(pattern)(self) + + def relto(self, relpath): + """Return a string which is the relative part of the path + to the given 'relpath'. + """ + if not isinstance(relpath, (str, LocalPath)): + raise TypeError(f"{relpath!r}: not a string or path object") + strrelpath = str(relpath) + if strrelpath and strrelpath[-1] != self.sep: + strrelpath += self.sep + # assert strrelpath[-1] == self.sep + # assert strrelpath[-2] != self.sep + strself = self.strpath + if sys.platform == "win32" or getattr(os, "_name", None) == "nt": + if os.path.normcase(strself).startswith(os.path.normcase(strrelpath)): + return strself[len(strrelpath) :] + elif strself.startswith(strrelpath): + return strself[len(strrelpath) :] + return "" + + def ensure_dir(self, *args): + """Ensure the path joined with args is a directory.""" + return self.ensure(*args, **{"dir": True}) + + def bestrelpath(self, dest): + """Return a string which is a relative path from self + (assumed to be a directory) to dest such that + self.join(bestrelpath) == dest and if not such + path can be determined return dest. + """ + try: + if self == dest: + return os.curdir + base = self.common(dest) + if not base: # can be the case on windows + return str(dest) + self2base = self.relto(base) + reldest = dest.relto(base) + if self2base: + n = self2base.count(self.sep) + 1 + else: + n = 0 + lst = [os.pardir] * n + if reldest: + lst.append(reldest) + target = dest.sep.join(lst) + return target + except AttributeError: + return str(dest) + + def exists(self): + return self.check() + + def isdir(self): + return self.check(dir=1) + + def isfile(self): + return self.check(file=1) + + def parts(self, reverse=False): + """Return a root-first list of all ancestor directories + plus the path itself. + """ + current = self + lst = [self] + while 1: + last = current + current = current.dirpath() + if last == current: + break + lst.append(current) + if not reverse: + lst.reverse() + return lst + + def common(self, other): + """Return the common part shared with the other path + or None if there is no common part. + """ + last = None + for x, y in zip(self.parts(), other.parts()): + if x != y: + return last + last = x + return last + + def __add__(self, other): + """Return new path object with 'other' added to the basename""" + return self.new(basename=self.basename + str(other)) + + def visit(self, fil=None, rec=None, ignore=NeverRaised, bf=False, sort=False): + """Yields all paths below the current one + + fil is a filter (glob pattern or callable), if not matching the + path will not be yielded, defaulting to None (everything is + returned) + + rec is a filter (glob pattern or callable) that controls whether + a node is descended, defaulting to None + + ignore is an Exception class that is ignoredwhen calling dirlist() + on any of the paths (by default, all exceptions are reported) + + bf if True will cause a breadthfirst search instead of the + default depthfirst. Default: False + + sort if True will sort entries within each directory level. + """ + yield from Visitor(fil, rec, ignore, bf, sort).gen(self) + + def _sortlist(self, res, sort): + if sort: + if hasattr(sort, "__call__"): + warnings.warn( + DeprecationWarning( + "listdir(sort=callable) is deprecated and breaks on python3" + ), + stacklevel=3, + ) + res.sort(sort) + else: + res.sort() + + def __fspath__(self): + return self.strpath + + def __hash__(self): + s = self.strpath + if iswin32: + s = s.lower() + return hash(s) + + def __eq__(self, other): + s1 = os.fspath(self) + try: + s2 = os.fspath(other) + except TypeError: + return False + if iswin32: + s1 = s1.lower() + try: + s2 = s2.lower() + except AttributeError: + return False + return s1 == s2 + + def __ne__(self, other): + return not (self == other) + + def __lt__(self, other): + return os.fspath(self) < os.fspath(other) + + def __gt__(self, other): + return os.fspath(self) > os.fspath(other) + + def samefile(self, other): + """Return True if 'other' references the same file as 'self'.""" + other = os.fspath(other) + if not isabs(other): + other = abspath(other) + if self == other: + return True + if not hasattr(os.path, "samefile"): + return False + return error.checked_call(os.path.samefile, self.strpath, other) + + def remove(self, rec=1, ignore_errors=False): + """Remove a file or directory (or a directory tree if rec=1). + if ignore_errors is True, errors while removing directories will + be ignored. + """ + if self.check(dir=1, link=0): + if rec: + # force remove of readonly files on windows + if iswin32: + self.chmod(0o700, rec=1) + import shutil + + error.checked_call( + shutil.rmtree, self.strpath, ignore_errors=ignore_errors + ) + else: + error.checked_call(os.rmdir, self.strpath) + else: + if iswin32: + self.chmod(0o700) + error.checked_call(os.remove, self.strpath) + + def computehash(self, hashtype="md5", chunksize=524288): + """Return hexdigest of hashvalue for this file.""" + try: + try: + import hashlib as mod + except ImportError: + if hashtype == "sha1": + hashtype = "sha" + mod = __import__(hashtype) + hash = getattr(mod, hashtype)() + except (AttributeError, ImportError): + raise ValueError(f"Don't know how to compute {hashtype!r} hash") + f = self.open("rb") + try: + while 1: + buf = f.read(chunksize) + if not buf: + return hash.hexdigest() + hash.update(buf) + finally: + f.close() + + def new(self, **kw): + """Create a modified version of this path. + the following keyword arguments modify various path parts:: + + a:/some/path/to/a/file.ext + xx drive + xxxxxxxxxxxxxxxxx dirname + xxxxxxxx basename + xxxx purebasename + xxx ext + """ + obj = object.__new__(self.__class__) + if not kw: + obj.strpath = self.strpath + return obj + drive, dirname, basename, purebasename, ext = self._getbyspec( + "drive,dirname,basename,purebasename,ext" + ) + if "basename" in kw: + if "purebasename" in kw or "ext" in kw: + raise ValueError("invalid specification %r" % kw) + else: + pb = kw.setdefault("purebasename", purebasename) + try: + ext = kw["ext"] + except KeyError: + pass + else: + if ext and not ext.startswith("."): + ext = "." + ext + kw["basename"] = pb + ext + + if "dirname" in kw and not kw["dirname"]: + kw["dirname"] = drive + else: + kw.setdefault("dirname", dirname) + kw.setdefault("sep", self.sep) + obj.strpath = normpath("%(dirname)s%(sep)s%(basename)s" % kw) + return obj + + def _getbyspec(self, spec: str) -> list[str]: + """See new for what 'spec' can be.""" + res = [] + parts = self.strpath.split(self.sep) + + args = filter(None, spec.split(",")) + for name in args: + if name == "drive": + res.append(parts[0]) + elif name == "dirname": + res.append(self.sep.join(parts[:-1])) + else: + basename = parts[-1] + if name == "basename": + res.append(basename) + else: + i = basename.rfind(".") + if i == -1: + purebasename, ext = basename, "" + else: + purebasename, ext = basename[:i], basename[i:] + if name == "purebasename": + res.append(purebasename) + elif name == "ext": + res.append(ext) + else: + raise ValueError("invalid part specification %r" % name) + return res + + def dirpath(self, *args, **kwargs): + """Return the directory path joined with any given path arguments.""" + if not kwargs: + path = object.__new__(self.__class__) + path.strpath = dirname(self.strpath) + if args: + path = path.join(*args) + return path + return self.new(basename="").join(*args, **kwargs) + + def join(self, *args: os.PathLike[str], abs: bool = False) -> LocalPath: + """Return a new path by appending all 'args' as path + components. if abs=1 is used restart from root if any + of the args is an absolute path. + """ + sep = self.sep + strargs = [os.fspath(arg) for arg in args] + strpath = self.strpath + if abs: + newargs: list[str] = [] + for arg in reversed(strargs): + if isabs(arg): + strpath = arg + strargs = newargs + break + newargs.insert(0, arg) + # special case for when we have e.g. strpath == "/" + actual_sep = "" if strpath.endswith(sep) else sep + for arg in strargs: + arg = arg.strip(sep) + if iswin32: + # allow unix style paths even on windows. + arg = arg.strip("/") + arg = arg.replace("/", sep) + strpath = strpath + actual_sep + arg + actual_sep = sep + obj = object.__new__(self.__class__) + obj.strpath = normpath(strpath) + return obj + + def open(self, mode="r", ensure=False, encoding=None): + """Return an opened file with the given mode. + + If ensure is True, create parent directories if needed. + """ + if ensure: + self.dirpath().ensure(dir=1) + if encoding: + return error.checked_call(io.open, self.strpath, mode, encoding=encoding) + return error.checked_call(open, self.strpath, mode) + + def _fastjoin(self, name): + child = object.__new__(self.__class__) + child.strpath = self.strpath + self.sep + name + return child + + def islink(self): + return islink(self.strpath) + + def check(self, **kw): + """Check a path for existence and properties. + + Without arguments, return True if the path exists, otherwise False. + + valid checkers:: + + file=1 # is a file + file=0 # is not a file (may not even exist) + dir=1 # is a dir + link=1 # is a link + exists=1 # exists + + You can specify multiple checker definitions, for example:: + + path.check(file=1, link=1) # a link pointing to a file + """ + if not kw: + return exists(self.strpath) + if len(kw) == 1: + if "dir" in kw: + return not kw["dir"] ^ isdir(self.strpath) + if "file" in kw: + return not kw["file"] ^ isfile(self.strpath) + if not kw: + kw = {"exists": 1} + return Checkers(self)._evaluate(kw) + + _patternchars = set("*?[" + os.path.sep) + + def listdir(self, fil=None, sort=None): + """List directory contents, possibly filter by the given fil func + and possibly sorted. + """ + if fil is None and sort is None: + names = error.checked_call(os.listdir, self.strpath) + return map_as_list(self._fastjoin, names) + if isinstance(fil, str): + if not self._patternchars.intersection(fil): + child = self._fastjoin(fil) + if exists(child.strpath): + return [child] + return [] + fil = FNMatcher(fil) + names = error.checked_call(os.listdir, self.strpath) + res = [] + for name in names: + child = self._fastjoin(name) + if fil is None or fil(child): + res.append(child) + self._sortlist(res, sort) + return res + + def size(self) -> int: + """Return size of the underlying file object""" + return self.stat().size + + def mtime(self) -> float: + """Return last modification time of the path.""" + return self.stat().mtime + + def copy(self, target, mode=False, stat=False): + """Copy path to target. + + If mode is True, will copy copy permission from path to target. + If stat is True, copy permission, last modification + time, last access time, and flags from path to target. + """ + if self.check(file=1): + if target.check(dir=1): + target = target.join(self.basename) + assert self != target + copychunked(self, target) + if mode: + copymode(self.strpath, target.strpath) + if stat: + copystat(self, target) + else: + + def rec(p): + return p.check(link=0) + + for x in self.visit(rec=rec): + relpath = x.relto(self) + newx = target.join(relpath) + newx.dirpath().ensure(dir=1) + if x.check(link=1): + newx.mksymlinkto(x.readlink()) + continue + elif x.check(file=1): + copychunked(x, newx) + elif x.check(dir=1): + newx.ensure(dir=1) + if mode: + copymode(x.strpath, newx.strpath) + if stat: + copystat(x, newx) + + def rename(self, target): + """Rename this path to target.""" + target = os.fspath(target) + return error.checked_call(os.rename, self.strpath, target) + + def dump(self, obj, bin=1): + """Pickle object into path location""" + f = self.open("wb") + import pickle + + try: + error.checked_call(pickle.dump, obj, f, bin) + finally: + f.close() + + def mkdir(self, *args): + """Create & return the directory joined with args.""" + p = self.join(*args) + error.checked_call(os.mkdir, os.fspath(p)) + return p + + def write_binary(self, data, ensure=False): + """Write binary data into path. If ensure is True create + missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + with self.open("wb") as f: + f.write(data) + + def write_text(self, data, encoding, ensure=False): + """Write text data into path using the specified encoding. + If ensure is True create missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + with self.open("w", encoding=encoding) as f: + f.write(data) + + def write(self, data, mode="w", ensure=False): + """Write data into path. If ensure is True create + missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + if "b" in mode: + if not isinstance(data, bytes): + raise ValueError("can only process bytes") + else: + if not isinstance(data, str): + if not isinstance(data, bytes): + data = str(data) + else: + data = data.decode(sys.getdefaultencoding()) + f = self.open(mode) + try: + f.write(data) + finally: + f.close() + + def _ensuredirs(self): + parent = self.dirpath() + if parent == self: + return self + if parent.check(dir=0): + parent._ensuredirs() + if self.check(dir=0): + try: + self.mkdir() + except error.EEXIST: + # race condition: file/dir created by another thread/process. + # complain if it is not a dir + if self.check(dir=0): + raise + return self + + def ensure(self, *args, **kwargs): + """Ensure that an args-joined path exists (by default as + a file). if you specify a keyword argument 'dir=True' + then the path is forced to be a directory path. + """ + p = self.join(*args) + if kwargs.get("dir", 0): + return p._ensuredirs() + else: + p.dirpath()._ensuredirs() + if not p.check(file=1): + p.open("w").close() + return p + + @overload + def stat(self, raising: Literal[True] = ...) -> Stat: + ... + + @overload + def stat(self, raising: Literal[False]) -> Stat | None: + ... + + def stat(self, raising: bool = True) -> Stat | None: + """Return an os.stat() tuple.""" + if raising: + return Stat(self, error.checked_call(os.stat, self.strpath)) + try: + return Stat(self, os.stat(self.strpath)) + except KeyboardInterrupt: + raise + except Exception: + return None + + def lstat(self) -> Stat: + """Return an os.lstat() tuple.""" + return Stat(self, error.checked_call(os.lstat, self.strpath)) + + def setmtime(self, mtime=None): + """Set modification time for the given path. if 'mtime' is None + (the default) then the file's mtime is set to current time. + + Note that the resolution for 'mtime' is platform dependent. + """ + if mtime is None: + return error.checked_call(os.utime, self.strpath, mtime) + try: + return error.checked_call(os.utime, self.strpath, (-1, mtime)) + except error.EINVAL: + return error.checked_call(os.utime, self.strpath, (self.atime(), mtime)) + + def chdir(self): + """Change directory to self and return old current directory""" + try: + old = self.__class__() + except error.ENOENT: + old = None + error.checked_call(os.chdir, self.strpath) + return old + + @contextmanager + def as_cwd(self): + """ + Return a context manager, which changes to the path's dir during the + managed "with" context. + On __enter__ it returns the old dir, which might be ``None``. + """ + old = self.chdir() + try: + yield old + finally: + if old is not None: + old.chdir() + + def realpath(self): + """Return a new path which contains no symbolic links.""" + return self.__class__(os.path.realpath(self.strpath)) + + def atime(self): + """Return last access time of the path.""" + return self.stat().atime + + def __repr__(self): + return "local(%r)" % self.strpath + + def __str__(self): + """Return string representation of the Path.""" + return self.strpath + + def chmod(self, mode, rec=0): + """Change permissions to the given mode. If mode is an + integer it directly encodes the os-specific modes. + if rec is True perform recursively. + """ + if not isinstance(mode, int): + raise TypeError(f"mode {mode!r} must be an integer") + if rec: + for x in self.visit(rec=rec): + error.checked_call(os.chmod, str(x), mode) + error.checked_call(os.chmod, self.strpath, mode) + + def pypkgpath(self): + """Return the Python package path by looking for the last + directory upwards which still contains an __init__.py. + Return None if a pkgpath can not be determined. + """ + pkgpath = None + for parent in self.parts(reverse=True): + if parent.isdir(): + if not parent.join("__init__.py").exists(): + break + if not isimportable(parent.basename): + break + pkgpath = parent + return pkgpath + + def _ensuresyspath(self, ensuremode, path): + if ensuremode: + s = str(path) + if ensuremode == "append": + if s not in sys.path: + sys.path.append(s) + else: + if s != sys.path[0]: + sys.path.insert(0, s) + + def pyimport(self, modname=None, ensuresyspath=True): + """Return path as an imported python module. + + If modname is None, look for the containing package + and construct an according module name. + The module will be put/looked up in sys.modules. + if ensuresyspath is True then the root dir for importing + the file (taking __init__.py files into account) will + be prepended to sys.path if it isn't there already. + If ensuresyspath=="append" the root dir will be appended + if it isn't already contained in sys.path. + if ensuresyspath is False no modification of syspath happens. + + Special value of ensuresyspath=="importlib" is intended + purely for using in pytest, it is capable only of importing + separate .py files outside packages, e.g. for test suite + without any __init__.py file. It effectively allows having + same-named test modules in different places and offers + mild opt-in via this option. Note that it works only in + recent versions of python. + """ + if not self.check(): + raise error.ENOENT(self) + + if ensuresyspath == "importlib": + if modname is None: + modname = self.purebasename + spec = importlib.util.spec_from_file_location(modname, str(self)) + if spec is None or spec.loader is None: + raise ImportError( + f"Can't find module {modname} at location {str(self)}" + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + pkgpath = None + if modname is None: + pkgpath = self.pypkgpath() + if pkgpath is not None: + pkgroot = pkgpath.dirpath() + names = self.new(ext="").relto(pkgroot).split(self.sep) + if names[-1] == "__init__": + names.pop() + modname = ".".join(names) + else: + pkgroot = self.dirpath() + modname = self.purebasename + + self._ensuresyspath(ensuresyspath, pkgroot) + __import__(modname) + mod = sys.modules[modname] + if self.basename == "__init__.py": + return mod # we don't check anything as we might + # be in a namespace package ... too icky to check + modfile = mod.__file__ + assert modfile is not None + if modfile[-4:] in (".pyc", ".pyo"): + modfile = modfile[:-1] + elif modfile.endswith("$py.class"): + modfile = modfile[:-9] + ".py" + if modfile.endswith(os.path.sep + "__init__.py"): + if self.basename != "__init__.py": + modfile = modfile[:-12] + try: + issame = self.samefile(modfile) + except error.ENOENT: + issame = False + if not issame: + ignore = os.getenv("PY_IGNORE_IMPORTMISMATCH") + if ignore != "1": + raise self.ImportMismatchError(modname, modfile, self) + return mod + else: + try: + return sys.modules[modname] + except KeyError: + # we have a custom modname, do a pseudo-import + import types + + mod = types.ModuleType(modname) + mod.__file__ = str(self) + sys.modules[modname] = mod + try: + with open(str(self), "rb") as f: + exec(f.read(), mod.__dict__) + except BaseException: + del sys.modules[modname] + raise + return mod + + def sysexec(self, *argv: os.PathLike[str], **popen_opts: Any) -> str: + """Return stdout text from executing a system child process, + where the 'self' path points to executable. + The process is directly invoked and not through a system shell. + """ + from subprocess import Popen, PIPE + + popen_opts.pop("stdout", None) + popen_opts.pop("stderr", None) + proc = Popen( + [str(self)] + [str(arg) for arg in argv], + **popen_opts, + stdout=PIPE, + stderr=PIPE, + ) + stdout: str | bytes + stdout, stderr = proc.communicate() + ret = proc.wait() + if isinstance(stdout, bytes): + stdout = stdout.decode(sys.getdefaultencoding()) + if ret != 0: + if isinstance(stderr, bytes): + stderr = stderr.decode(sys.getdefaultencoding()) + raise RuntimeError( + ret, + ret, + str(self), + stdout, + stderr, + ) + return stdout + + @classmethod + def sysfind(cls, name, checker=None, paths=None): + """Return a path object found by looking at the systems + underlying PATH specification. If the checker is not None + it will be invoked to filter matching paths. If a binary + cannot be found, None is returned + Note: This is probably not working on plain win32 systems + but may work on cygwin. + """ + if isabs(name): + p = local(name) + if p.check(file=1): + return p + else: + if paths is None: + if iswin32: + paths = os.environ["Path"].split(";") + if "" not in paths and "." not in paths: + paths.append(".") + try: + systemroot = os.environ["SYSTEMROOT"] + except KeyError: + pass + else: + paths = [ + path.replace("%SystemRoot%", systemroot) for path in paths + ] + else: + paths = os.environ["PATH"].split(":") + tryadd = [] + if iswin32: + tryadd += os.environ["PATHEXT"].split(os.pathsep) + tryadd.append("") + + for x in paths: + for addext in tryadd: + p = local(x).join(name, abs=True) + addext + try: + if p.check(file=1): + if checker: + if not checker(p): + continue + return p + except error.EACCES: + pass + return None + + @classmethod + def _gethomedir(cls): + try: + x = os.environ["HOME"] + except KeyError: + try: + x = os.environ["HOMEDRIVE"] + os.environ["HOMEPATH"] + except KeyError: + return None + return cls(x) + + # """ + # special class constructors for local filesystem paths + # """ + @classmethod + def get_temproot(cls): + """Return the system's temporary directory + (where tempfiles are usually created in) + """ + import tempfile + + return local(tempfile.gettempdir()) + + @classmethod + def mkdtemp(cls, rootdir=None): + """Return a Path object pointing to a fresh new temporary directory + (which we created ourself). + """ + import tempfile + + if rootdir is None: + rootdir = cls.get_temproot() + return cls(error.checked_call(tempfile.mkdtemp, dir=str(rootdir))) + + @classmethod + def make_numbered_dir( + cls, prefix="session-", rootdir=None, keep=3, lock_timeout=172800 + ): # two days + """Return unique directory with a number greater than the current + maximum one. The number is assumed to start directly after prefix. + if keep is true directories with a number less than (maxnum-keep) + will be removed. If .lock files are used (lock_timeout non-zero), + algorithm is multi-process safe. + """ + if rootdir is None: + rootdir = cls.get_temproot() + + nprefix = prefix.lower() + + def parse_num(path): + """Parse the number out of a path (if it matches the prefix)""" + nbasename = path.basename.lower() + if nbasename.startswith(nprefix): + try: + return int(nbasename[len(nprefix) :]) + except ValueError: + pass + + def create_lockfile(path): + """Exclusively create lockfile. Throws when failed""" + mypid = os.getpid() + lockfile = path.join(".lock") + if hasattr(lockfile, "mksymlinkto"): + lockfile.mksymlinkto(str(mypid)) + else: + fd = error.checked_call( + os.open, str(lockfile), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644 + ) + with os.fdopen(fd, "w") as f: + f.write(str(mypid)) + return lockfile + + def atexit_remove_lockfile(lockfile): + """Ensure lockfile is removed at process exit""" + mypid = os.getpid() + + def try_remove_lockfile(): + # in a fork() situation, only the last process should + # remove the .lock, otherwise the other processes run the + # risk of seeing their temporary dir disappear. For now + # we remove the .lock in the parent only (i.e. we assume + # that the children finish before the parent). + if os.getpid() != mypid: + return + try: + lockfile.remove() + except error.Error: + pass + + atexit.register(try_remove_lockfile) + + # compute the maximum number currently in use with the prefix + lastmax = None + while True: + maxnum = -1 + for path in rootdir.listdir(): + num = parse_num(path) + if num is not None: + maxnum = max(maxnum, num) + + # make the new directory + try: + udir = rootdir.mkdir(prefix + str(maxnum + 1)) + if lock_timeout: + lockfile = create_lockfile(udir) + atexit_remove_lockfile(lockfile) + except (error.EEXIST, error.ENOENT, error.EBUSY): + # race condition (1): another thread/process created the dir + # in the meantime - try again + # race condition (2): another thread/process spuriously acquired + # lock treating empty directory as candidate + # for removal - try again + # race condition (3): another thread/process tried to create the lock at + # the same time (happened in Python 3.3 on Windows) + # https://ci.appveyor.com/project/pytestbot/py/build/1.0.21/job/ffi85j4c0lqwsfwa + if lastmax == maxnum: + raise + lastmax = maxnum + continue + break + + def get_mtime(path): + """Read file modification time""" + try: + return path.lstat().mtime + except error.Error: + pass + + garbage_prefix = prefix + "garbage-" + + def is_garbage(path): + """Check if path denotes directory scheduled for removal""" + bn = path.basename + return bn.startswith(garbage_prefix) + + # prune old directories + udir_time = get_mtime(udir) + if keep and udir_time: + for path in rootdir.listdir(): + num = parse_num(path) + if num is not None and num <= (maxnum - keep): + try: + # try acquiring lock to remove directory as exclusive user + if lock_timeout: + create_lockfile(path) + except (error.EEXIST, error.ENOENT, error.EBUSY): + path_time = get_mtime(path) + if not path_time: + # assume directory doesn't exist now + continue + if abs(udir_time - path_time) < lock_timeout: + # assume directory with lockfile exists + # and lock timeout hasn't expired yet + continue + + # path dir locked for exclusive use + # and scheduled for removal to avoid another thread/process + # treating it as a new directory or removal candidate + garbage_path = rootdir.join(garbage_prefix + str(uuid.uuid4())) + try: + path.rename(garbage_path) + garbage_path.remove(rec=1) + except KeyboardInterrupt: + raise + except Exception: # this might be error.Error, WindowsError ... + pass + if is_garbage(path): + try: + path.remove(rec=1) + except KeyboardInterrupt: + raise + except Exception: # this might be error.Error, WindowsError ... + pass + + # make link... + try: + username = os.environ["USER"] # linux, et al + except KeyError: + try: + username = os.environ["USERNAME"] # windows + except KeyError: + username = "current" + + src = str(udir) + dest = src[: src.rfind("-")] + "-" + username + try: + os.unlink(dest) + except OSError: + pass + try: + os.symlink(src, dest) + except (OSError, AttributeError, NotImplementedError): + pass + + return udir + + +def copymode(src, dest): + """Copy permission from src to dst.""" + import shutil + + shutil.copymode(src, dest) + + +def copystat(src, dest): + """Copy permission, last modification time, + last access time, and flags from src to dst.""" + import shutil + + shutil.copystat(str(src), str(dest)) + + +def copychunked(src, dest): + chunksize = 524288 # half a meg of bytes + fsrc = src.open("rb") + try: + fdest = dest.open("wb") + try: + while 1: + buf = fsrc.read(chunksize) + if not buf: + break + fdest.write(buf) + finally: + fdest.close() + finally: + fsrc.close() + + +def isimportable(name): + if name and (name[0].isalpha() or name[0] == "_"): + name = name.replace("_", "") + return not name or name.isalnum() + + +local = LocalPath diff --git a/src/_pytest/compat.py b/src/_pytest/compat.py index fab4c31107f..211407b2374 100644 --- a/src/_pytest/compat.py +++ b/src/_pytest/compat.py @@ -18,6 +18,7 @@ from typing import Union import attr + import py # fmt: off diff --git a/src/py.py b/src/py.py new file mode 100644 index 00000000000..7813c9b93cd --- /dev/null +++ b/src/py.py @@ -0,0 +1,10 @@ +# shim for pylib going away +# if pylib is installed this file will get skipped +# (`py/__init__.py` has higher precedence) +import sys + +import _pytest._py.error as error +import _pytest._py.path as path + +sys.modules["py.error"] = error +sys.modules["py.path"] = path diff --git a/testing/_py/test_local.py b/testing/_py/test_local.py new file mode 100644 index 00000000000..31c10b16021 --- /dev/null +++ b/testing/_py/test_local.py @@ -0,0 +1,1556 @@ +import multiprocessing +import os +import sys +import time +from unittest import mock + +import pytest +from py import error +from py.path import local + + +class CommonFSTests: + def test_constructor_equality(self, path1): + p = path1.__class__(path1) + assert p == path1 + + def test_eq_nonstring(self, path1): + p1 = path1.join("sampledir") + p2 = path1.join("sampledir") + assert p1 == p2 + + def test_new_identical(self, path1): + assert path1 == path1.new() + + def test_join(self, path1): + p = path1.join("sampledir") + strp = str(p) + assert strp.endswith("sampledir") + assert strp.startswith(str(path1)) + + def test_join_normalized(self, path1): + newpath = path1.join(path1.sep + "sampledir") + strp = str(newpath) + assert strp.endswith("sampledir") + assert strp.startswith(str(path1)) + newpath = path1.join((path1.sep * 2) + "sampledir") + strp = str(newpath) + assert strp.endswith("sampledir") + assert strp.startswith(str(path1)) + + def test_join_noargs(self, path1): + newpath = path1.join() + assert path1 == newpath + + def test_add_something(self, path1): + p = path1.join("sample") + p = p + "dir" + assert p.check() + assert p.exists() + assert p.isdir() + assert not p.isfile() + + def test_parts(self, path1): + newpath = path1.join("sampledir", "otherfile") + par = newpath.parts()[-3:] + assert par == [path1, path1.join("sampledir"), newpath] + + revpar = newpath.parts(reverse=True)[:3] + assert revpar == [newpath, path1.join("sampledir"), path1] + + def test_common(self, path1): + other = path1.join("sampledir") + x = other.common(path1) + assert x == path1 + + # def test_parents_nonexisting_file(self, path1): + # newpath = path1 / 'dirnoexist' / 'nonexisting file' + # par = list(newpath.parents()) + # assert par[:2] == [path1 / 'dirnoexist', path1] + + def test_basename_checks(self, path1): + newpath = path1.join("sampledir") + assert newpath.check(basename="sampledir") + assert newpath.check(notbasename="xyz") + assert newpath.basename == "sampledir" + + def test_basename(self, path1): + newpath = path1.join("sampledir") + assert newpath.check(basename="sampledir") + assert newpath.basename, "sampledir" + + def test_dirname(self, path1): + newpath = path1.join("sampledir") + assert newpath.dirname == str(path1) + + def test_dirpath(self, path1): + newpath = path1.join("sampledir") + assert newpath.dirpath() == path1 + + def test_dirpath_with_args(self, path1): + newpath = path1.join("sampledir") + assert newpath.dirpath("x") == path1.join("x") + + def test_newbasename(self, path1): + newpath = path1.join("samplefile") + newbase = newpath.new(basename="samplefile2") + assert newbase.basename == "samplefile2" + assert newbase.dirpath() == newpath.dirpath() + + def test_not_exists(self, path1): + assert not path1.join("does_not_exist").check() + assert path1.join("does_not_exist").check(exists=0) + + def test_exists(self, path1): + assert path1.join("samplefile").check() + assert path1.join("samplefile").check(exists=1) + assert path1.join("samplefile").exists() + assert path1.join("samplefile").isfile() + assert not path1.join("samplefile").isdir() + + def test_dir(self, path1): + # print repr(path1.join("sampledir")) + assert path1.join("sampledir").check(dir=1) + assert path1.join("samplefile").check(notdir=1) + assert not path1.join("samplefile").check(dir=1) + assert path1.join("samplefile").exists() + assert not path1.join("samplefile").isdir() + assert path1.join("samplefile").isfile() + + def test_fnmatch_file(self, path1): + assert path1.join("samplefile").check(fnmatch="s*e") + assert path1.join("samplefile").fnmatch("s*e") + assert not path1.join("samplefile").fnmatch("s*x") + assert not path1.join("samplefile").check(fnmatch="s*x") + + # def test_fnmatch_dir(self, path1): + + # pattern = path1.sep.join(['s*file']) + # sfile = path1.join("samplefile") + # assert sfile.check(fnmatch=pattern) + + def test_relto(self, path1): + p = path1.join("sampledir", "otherfile") + assert p.relto(path1) == p.sep.join(["sampledir", "otherfile"]) + assert p.check(relto=path1) + assert path1.check(notrelto=p) + assert not path1.check(relto=p) + + def test_bestrelpath(self, path1): + curdir = path1 + sep = curdir.sep + s = curdir.bestrelpath(curdir) + assert s == "." + s = curdir.bestrelpath(curdir.join("hello", "world")) + assert s == "hello" + sep + "world" + + s = curdir.bestrelpath(curdir.dirpath().join("sister")) + assert s == ".." + sep + "sister" + assert curdir.bestrelpath(curdir.dirpath()) == ".." + + assert curdir.bestrelpath("hello") == "hello" + + def test_relto_not_relative(self, path1): + l1 = path1.join("bcde") + l2 = path1.join("b") + assert not l1.relto(l2) + assert not l2.relto(l1) + + def test_listdir(self, path1): + p = path1.listdir() + assert path1.join("sampledir") in p + assert path1.join("samplefile") in p + with pytest.raises(error.ENOTDIR): + path1.join("samplefile").listdir() + + def test_listdir_fnmatchstring(self, path1): + p = path1.listdir("s*dir") + assert len(p) + assert p[0], path1.join("sampledir") + + def test_listdir_filter(self, path1): + p = path1.listdir(lambda x: x.check(dir=1)) + assert path1.join("sampledir") in p + assert not path1.join("samplefile") in p + + def test_listdir_sorted(self, path1): + p = path1.listdir(lambda x: x.check(basestarts="sample"), sort=True) + assert path1.join("sampledir") == p[0] + assert path1.join("samplefile") == p[1] + assert path1.join("samplepickle") == p[2] + + def test_visit_nofilter(self, path1): + lst = [] + for i in path1.visit(): + lst.append(i.relto(path1)) + assert "sampledir" in lst + assert path1.sep.join(["sampledir", "otherfile"]) in lst + + def test_visit_norecurse(self, path1): + lst = [] + for i in path1.visit(None, lambda x: x.basename != "sampledir"): + lst.append(i.relto(path1)) + assert "sampledir" in lst + assert not path1.sep.join(["sampledir", "otherfile"]) in lst + + @pytest.mark.parametrize( + "fil", + ["*dir", "*dir", pytest.mark.skip("sys.version_info <" " (3,6)")(b"*dir")], + ) + def test_visit_filterfunc_is_string(self, path1, fil): + lst = [] + for i in path1.visit(fil): + lst.append(i.relto(path1)) + assert len(lst), 2 + assert "sampledir" in lst + assert "otherdir" in lst + + def test_visit_ignore(self, path1): + p = path1.join("nonexisting") + assert list(p.visit(ignore=error.ENOENT)) == [] + + def test_visit_endswith(self, path1): + p = [] + for i in path1.visit(lambda x: x.check(endswith="file")): + p.append(i.relto(path1)) + assert path1.sep.join(["sampledir", "otherfile"]) in p + assert "samplefile" in p + + def test_cmp(self, path1): + path1 = path1.join("samplefile") + path2 = path1.join("samplefile2") + assert (path1 < path2) == ("samplefile" < "samplefile2") + assert not (path1 < path1) + + def test_simple_read(self, path1): + x = path1.join("samplefile").read("r") + assert x == "samplefile\n" + + def test_join_div_operator(self, path1): + newpath = path1 / "/sampledir" / "/test//" + newpath2 = path1.join("sampledir", "test") + assert newpath == newpath2 + + def test_ext(self, path1): + newpath = path1.join("sampledir.ext") + assert newpath.ext == ".ext" + newpath = path1.join("sampledir") + assert not newpath.ext + + def test_purebasename(self, path1): + newpath = path1.join("samplefile.py") + assert newpath.purebasename == "samplefile" + + def test_multiple_parts(self, path1): + newpath = path1.join("samplefile.py") + dirname, purebasename, basename, ext = newpath._getbyspec( + "dirname,purebasename,basename,ext" + ) + assert str(path1).endswith(dirname) # be careful with win32 'drive' + assert purebasename == "samplefile" + assert basename == "samplefile.py" + assert ext == ".py" + + def test_dotted_name_ext(self, path1): + newpath = path1.join("a.b.c") + ext = newpath.ext + assert ext == ".c" + assert newpath.ext == ".c" + + def test_newext(self, path1): + newpath = path1.join("samplefile.py") + newext = newpath.new(ext=".txt") + assert newext.basename == "samplefile.txt" + assert newext.purebasename == "samplefile" + + def test_readlines(self, path1): + fn = path1.join("samplefile") + contents = fn.readlines() + assert contents == ["samplefile\n"] + + def test_readlines_nocr(self, path1): + fn = path1.join("samplefile") + contents = fn.readlines(cr=0) + assert contents == ["samplefile", ""] + + def test_file(self, path1): + assert path1.join("samplefile").check(file=1) + + def test_not_file(self, path1): + assert not path1.join("sampledir").check(file=1) + assert path1.join("sampledir").check(file=0) + + def test_non_existent(self, path1): + assert path1.join("sampledir.nothere").check(dir=0) + assert path1.join("sampledir.nothere").check(file=0) + assert path1.join("sampledir.nothere").check(notfile=1) + assert path1.join("sampledir.nothere").check(notdir=1) + assert path1.join("sampledir.nothere").check(notexists=1) + assert not path1.join("sampledir.nothere").check(notfile=0) + + # pattern = path1.sep.join(['s*file']) + # sfile = path1.join("samplefile") + # assert sfile.check(fnmatch=pattern) + + def test_size(self, path1): + url = path1.join("samplefile") + assert url.size() > len("samplefile") + + def test_mtime(self, path1): + url = path1.join("samplefile") + assert url.mtime() > 0 + + def test_relto_wrong_type(self, path1): + with pytest.raises(TypeError): + path1.relto(42) + + def test_load(self, path1): + p = path1.join("samplepickle") + obj = p.load() + assert type(obj) is dict + assert obj.get("answer", None) == 42 + + def test_visit_filesonly(self, path1): + p = [] + for i in path1.visit(lambda x: x.check(file=1)): + p.append(i.relto(path1)) + assert "sampledir" not in p + assert path1.sep.join(["sampledir", "otherfile"]) in p + + def test_visit_nodotfiles(self, path1): + p = [] + for i in path1.visit(lambda x: x.check(dotfile=0)): + p.append(i.relto(path1)) + assert "sampledir" in p + assert path1.sep.join(["sampledir", "otherfile"]) in p + assert ".dotfile" not in p + + def test_visit_breadthfirst(self, path1): + lst = [] + for i in path1.visit(bf=True): + lst.append(i.relto(path1)) + for i, p in enumerate(lst): + if path1.sep in p: + for j in range(i, len(lst)): + assert path1.sep in lst[j] + break + else: + pytest.fail("huh") + + def test_visit_sort(self, path1): + lst = [] + for i in path1.visit(bf=True, sort=True): + lst.append(i.relto(path1)) + for i, p in enumerate(lst): + if path1.sep in p: + break + assert lst[:i] == sorted(lst[:i]) + assert lst[i:] == sorted(lst[i:]) + + def test_endswith(self, path1): + def chk(p): + return p.check(endswith="pickle") + + assert not chk(path1) + assert not chk(path1.join("samplefile")) + assert chk(path1.join("somepickle")) + + def test_copy_file(self, path1): + otherdir = path1.join("otherdir") + initpy = otherdir.join("__init__.py") + copied = otherdir.join("copied") + initpy.copy(copied) + try: + assert copied.check() + s1 = initpy.read() + s2 = copied.read() + assert s1 == s2 + finally: + if copied.check(): + copied.remove() + + def test_copy_dir(self, path1): + otherdir = path1.join("otherdir") + copied = path1.join("newdir") + try: + otherdir.copy(copied) + assert copied.check(dir=1) + assert copied.join("__init__.py").check(file=1) + s1 = otherdir.join("__init__.py").read() + s2 = copied.join("__init__.py").read() + assert s1 == s2 + finally: + if copied.check(dir=1): + copied.remove(rec=1) + + def test_remove_file(self, path1): + d = path1.ensure("todeleted") + assert d.check() + d.remove() + assert not d.check() + + def test_remove_dir_recursive_by_default(self, path1): + d = path1.ensure("to", "be", "deleted") + assert d.check() + p = path1.join("to") + p.remove() + assert not p.check() + + def test_ensure_dir(self, path1): + b = path1.ensure_dir("001", "002") + assert b.basename == "002" + assert b.isdir() + + def test_mkdir_and_remove(self, path1): + tmpdir = path1 + with pytest.raises(error.EEXIST): + tmpdir.mkdir("sampledir") + new = tmpdir.join("mktest1") + new.mkdir() + assert new.check(dir=1) + new.remove() + + new = tmpdir.mkdir("mktest") + assert new.check(dir=1) + new.remove() + assert tmpdir.join("mktest") == new + + def test_move_file(self, path1): + p = path1.join("samplefile") + newp = p.dirpath("moved_samplefile") + p.move(newp) + try: + assert newp.check(file=1) + assert not p.check() + finally: + dp = newp.dirpath() + if hasattr(dp, "revert"): + dp.revert() + else: + newp.move(p) + assert p.check() + + def test_move_dir(self, path1): + source = path1.join("sampledir") + dest = path1.join("moveddir") + source.move(dest) + assert dest.check(dir=1) + assert dest.join("otherfile").check(file=1) + assert not source.join("sampledir").check() + + def test_fspath_protocol_match_strpath(self, path1): + assert path1.__fspath__() == path1.strpath + + def test_fspath_func_match_strpath(self, path1): + from os import fspath + + assert fspath(path1) == path1.strpath + + @pytest.mark.skip("sys.version_info < (3,6)") + def test_fspath_open(self, path1): + f = path1.join("opentestfile") + open(f) + + @pytest.mark.skip("sys.version_info < (3,6)") + def test_fspath_fsencode(self, path1): + from os import fsencode + + assert fsencode(path1) == fsencode(path1.strpath) + + +def setuptestfs(path): + if path.join("samplefile").check(): + return + # print "setting up test fs for", repr(path) + samplefile = path.ensure("samplefile") + samplefile.write("samplefile\n") + + execfile = path.ensure("execfile") + execfile.write("x=42") + + execfilepy = path.ensure("execfile.py") + execfilepy.write("x=42") + + d = {1: 2, "hello": "world", "answer": 42} + path.ensure("samplepickle").dump(d) + + sampledir = path.ensure("sampledir", dir=1) + sampledir.ensure("otherfile") + + otherdir = path.ensure("otherdir", dir=1) + otherdir.ensure("__init__.py") + + module_a = otherdir.ensure("a.py") + module_a.write("from .b import stuff as result\n") + module_b = otherdir.ensure("b.py") + module_b.write('stuff="got it"\n') + module_c = otherdir.ensure("c.py") + module_c.write( + """import py; +import otherdir.a +value = otherdir.a.result +""" + ) + module_d = otherdir.ensure("d.py") + module_d.write( + """import py; +from otherdir import a +value2 = a.result +""" + ) + + +win32only = pytest.mark.skipif( + "not (sys.platform == 'win32' or getattr(os, '_name', None) == 'nt')" +) +skiponwin32 = pytest.mark.skipif( + "sys.platform == 'win32' or getattr(os, '_name', None) == 'nt'" +) + +ATIME_RESOLUTION = 0.01 + + +@pytest.fixture(scope="session") +def path1(tmpdir_factory): + path = tmpdir_factory.mktemp("path") + setuptestfs(path) + yield path + assert path.join("samplefile").check() + + +@pytest.fixture +def fake_fspath_obj(request): + class FakeFSPathClass: + def __init__(self, path): + self._path = path + + def __fspath__(self): + return self._path + + return FakeFSPathClass(os.path.join("this", "is", "a", "fake", "path")) + + +def batch_make_numbered_dirs(rootdir, repeats): + for i in range(repeats): + dir_ = local.make_numbered_dir(prefix="repro-", rootdir=rootdir) + file_ = dir_.join("foo") + file_.write("%s" % i) + actual = int(file_.read()) + assert actual == i, f"int(file_.read()) is {actual} instead of {i}" + dir_.join(".lock").remove(ignore_errors=True) + return True + + +class TestLocalPath(CommonFSTests): + def test_join_normpath(self, tmpdir): + assert tmpdir.join(".") == tmpdir + p = tmpdir.join("../%s" % tmpdir.basename) + assert p == tmpdir + p = tmpdir.join("..//%s/" % tmpdir.basename) + assert p == tmpdir + + @skiponwin32 + def test_dirpath_abs_no_abs(self, tmpdir): + p = tmpdir.join("foo") + assert p.dirpath("/bar") == tmpdir.join("bar") + assert tmpdir.dirpath("/bar", abs=True) == local("/bar") + + def test_gethash(self, tmpdir): + from hashlib import md5 + from hashlib import sha1 as sha + + fn = tmpdir.join("testhashfile") + data = b"hello" + fn.write(data, mode="wb") + assert fn.computehash("md5") == md5(data).hexdigest() + assert fn.computehash("sha1") == sha(data).hexdigest() + with pytest.raises(ValueError): + fn.computehash("asdasd") + + def test_remove_removes_readonly_file(self, tmpdir): + readonly_file = tmpdir.join("readonly").ensure() + readonly_file.chmod(0) + readonly_file.remove() + assert not readonly_file.check(exists=1) + + def test_remove_removes_readonly_dir(self, tmpdir): + readonly_dir = tmpdir.join("readonlydir").ensure(dir=1) + readonly_dir.chmod(int("500", 8)) + readonly_dir.remove() + assert not readonly_dir.check(exists=1) + + def test_remove_removes_dir_and_readonly_file(self, tmpdir): + readonly_dir = tmpdir.join("readonlydir").ensure(dir=1) + readonly_file = readonly_dir.join("readonlyfile").ensure() + readonly_file.chmod(0) + readonly_dir.remove() + assert not readonly_dir.check(exists=1) + + def test_remove_routes_ignore_errors(self, tmpdir, monkeypatch): + lst = [] + monkeypatch.setattr("shutil.rmtree", lambda *args, **kwargs: lst.append(kwargs)) + tmpdir.remove() + assert not lst[0]["ignore_errors"] + for val in (True, False): + lst[:] = [] + tmpdir.remove(ignore_errors=val) + assert lst[0]["ignore_errors"] == val + + def test_initialize_curdir(self): + assert str(local()) == os.getcwd() + + @skiponwin32 + def test_chdir_gone(self, path1): + p = path1.ensure("dir_to_be_removed", dir=1) + p.chdir() + p.remove() + pytest.raises(error.ENOENT, local) + assert path1.chdir() is None + assert os.getcwd() == str(path1) + + with pytest.raises(error.ENOENT): + with p.as_cwd(): + raise NotImplementedError + + @skiponwin32 + def test_chdir_gone_in_as_cwd(self, path1): + p = path1.ensure("dir_to_be_removed", dir=1) + p.chdir() + p.remove() + + with path1.as_cwd() as old: + assert old is None + + def test_as_cwd(self, path1): + dir = path1.ensure("subdir", dir=1) + old = local() + with dir.as_cwd() as x: + assert x == old + assert local() == dir + assert os.getcwd() == str(old) + + def test_as_cwd_exception(self, path1): + old = local() + dir = path1.ensure("subdir", dir=1) + with pytest.raises(ValueError): + with dir.as_cwd(): + raise ValueError() + assert old == local() + + def test_initialize_reldir(self, path1): + with path1.as_cwd(): + p = local("samplefile") + assert p.check() + + def test_tilde_expansion(self, monkeypatch, tmpdir): + monkeypatch.setenv("HOME", str(tmpdir)) + p = local("~", expanduser=True) + assert p == os.path.expanduser("~") + + @pytest.mark.skipif( + not sys.platform.startswith("win32"), reason="case insensitive only on windows" + ) + def test_eq_hash_are_case_insensitive_on_windows(self): + a = local("/some/path") + b = local("/some/PATH") + assert a == b + assert hash(a) == hash(b) + assert a in {b} + assert a in {b: "b"} + + def test_eq_with_strings(self, path1): + path1 = path1.join("sampledir") + path2 = str(path1) + assert path1 == path2 + assert path2 == path1 + path3 = path1.join("samplefile") + assert path3 != path2 + assert path2 != path3 + + def test_eq_with_none(self, path1): + assert path1 != None # noqa: E711 + + def test_eq_non_ascii_unicode(self, path1): + path2 = path1.join("temp") + path3 = path1.join("ação") + path4 = path1.join("ディレクトリ") + + assert path2 != path3 + assert path2 != path4 + assert path4 != path3 + + def test_gt_with_strings(self, path1): + path2 = path1.join("sampledir") + path3 = str(path1.join("ttt")) + assert path3 > path2 + assert path2 < path3 + assert path2 < "ttt" + assert "ttt" > path2 + path4 = path1.join("aaa") + lst = [path2, path4, path3] + assert sorted(lst) == [path4, path2, path3] + + def test_open_and_ensure(self, path1): + p = path1.join("sub1", "sub2", "file") + with p.open("w", ensure=1) as f: + f.write("hello") + assert p.read() == "hello" + + def test_write_and_ensure(self, path1): + p = path1.join("sub1", "sub2", "file") + p.write("hello", ensure=1) + assert p.read() == "hello" + + @pytest.mark.parametrize("bin", (False, True)) + def test_dump(self, tmpdir, bin): + path = tmpdir.join("dumpfile%s" % int(bin)) + try: + d = {"answer": 42} + path.dump(d, bin=bin) + f = path.open("rb+") + import pickle + + dnew = pickle.load(f) + assert d == dnew + finally: + f.close() + + def test_setmtime(self): + import tempfile + import time + + try: + fd, name = tempfile.mkstemp() + os.close(fd) + except AttributeError: + name = tempfile.mktemp() + open(name, "w").close() + try: + mtime = int(time.time()) - 100 + path = local(name) + assert path.mtime() != mtime + path.setmtime(mtime) + assert path.mtime() == mtime + path.setmtime() + assert path.mtime() != mtime + finally: + os.remove(name) + + def test_normpath(self, path1): + new1 = path1.join("/otherdir") + new2 = path1.join("otherdir") + assert str(new1) == str(new2) + + def test_mkdtemp_creation(self): + d = local.mkdtemp() + try: + assert d.check(dir=1) + finally: + d.remove(rec=1) + + def test_tmproot(self): + d = local.mkdtemp() + tmproot = local.get_temproot() + try: + assert d.check(dir=1) + assert d.dirpath() == tmproot + finally: + d.remove(rec=1) + + def test_chdir(self, tmpdir): + old = local() + try: + res = tmpdir.chdir() + assert str(res) == str(old) + assert os.getcwd() == str(tmpdir) + finally: + old.chdir() + + def test_ensure_filepath_withdir(self, tmpdir): + newfile = tmpdir.join("test1", "test") + newfile.ensure() + assert newfile.check(file=1) + newfile.write("42") + newfile.ensure() + s = newfile.read() + assert s == "42" + + def test_ensure_filepath_withoutdir(self, tmpdir): + newfile = tmpdir.join("test1file") + t = newfile.ensure() + assert t == newfile + assert newfile.check(file=1) + + def test_ensure_dirpath(self, tmpdir): + newfile = tmpdir.join("test1", "testfile") + t = newfile.ensure(dir=1) + assert t == newfile + assert newfile.check(dir=1) + + def test_ensure_non_ascii_unicode(self, tmpdir): + newfile = tmpdir.join("ação", "ディレクトリ") + t = newfile.ensure(dir=1) + assert t == newfile + assert newfile.check(dir=1) + + @pytest.mark.xfail(run=False, reason="unreliable est for long filenames") + def test_long_filenames(self, tmpdir): + if sys.platform == "win32": + pytest.skip("win32: work around needed for path length limit") + # see http://codespeak.net/pipermail/py-dev/2008q2/000922.html + + # testing paths > 260 chars (which is Windows' limitation, but + # depending on how the paths are used), but > 4096 (which is the + # Linux' limitation) - the behaviour of paths with names > 4096 chars + # is undetermined + newfilename = "/test" * 60 + l1 = tmpdir.join(newfilename) + l1.ensure(file=True) + l1.write("foo") + l2 = tmpdir.join(newfilename) + assert l2.read() == "foo" + + def test_visit_depth_first(self, tmpdir): + tmpdir.ensure("a", "1") + tmpdir.ensure("b", "2") + p3 = tmpdir.ensure("breadth") + lst = list(tmpdir.visit(lambda x: x.check(file=1))) + assert len(lst) == 3 + # check that breadth comes last + assert lst[2] == p3 + + def test_visit_rec_fnmatch(self, tmpdir): + p1 = tmpdir.ensure("a", "123") + tmpdir.ensure(".b", "345") + lst = list(tmpdir.visit("???", rec="[!.]*")) + assert len(lst) == 1 + # check that breadth comes last + assert lst[0] == p1 + + def test_fnmatch_file_abspath(self, tmpdir): + b = tmpdir.join("a", "b") + assert b.fnmatch(os.sep.join("ab")) + pattern = os.sep.join([str(tmpdir), "*", "b"]) + assert b.fnmatch(pattern) + + def test_sysfind(self): + name = sys.platform == "win32" and "cmd" or "test" + x = local.sysfind(name) + assert x.check(file=1) + assert local.sysfind("jaksdkasldqwe") is None + assert local.sysfind(name, paths=[]) is None + x2 = local.sysfind(name, paths=[x.dirpath()]) + assert x2 == x + + def test_fspath_protocol_other_class(self, fake_fspath_obj): + # py.path is always absolute + py_path = local(fake_fspath_obj) + str_path = fake_fspath_obj.__fspath__() + assert py_path.check(endswith=str_path) + assert py_path.join(fake_fspath_obj).strpath == os.path.join( + py_path.strpath, str_path + ) + + def test_make_numbered_dir_multiprocess_safe(self, tmpdir): + # https://github.com/pytest-dev/py/issues/30 + with multiprocessing.Pool() as pool: + results = [ + pool.apply_async(batch_make_numbered_dirs, [tmpdir, 100]) + for _ in range(20) + ] + for r in results: + assert r.get() + + +class TestExecutionOnWindows: + pytestmark = win32only + + def test_sysfind_bat_exe_before(self, tmpdir, monkeypatch): + monkeypatch.setenv("PATH", str(tmpdir), prepend=os.pathsep) + tmpdir.ensure("hello") + h = tmpdir.ensure("hello.bat") + x = local.sysfind("hello") + assert x == h + + +class TestExecution: + pytestmark = skiponwin32 + + def test_sysfind_no_permisson_ignored(self, monkeypatch, tmpdir): + noperm = tmpdir.ensure("noperm", dir=True) + monkeypatch.setenv("PATH", str(noperm), prepend=":") + noperm.chmod(0) + try: + assert local.sysfind("jaksdkasldqwe") is None + finally: + noperm.chmod(0o644) + + def test_sysfind_absolute(self): + x = local.sysfind("test") + assert x.check(file=1) + y = local.sysfind(str(x)) + assert y.check(file=1) + assert y == x + + def test_sysfind_multiple(self, tmpdir, monkeypatch): + monkeypatch.setenv( + "PATH", "{}:{}".format(tmpdir.ensure("a"), tmpdir.join("b")), prepend=":" + ) + tmpdir.ensure("b", "a") + x = local.sysfind("a", checker=lambda x: x.dirpath().basename == "b") + assert x.basename == "a" + assert x.dirpath().basename == "b" + assert local.sysfind("a", checker=lambda x: None) is None + + def test_sysexec(self): + x = local.sysfind("ls") + out = x.sysexec("-a") + for x in local().listdir(): + assert out.find(x.basename) != -1 + + def test_sysexec_failing(self): + try: + from py._process.cmdexec import ExecutionFailed # py library + except ImportError: + ExecutionFailed = RuntimeError # py vendored + x = local.sysfind("false") + with pytest.raises(ExecutionFailed): + x.sysexec("aksjdkasjd") + + def test_make_numbered_dir(self, tmpdir): + tmpdir.ensure("base.not_an_int", dir=1) + for i in range(10): + numdir = local.make_numbered_dir( + prefix="base.", rootdir=tmpdir, keep=2, lock_timeout=0 + ) + assert numdir.check() + assert numdir.basename == "base.%d" % i + if i >= 1: + assert numdir.new(ext=str(i - 1)).check() + if i >= 2: + assert numdir.new(ext=str(i - 2)).check() + if i >= 3: + assert not numdir.new(ext=str(i - 3)).check() + + def test_make_numbered_dir_case(self, tmpdir): + """make_numbered_dir does not make assumptions on the underlying + filesystem based on the platform and will assume it _could_ be case + insensitive. + + See issues: + - https://github.com/pytest-dev/pytest/issues/708 + - https://github.com/pytest-dev/pytest/issues/3451 + """ + d1 = local.make_numbered_dir( + prefix="CAse.", + rootdir=tmpdir, + keep=2, + lock_timeout=0, + ) + d2 = local.make_numbered_dir( + prefix="caSE.", + rootdir=tmpdir, + keep=2, + lock_timeout=0, + ) + assert str(d1).lower() != str(d2).lower() + assert str(d2).endswith(".1") + + def test_make_numbered_dir_NotImplemented_Error(self, tmpdir, monkeypatch): + def notimpl(x, y): + raise NotImplementedError(42) + + monkeypatch.setattr(os, "symlink", notimpl) + x = tmpdir.make_numbered_dir(rootdir=tmpdir, lock_timeout=0) + assert x.relto(tmpdir) + assert x.check() + + def test_locked_make_numbered_dir(self, tmpdir): + for i in range(10): + numdir = local.make_numbered_dir(prefix="base2.", rootdir=tmpdir, keep=2) + assert numdir.check() + assert numdir.basename == "base2.%d" % i + for j in range(i): + assert numdir.new(ext=str(j)).check() + + def test_error_preservation(self, path1): + pytest.raises(EnvironmentError, path1.join("qwoeqiwe").mtime) + pytest.raises(EnvironmentError, path1.join("qwoeqiwe").read) + + # def test_parentdirmatch(self): + # local.parentdirmatch('std', startmodule=__name__) + # + + +class TestImport: + @pytest.fixture(autouse=True) + def preserve_sys(self): + with mock.patch.dict(sys.modules): + with mock.patch.object(sys, "path", list(sys.path)): + yield + + def test_pyimport(self, path1): + obj = path1.join("execfile.py").pyimport() + assert obj.x == 42 + assert obj.__name__ == "execfile" + + def test_pyimport_renamed_dir_creates_mismatch(self, tmpdir, monkeypatch): + p = tmpdir.ensure("a", "test_x123.py") + p.pyimport() + tmpdir.join("a").move(tmpdir.join("b")) + with pytest.raises(tmpdir.ImportMismatchError): + tmpdir.join("b", "test_x123.py").pyimport() + + # Errors can be ignored. + monkeypatch.setenv("PY_IGNORE_IMPORTMISMATCH", "1") + tmpdir.join("b", "test_x123.py").pyimport() + + # PY_IGNORE_IMPORTMISMATCH=0 does not ignore error. + monkeypatch.setenv("PY_IGNORE_IMPORTMISMATCH", "0") + with pytest.raises(tmpdir.ImportMismatchError): + tmpdir.join("b", "test_x123.py").pyimport() + + def test_pyimport_messy_name(self, tmpdir): + # http://bitbucket.org/hpk42/py-trunk/issue/129 + path = tmpdir.ensure("foo__init__.py") + path.pyimport() + + def test_pyimport_dir(self, tmpdir): + p = tmpdir.join("hello_123") + p_init = p.ensure("__init__.py") + m = p.pyimport() + assert m.__name__ == "hello_123" + m = p_init.pyimport() + assert m.__name__ == "hello_123" + + def test_pyimport_execfile_different_name(self, path1): + obj = path1.join("execfile.py").pyimport(modname="0x.y.z") + assert obj.x == 42 + assert obj.__name__ == "0x.y.z" + + def test_pyimport_a(self, path1): + otherdir = path1.join("otherdir") + mod = otherdir.join("a.py").pyimport() + assert mod.result == "got it" + assert mod.__name__ == "otherdir.a" + + def test_pyimport_b(self, path1): + otherdir = path1.join("otherdir") + mod = otherdir.join("b.py").pyimport() + assert mod.stuff == "got it" + assert mod.__name__ == "otherdir.b" + + def test_pyimport_c(self, path1): + otherdir = path1.join("otherdir") + mod = otherdir.join("c.py").pyimport() + assert mod.value == "got it" + + def test_pyimport_d(self, path1): + otherdir = path1.join("otherdir") + mod = otherdir.join("d.py").pyimport() + assert mod.value2 == "got it" + + def test_pyimport_and_import(self, tmpdir): + tmpdir.ensure("xxxpackage", "__init__.py") + mod1path = tmpdir.ensure("xxxpackage", "module1.py") + mod1 = mod1path.pyimport() + assert mod1.__name__ == "xxxpackage.module1" + from xxxpackage import module1 + + assert module1 is mod1 + + def test_pyimport_check_filepath_consistency(self, monkeypatch, tmpdir): + name = "pointsback123" + ModuleType = type(os) + p = tmpdir.ensure(name + ".py") + for ending in (".pyc", "$py.class", ".pyo"): + mod = ModuleType(name) + pseudopath = tmpdir.ensure(name + ending) + mod.__file__ = str(pseudopath) + monkeypatch.setitem(sys.modules, name, mod) + newmod = p.pyimport() + assert mod == newmod + monkeypatch.undo() + mod = ModuleType(name) + pseudopath = tmpdir.ensure(name + "123.py") + mod.__file__ = str(pseudopath) + monkeypatch.setitem(sys.modules, name, mod) + excinfo = pytest.raises(pseudopath.ImportMismatchError, p.pyimport) + modname, modfile, orig = excinfo.value.args + assert modname == name + assert modfile == pseudopath + assert orig == p + assert issubclass(pseudopath.ImportMismatchError, ImportError) + + def test_issue131_pyimport_on__init__(self, tmpdir): + # __init__.py files may be namespace packages, and thus the + # __file__ of an imported module may not be ourselves + # see issue + p1 = tmpdir.ensure("proja", "__init__.py") + p2 = tmpdir.ensure("sub", "proja", "__init__.py") + m1 = p1.pyimport() + m2 = p2.pyimport() + assert m1 == m2 + + def test_ensuresyspath_append(self, tmpdir): + root1 = tmpdir.mkdir("root1") + file1 = root1.ensure("x123.py") + assert str(root1) not in sys.path + file1.pyimport(ensuresyspath="append") + assert str(root1) == sys.path[-1] + assert str(root1) not in sys.path[:-1] + + +class TestImportlibImport: + OPTS = {"ensuresyspath": "importlib"} + + def test_pyimport(self, path1): + obj = path1.join("execfile.py").pyimport(**self.OPTS) + assert obj.x == 42 + assert obj.__name__ == "execfile" + + def test_pyimport_dir_fails(self, tmpdir): + p = tmpdir.join("hello_123") + p.ensure("__init__.py") + with pytest.raises(ImportError): + p.pyimport(**self.OPTS) + + def test_pyimport_execfile_different_name(self, path1): + obj = path1.join("execfile.py").pyimport(modname="0x.y.z", **self.OPTS) + assert obj.x == 42 + assert obj.__name__ == "0x.y.z" + + def test_pyimport_relative_import_fails(self, path1): + otherdir = path1.join("otherdir") + with pytest.raises(ImportError): + otherdir.join("a.py").pyimport(**self.OPTS) + + def test_pyimport_doesnt_use_sys_modules(self, tmpdir): + p = tmpdir.ensure("file738jsk.py") + mod = p.pyimport(**self.OPTS) + assert mod.__name__ == "file738jsk" + assert "file738jsk" not in sys.modules + + +def test_pypkgdir(tmpdir): + pkg = tmpdir.ensure("pkg1", dir=1) + pkg.ensure("__init__.py") + pkg.ensure("subdir/__init__.py") + assert pkg.pypkgpath() == pkg + assert pkg.join("subdir", "__init__.py").pypkgpath() == pkg + + +def test_pypkgdir_unimportable(tmpdir): + pkg = tmpdir.ensure("pkg1-1", dir=1) # unimportable + pkg.ensure("__init__.py") + subdir = pkg.ensure("subdir/__init__.py").dirpath() + assert subdir.pypkgpath() == subdir + assert subdir.ensure("xyz.py").pypkgpath() == subdir + assert not pkg.pypkgpath() + + +def test_isimportable(): + try: + from py.path import isimportable # py vendored version + except ImportError: + from py._path.local import isimportable # py library + + assert not isimportable("") + assert isimportable("x") + assert isimportable("x1") + assert isimportable("x_1") + assert isimportable("_") + assert isimportable("_1") + assert not isimportable("x-1") + assert not isimportable("x:1") + + +def test_homedir_from_HOME(monkeypatch): + path = os.getcwd() + monkeypatch.setenv("HOME", path) + assert local._gethomedir() == local(path) + + +def test_homedir_not_exists(monkeypatch): + monkeypatch.delenv("HOME", raising=False) + monkeypatch.delenv("HOMEDRIVE", raising=False) + homedir = local._gethomedir() + assert homedir is None + + +def test_samefile(tmpdir): + assert tmpdir.samefile(tmpdir) + p = tmpdir.ensure("hello") + assert p.samefile(p) + with p.dirpath().as_cwd(): + assert p.samefile(p.basename) + if sys.platform == "win32": + p1 = p.__class__(str(p).lower()) + p2 = p.__class__(str(p).upper()) + assert p1.samefile(p2) + + +@pytest.mark.skipif(not hasattr(os, "symlink"), reason="os.symlink not available") +def test_samefile_symlink(tmpdir): + p1 = tmpdir.ensure("foo.txt") + p2 = tmpdir.join("linked.txt") + try: + os.symlink(str(p1), str(p2)) + except (OSError, NotImplementedError) as e: + # on Windows this might fail if the user doesn't have special symlink permissions + # pypy3 on Windows doesn't implement os.symlink and raises NotImplementedError + pytest.skip(str(e.args[0])) + + assert p1.samefile(p2) + + +def test_listdir_single_arg(tmpdir): + tmpdir.ensure("hello") + assert tmpdir.listdir("hello")[0].basename == "hello" + + +def test_mkdtemp_rootdir(tmpdir): + dtmp = local.mkdtemp(rootdir=tmpdir) + assert tmpdir.listdir() == [dtmp] + + +class TestWINLocalPath: + pytestmark = win32only + + def test_owner_group_not_implemented(self, path1): + with pytest.raises(NotImplementedError): + path1.stat().owner + with pytest.raises(NotImplementedError): + path1.stat().group + + def test_chmod_simple_int(self, path1): + mode = path1.stat().mode + # Ensure that we actually change the mode to something different. + path1.chmod(mode == 0 and 1 or 0) + try: + print(path1.stat().mode) + print(mode) + assert path1.stat().mode != mode + finally: + path1.chmod(mode) + assert path1.stat().mode == mode + + def test_path_comparison_lowercase_mixed(self, path1): + t1 = path1.join("a_path") + t2 = path1.join("A_path") + assert t1 == t1 + assert t1 == t2 + + def test_relto_with_mixed_case(self, path1): + t1 = path1.join("a_path", "fiLe") + t2 = path1.join("A_path") + assert t1.relto(t2) == "fiLe" + + def test_allow_unix_style_paths(self, path1): + t1 = path1.join("a_path") + assert t1 == str(path1) + "\\a_path" + t1 = path1.join("a_path/") + assert t1 == str(path1) + "\\a_path" + t1 = path1.join("dir/a_path") + assert t1 == str(path1) + "\\dir\\a_path" + + def test_sysfind_in_currentdir(self, path1): + cmd = local.sysfind("cmd") + root = cmd.new(dirname="", basename="") # c:\ in most installations + with root.as_cwd(): + x = local.sysfind(cmd.relto(root)) + assert x.check(file=1) + + def test_fnmatch_file_abspath_posix_pattern_on_win32(self, tmpdir): + # path-matching patterns might contain a posix path separator '/' + # Test that we can match that pattern on windows. + import posixpath + + b = tmpdir.join("a", "b") + assert b.fnmatch(posixpath.sep.join("ab")) + pattern = posixpath.sep.join([str(tmpdir), "*", "b"]) + assert b.fnmatch(pattern) + + +class TestPOSIXLocalPath: + pytestmark = skiponwin32 + + def test_hardlink(self, tmpdir): + linkpath = tmpdir.join("test") + filepath = tmpdir.join("file") + filepath.write("Hello") + nlink = filepath.stat().nlink + linkpath.mklinkto(filepath) + assert filepath.stat().nlink == nlink + 1 + + def test_symlink_are_identical(self, tmpdir): + filepath = tmpdir.join("file") + filepath.write("Hello") + linkpath = tmpdir.join("test") + linkpath.mksymlinkto(filepath) + assert linkpath.readlink() == str(filepath) + + def test_symlink_isfile(self, tmpdir): + linkpath = tmpdir.join("test") + filepath = tmpdir.join("file") + filepath.write("") + linkpath.mksymlinkto(filepath) + assert linkpath.check(file=1) + assert not linkpath.check(link=0, file=1) + assert linkpath.islink() + + def test_symlink_relative(self, tmpdir): + linkpath = tmpdir.join("test") + filepath = tmpdir.join("file") + filepath.write("Hello") + linkpath.mksymlinkto(filepath, absolute=False) + assert linkpath.readlink() == "file" + assert filepath.read() == linkpath.read() + + def test_symlink_not_existing(self, tmpdir): + linkpath = tmpdir.join("testnotexisting") + assert not linkpath.check(link=1) + assert linkpath.check(link=0) + + def test_relto_with_root(self, path1, tmpdir): + y = path1.join("x").relto(local("/")) + assert y[0] == str(path1)[1] + + def test_visit_recursive_symlink(self, tmpdir): + linkpath = tmpdir.join("test") + linkpath.mksymlinkto(tmpdir) + visitor = tmpdir.visit(None, lambda x: x.check(link=0)) + assert list(visitor) == [linkpath] + + def test_symlink_isdir(self, tmpdir): + linkpath = tmpdir.join("test") + linkpath.mksymlinkto(tmpdir) + assert linkpath.check(dir=1) + assert not linkpath.check(link=0, dir=1) + + def test_symlink_remove(self, tmpdir): + linkpath = tmpdir.join("test") + linkpath.mksymlinkto(linkpath) # point to itself + assert linkpath.check(link=1) + linkpath.remove() + assert not linkpath.check() + + def test_realpath_file(self, tmpdir): + linkpath = tmpdir.join("test") + filepath = tmpdir.join("file") + filepath.write("") + linkpath.mksymlinkto(filepath) + realpath = linkpath.realpath() + assert realpath.basename == "file" + + def test_owner(self, path1, tmpdir): + from pwd import getpwuid + from grp import getgrgid + + stat = path1.stat() + assert stat.path == path1 + + uid = stat.uid + gid = stat.gid + owner = getpwuid(uid)[0] + group = getgrgid(gid)[0] + + assert uid == stat.uid + assert owner == stat.owner + assert gid == stat.gid + assert group == stat.group + + def test_stat_helpers(self, tmpdir, monkeypatch): + path1 = tmpdir.ensure("file") + stat1 = path1.stat() + stat2 = tmpdir.stat() + assert stat1.isfile() + assert stat2.isdir() + assert not stat1.islink() + assert not stat2.islink() + + def test_stat_non_raising(self, tmpdir): + path1 = tmpdir.join("file") + pytest.raises(error.ENOENT, lambda: path1.stat()) + res = path1.stat(raising=False) + assert res is None + + def test_atime(self, tmpdir): + import time + + path = tmpdir.ensure("samplefile") + now = time.time() + atime1 = path.atime() + # we could wait here but timer resolution is very + # system dependent + path.read() + time.sleep(ATIME_RESOLUTION) + atime2 = path.atime() + time.sleep(ATIME_RESOLUTION) + duration = time.time() - now + assert (atime2 - atime1) <= duration + + def test_commondir(self, path1): + # XXX This is here in local until we find a way to implement this + # using the subversion command line api. + p1 = path1.join("something") + p2 = path1.join("otherthing") + assert p1.common(p2) == path1 + assert p2.common(p1) == path1 + + def test_commondir_nocommon(self, path1): + # XXX This is here in local until we find a way to implement this + # using the subversion command line api. + p1 = path1.join("something") + p2 = local(path1.sep + "blabla") + assert p1.common(p2) == "/" + + def test_join_to_root(self, path1): + root = path1.parts()[0] + assert len(str(root)) == 1 + assert str(root.join("a")) == "/a" + + def test_join_root_to_root_with_no_abs(self, path1): + nroot = path1.join("/") + assert str(path1) == str(nroot) + assert path1 == nroot + + def test_chmod_simple_int(self, path1): + mode = path1.stat().mode + path1.chmod(int(mode / 2)) + try: + assert path1.stat().mode != mode + finally: + path1.chmod(mode) + assert path1.stat().mode == mode + + def test_chmod_rec_int(self, path1): + # XXX fragile test + def recfilter(x): + return x.check(dotfile=0, link=0) + + oldmodes = {} + for x in path1.visit(rec=recfilter): + oldmodes[x] = x.stat().mode + path1.chmod(int("772", 8), rec=recfilter) + try: + for x in path1.visit(rec=recfilter): + assert x.stat().mode & int("777", 8) == int("772", 8) + finally: + for x, y in oldmodes.items(): + x.chmod(y) + + def test_copy_archiving(self, tmpdir): + unicode_fn = "something-\342\200\223.txt" + f = tmpdir.ensure("a", unicode_fn) + a = f.dirpath() + oldmode = f.stat().mode + newmode = oldmode ^ 1 + f.chmod(newmode) + b = tmpdir.join("b") + a.copy(b, mode=True) + assert b.join(f.basename).stat().mode == newmode + + def test_copy_stat_file(self, tmpdir): + src = tmpdir.ensure("src") + dst = tmpdir.join("dst") + # a small delay before the copy + time.sleep(ATIME_RESOLUTION) + src.copy(dst, stat=True) + oldstat = src.stat() + newstat = dst.stat() + assert oldstat.mode == newstat.mode + assert (dst.atime() - src.atime()) < ATIME_RESOLUTION + assert (dst.mtime() - src.mtime()) < ATIME_RESOLUTION + + def test_copy_stat_dir(self, tmpdir): + test_files = ["a", "b", "c"] + src = tmpdir.join("src") + for f in test_files: + src.join(f).write(f, ensure=True) + dst = tmpdir.join("dst") + # a small delay before the copy + time.sleep(ATIME_RESOLUTION) + src.copy(dst, stat=True) + for f in test_files: + oldstat = src.join(f).stat() + newstat = dst.join(f).stat() + assert (newstat.atime - oldstat.atime) < ATIME_RESOLUTION + assert (newstat.mtime - oldstat.mtime) < ATIME_RESOLUTION + assert oldstat.mode == newstat.mode + + def test_chown_identity(self, path1): + owner = path1.stat().owner + group = path1.stat().group + path1.chown(owner, group) + + def test_chown_dangling_link(self, path1): + owner = path1.stat().owner + group = path1.stat().group + x = path1.join("hello") + x.mksymlinkto("qlwkejqwlek") + try: + path1.chown(owner, group, rec=1) + finally: + x.remove(rec=0) + + def test_chown_identity_rec_mayfail(self, path1): + owner = path1.stat().owner + group = path1.stat().group + path1.chown(owner, group) + + +class TestUnicodePy2Py3: + def test_join_ensure(self, tmpdir, monkeypatch): + if sys.version_info >= (3, 0) and "LANG" not in os.environ: + pytest.skip("cannot run test without locale") + x = local(tmpdir.strpath) + part = "hällo" + y = x.ensure(part) + assert x.join(part) == y + + def test_listdir(self, tmpdir): + if sys.version_info >= (3, 0) and "LANG" not in os.environ: + pytest.skip("cannot run test without locale") + x = local(tmpdir.strpath) + part = "hällo" + y = x.ensure(part) + assert x.listdir(part)[0] == y + + @pytest.mark.xfail(reason="changing read/write might break existing usages") + def test_read_write(self, tmpdir): + x = tmpdir.join("hello") + part = "hällo" + x.write(part) + assert x.read() == part + x.write(part.encode(sys.getdefaultencoding())) + assert x.read() == part.encode(sys.getdefaultencoding()) + + +class TestBinaryAndTextMethods: + def test_read_binwrite(self, tmpdir): + x = tmpdir.join("hello") + part = "hällo" + part_utf8 = part.encode("utf8") + x.write_binary(part_utf8) + assert x.read_binary() == part_utf8 + s = x.read_text(encoding="utf8") + assert s == part + assert isinstance(s, str) + + def test_read_textwrite(self, tmpdir): + x = tmpdir.join("hello") + part = "hällo" + part_utf8 = part.encode("utf8") + x.write_text(part, encoding="utf8") + assert x.read_binary() == part_utf8 + assert x.read_text(encoding="utf8") == part + + def test_default_encoding(self, tmpdir): + x = tmpdir.join("hello") + # Can't use UTF8 as the default encoding (ASCII) doesn't support it + part = "hello" + x.write_text(part, "ascii") + s = x.read_text("ascii") + assert s == part + assert type(s) == type(part) diff --git a/testing/test_pathlib.py b/testing/test_pathlib.py index c901dc6f435..577c7749fd9 100644 --- a/testing/test_pathlib.py +++ b/testing/test_pathlib.py @@ -91,6 +91,12 @@ def path1(self, tmp_path_factory: TempPathFactory) -> Generator[Path, None, None yield path assert path.joinpath("samplefile").exists() + @pytest.fixture(autouse=True) + def preserve_sys(self): + with unittest.mock.patch.dict(sys.modules): + with unittest.mock.patch.object(sys, "path", list(sys.path)): + yield + def setuptestfs(self, path: Path) -> None: # print "setting up test fs for", repr(path) samplefile = path / "samplefile" diff --git a/tox.ini b/tox.ini index f1ab4b815a5..f04242c5c43 100644 --- a/tox.ini +++ b/tox.ini @@ -10,7 +10,7 @@ envlist = py310 py311 pypy3 - py37-{pexpect,xdist,unittestextras,numpy,pluggymain} + py37-{pexpect,xdist,unittestextras,numpy,pluggymain,pylib} doctesting plugins py37-freeze @@ -54,6 +54,7 @@ deps = numpy: numpy>=1.19.4 pexpect: pexpect>=4.8.0 pluggymain: pluggy @ git+https://github.com/pytest-dev/pluggy.git + pylib: py>=1.8.2 unittestextras: twisted unittestextras: asynctest xdist: pytest-xdist>=2.1.0