Skip to content

Commit

Permalink
_pytest._py.path: get mypy passing
Browse files Browse the repository at this point in the history
  • Loading branch information
asottile committed Oct 20, 2022
1 parent ed4c18f commit 59d8f8a
Showing 1 changed file with 58 additions and 24 deletions.
82 changes: 58 additions & 24 deletions src/_pytest/_py/path.py
Expand Up @@ -22,9 +22,16 @@
from stat import S_ISDIR
from stat import S_ISLNK
from stat import S_ISREG
from typing import Any
from typing import Callable
from typing import overload
from typing import TYPE_CHECKING

from . import error

if TYPE_CHECKING:
from typing import Literal

# Moved from local.py.
iswin32 = sys.platform == "win32" or (getattr(os, "_name", False) == "nt")

Expand Down Expand Up @@ -96,7 +103,9 @@ def _evaluate(self, kw):
return False
return True

def _stat(self):
_statcache: Stat

def _stat(self) -> Stat:
try:
return self._statcache
except AttributeError:
Expand Down Expand Up @@ -129,7 +138,7 @@ def __init__(self, fil, rec, ignore, bf, sort):
if isinstance(fil, str):
fil = FNMatcher(fil)
if isinstance(rec, str):
self.rec = FNMatcher(rec)
self.rec: Callable[[LocalPath], bool] = FNMatcher(rec)
elif not hasattr(rec, "__call__") and rec:
self.rec = lambda path: True
else:
Expand Down Expand Up @@ -192,7 +201,17 @@ def map_as_list(func, iter):


class Stat:
def __getattr__(self, name):
if TYPE_CHECKING:

@property
def size(self) -> int:
...

@property
def mtime(self) -> float:
...

def __getattr__(self, name: str) -> Any:
return getattr(self._osstatresult, "st_" + name)

def __init__(self, path, osstatresult):
Expand Down Expand Up @@ -295,9 +314,10 @@ def chown(self, user, group, rec=0):
error.checked_call(os.chown, str(x), uid, gid)
error.checked_call(os.chown, str(self), uid, gid)

def readlink(self):
def readlink(self) -> str:
"""Return value of a symbolic link."""
return error.checked_call(os.readlink, self.strpath)
# https://github.com/python/mypy/issues/12278
return error.checked_call(os.readlink, self.strpath) # type: ignore[arg-type,return-value]

def mklinkto(self, oldname):
"""Posix style hard link to another name."""
Expand Down Expand Up @@ -659,32 +679,31 @@ def new(self, **kw):
obj.strpath = normpath("%(dirname)s%(sep)s%(basename)s" % kw)
return obj

def _getbyspec(self, spec):
def _getbyspec(self, spec: str) -> list[str]:
"""See new for what 'spec' can be."""
res = []
parts = self.strpath.split(self.sep)

args = filter(None, spec.split(","))
append = res.append
for name in args:
if name == "drive":
append(parts[0])
res.append(parts[0])
elif name == "dirname":
append(self.sep.join(parts[:-1]))
res.append(self.sep.join(parts[:-1]))
else:
basename = parts[-1]
if name == "basename":
append(basename)
res.append(basename)
else:
i = basename.rfind(".")
if i == -1:
purebasename, ext = basename, ""
else:
purebasename, ext = basename[:i], basename[i:]
if name == "purebasename":
append(purebasename)
res.append(purebasename)
elif name == "ext":
append(ext)
res.append(ext)
else:
raise ValueError("invalid part specification %r" % name)
return res
Expand All @@ -699,16 +718,16 @@ def dirpath(self, *args, **kwargs):
return path
return self.new(basename="").join(*args, **kwargs)

def join(self, *args, **kwargs):
def join(self, *args: os.PathLike[str], abs: bool = False) -> LocalPath:
"""Return a new path by appending all 'args' as path
components. if abs=1 is used restart from root if any
of the args is an absolute path.
"""
sep = self.sep
strargs = [os.fspath(arg) for arg in args]
strpath = self.strpath
if kwargs.get("abs"):
newargs = []
if abs:
newargs: list[str] = []
for arg in reversed(strargs):
if isabs(arg):
strpath = arg
Expand Down Expand Up @@ -801,11 +820,11 @@ def listdir(self, fil=None, sort=None):
self._sortlist(res, sort)
return res

def size(self):
def size(self) -> int:
"""Return size of the underlying file object"""
return self.stat().size

def mtime(self):
def mtime(self) -> float:
"""Return last modification time of the path."""
return self.stat().mtime

Expand Down Expand Up @@ -936,7 +955,15 @@ def ensure(self, *args, **kwargs):
p.open("w").close()
return p

def stat(self, raising=True):
@overload
def stat(self, raising: Literal[True] = ...) -> Stat:
...

@overload
def stat(self, raising: Literal[False]) -> Stat | None:
...

def stat(self, raising: bool = True) -> Stat | None:
"""Return an os.stat() tuple."""
if raising:
return Stat(self, error.checked_call(os.stat, self.strpath))
Expand All @@ -947,7 +974,7 @@ def stat(self, raising=True):
except Exception:
return None

def lstat(self):
def lstat(self) -> Stat:
"""Return an os.lstat() tuple."""
return Stat(self, error.checked_call(os.lstat, self.strpath))

Expand Down Expand Up @@ -1067,7 +1094,7 @@ def pyimport(self, modname=None, ensuresyspath=True):
if modname is None:
modname = self.purebasename
spec = importlib.util.spec_from_file_location(modname, str(self))
if spec is None:
if spec is None or spec.loader is None:
raise ImportError(
f"Can't find module {modname} at location {str(self)}"
)
Expand Down Expand Up @@ -1095,6 +1122,7 @@ def pyimport(self, modname=None, ensuresyspath=True):
return mod # we don't check anything as we might
# be in a namespace package ... too icky to check
modfile = mod.__file__
assert modfile is not None
if modfile[-4:] in (".pyc", ".pyo"):
modfile = modfile[:-1]
elif modfile.endswith("$py.class"):
Expand Down Expand Up @@ -1129,16 +1157,22 @@ def pyimport(self, modname=None, ensuresyspath=True):
raise
return mod

def sysexec(self, *argv, **popen_opts):
def sysexec(self, *argv: os.PathLike[str], **popen_opts: Any) -> str:
"""Return stdout text from executing a system child process,
where the 'self' path points to executable.
The process is directly invoked and not through a system shell.
"""
from subprocess import Popen, PIPE

argv = map_as_list(str, argv)
popen_opts["stdout"] = popen_opts["stderr"] = PIPE
proc = Popen([str(self)] + argv, **popen_opts)
popen_opts.pop("stdout", None)
popen_opts.pop("stderr", None)
proc = Popen(
[str(self)] + [str(arg) for arg in argv],
**popen_opts,
stdout=PIPE,
stderr=PIPE,
)
stdout: str | bytes
stdout, stderr = proc.communicate()
ret = proc.wait()
if isinstance(stdout, bytes):
Expand Down

0 comments on commit 59d8f8a

Please sign in to comment.