Skip to content

Commit

Permalink
Increased type checks
Browse files Browse the repository at this point in the history
  • Loading branch information
ssbarnea committed Nov 6, 2020
1 parent 30616b0 commit 33d411c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Expand Up @@ -50,13 +50,15 @@ repos:
entry: mypy lib/
pass_filenames: false
additional_dependencies:
- pytest>=6.1.2
- packaging
- rich
- repo: https://github.com/pre-commit/mirrors-pylint
rev: v2.6.0
hooks:
- id: pylint
additional_dependencies:
- pytest>=6.1.2
- rich
- typing
- typing-extensions
53 changes: 34 additions & 19 deletions lib/subprocess_tee/__init__.py
@@ -1,18 +1,24 @@
"""tee-like run implementation."""
"""tee like run implementation."""
import asyncio
import os
import platform
import subprocess
import sys
from subprocess import CompletedProcess
from typing import Any, Callable, Dict, List, Optional
from asyncio import StreamReader
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

if TYPE_CHECKING:
CompletedProcess = subprocess.CompletedProcess[Any] # pylint: disable=E1136
else:
CompletedProcess = subprocess.CompletedProcess

try:
from shlex import join # type: ignore
except ImportError:
from subprocess import list2cmdline as join # pylint: disable=ungrouped-imports


async def _read_stream(stream, callback: Callable):
async def _read_stream(stream: StreamReader, callback: Callable[..., Any]) -> None:
while True:
line = await stream.readline()
if line:
Expand All @@ -21,7 +27,7 @@ async def _read_stream(stream, callback: Callable):
break


async def _stream_subprocess(args, **kwargs) -> CompletedProcess:
async def _stream_subprocess(args: str, **kwargs: Any) -> CompletedProcess:
platform_settings: Dict[str, Any] = {}
if platform.system() == "Windows":
platform_settings["env"] = os.environ
Expand All @@ -46,21 +52,28 @@ async def _stream_subprocess(args, **kwargs) -> CompletedProcess:
out: List[str] = []
err: List[str] = []

def tee(line: bytes, sink: List[str], pipe: Optional[Any]):
def tee(line: bytes, sink: List[str], pipe: Optional[Any]) -> None:
line_str = line.decode("utf-8").rstrip()
sink.append(line_str)
if not kwargs.get("quiet", False):
print(line_str, file=pipe)

loop = asyncio.get_event_loop()
task1 = loop.create_task(
_read_stream(process.stdout, lambda l: tee(l, out, sys.stdout))
)
task2 = loop.create_task(
_read_stream(process.stderr, lambda l: tee(l, err, sys.stderr))
)

await asyncio.wait({task1, task2})
tasks = []
if process.stdout:
tasks.append(
loop.create_task(
_read_stream(process.stdout, lambda l: tee(l, out, sys.stdout))
)
)
if process.stderr:
tasks.append(
loop.create_task(
_read_stream(process.stderr, lambda l: tee(l, err, sys.stderr))
)
)

await asyncio.wait(set(tasks))

# We need to be sure we keep the stdout/stderr output identical with
# the ones procued by subprocess.run(), at least when in text mode.
Expand All @@ -72,21 +85,23 @@ def tee(line: bytes, sink: List[str], pipe: Optional[Any]):
)


def run(cmd, **kwargs):
def run(args: Union[str, List[str]], **kwargs: Any) -> CompletedProcess:
"""Drop-in replacement for subprocerss.run that behaves like tee.
Extra arguments added by our version:
echo: False - Prints command before executing it.
quiet: False - Avoid printing output
"""
if not isinstance(cmd, str):
if isinstance(args, str):
cmd = args
else:
# run was called with a list instead of a single item but asyncio
# create_subprocess_shell requires command as a single string, so
# we need to convert it to string
cmd = join(cmd)
cmd = join(args)

if not isinstance(cmd, str):
raise RuntimeError(f"Unable to process {cmd}")
# if not isinstance(cmd, str):
# raise RuntimeError(f"Unable to process {cmd}")

if kwargs.get("echo", False):
print(f"COMMAND: {cmd}")
Expand Down
6 changes: 3 additions & 3 deletions lib/subprocess_tee/rich.py
Expand Up @@ -48,12 +48,12 @@ def flush(self) -> None:
class ConsoleEx(Console):
"""Extends rich Console class."""

def __init__(self, *args, **kwargs):
def __init__(self, *args: str, **kwargs: Any) -> None:
self.redirect = kwargs.get("redirect", False)
if "redirect" in kwargs:
del kwargs["redirect"]
super().__init__(*args, **kwargs)
self.extended = True
if self.redirect:
sys.stdout = FileProxy(self, sys.stdout)
sys.stderr = FileProxy(self, sys.stderr)
sys.stdout = FileProxy(self, sys.stdout) # type: ignore
sys.stderr = FileProxy(self, sys.stderr) # type: ignore
10 changes: 6 additions & 4 deletions lib/subprocess_tee/test/test_unit.py
@@ -1,10 +1,12 @@
"""Unittests."""
import subprocess

import _pytest.capture

from subprocess_tee import run


def test_run_string():
def test_run_string() -> None:
"""Valida run() called with a single string command."""
cmd = "echo 111 && >&2 echo 222"
old_result = subprocess.run(
Expand All @@ -21,7 +23,7 @@ def test_run_string():
assert result.stderr == old_result.stderr


def test_run_list():
def test_run_list() -> None:
"""Validate run call with a command made of list of strings."""
# NOTICE: subprocess.run() does fail to capture any output when cmd is
# a list and you specific shell=True. Still, when not mentioning shell,
Expand All @@ -41,7 +43,7 @@ def test_run_list():
assert result.stderr == old_result.stderr


def test_run_echo(capsys):
def test_run_echo(capsys: _pytest.capture.CaptureFixture) -> None:
"""Validate run call with echo dumps command."""
cmd = ["python3", "--version"]
old_result = subprocess.run(
Expand All @@ -61,7 +63,7 @@ def test_run_echo(capsys):
assert err == ""


def test_run_with_env():
def test_run_with_env() -> None:
"""Validate that passing custom env to run() works."""
env = {"FOO": "BAR"}
result = run("echo $FOO", env=env, echo=True)
Expand Down
15 changes: 13 additions & 2 deletions mypy.ini
Expand Up @@ -2,5 +2,16 @@
python_version = 3.6
color_output = True
error_summary = True
disallow_untyped_calls=True
warn_redundant_casts=True
disallow_any_generics = True
disallow_any_unimported = True
disallow_untyped_calls = True
disallow_untyped_defs = True
warn_redundant_casts = True
warn_return_any = True
warn_unused_configs = True

[mypy-subprocess_tee.test.*]
# Temporary relaxed on typing for pytests:
# https://github.com/pytest-dev/pytest/issues/3342
disallow_untyped_defs = False
disallow_any_generics = False

0 comments on commit 33d411c

Please sign in to comment.