Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use alternative to ANSI control sequences in Jupyter notebooks #195

Merged
merged 25 commits into from Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions tests/conftest.py
Expand Up @@ -95,6 +95,11 @@ def reversal(request):
return request.param


@pytest.fixture(scope="session", params=[True, False], ids=["terminal", "Jupyter"])
pavdmyt marked this conversation as resolved.
Show resolved Hide resolved
def isatty(request):
pavdmyt marked this conversation as resolved.
Show resolved Hide resolved
return request.param


def color_id_func(case):
if isinstance(case, tuple):
color, _ = case
Expand Down
10 changes: 7 additions & 3 deletions tests/test_attrs.py
Expand Up @@ -5,6 +5,7 @@
Test Yaspin attributes magic hidden in __getattr__.
"""

import sys
pavdmyt marked this conversation as resolved.
Show resolved Hide resolved
import pytest

from yaspin import yaspin
Expand All @@ -19,12 +20,13 @@ def test_set_spinner_by_name(attr_name):


# Values for ``color`` argument
def test_color(color_test_cases):
def test_color(monkeypatch, color_test_cases):
color, expected = color_test_cases
# ``None`` and ``""`` are skipped
if not color:
pytest.skip("{0} - unsupported case".format(repr(color)))

monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
Comment on lines +24 to +30
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there is an easier way to do this over and over?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible to make this patch at the module level, like this:

@pytest.fixture(autouse=True)
def isatty_true(monkeypatch):
    monkeypatch.setattr(sys.stdout, "isatty", lambda: True)

From the docs: https://docs.pytest.org/en/6.2.x/monkeypatch.html#global-patch-example-preventing-requests-from-remote-operations

sp = yaspin()

if isinstance(expected, Exception):
Expand All @@ -37,12 +39,13 @@ def test_color(color_test_cases):


# Values for ``on_color`` argument
def test_on_color(on_color_test_cases):
def test_on_color(monkeypatch, on_color_test_cases):
on_color, expected = on_color_test_cases
# ``None`` and ``""`` are skipped
if not on_color:
pytest.skip("{0} - unsupported case".format(repr(on_color)))

monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
sp = yaspin()

if isinstance(expected, Exception):
Expand All @@ -60,7 +63,8 @@ def test_on_color(on_color_test_cases):
@pytest.mark.parametrize(
"attr", sorted([k for k, v in COLOR_MAP.items() if v == "attrs"])
)
def test_attrs(attr):
def test_attrs(monkeypatch, attr):
monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
sp = yaspin()
getattr(sp, attr)
assert sp.attrs == [attr]
Expand Down
62 changes: 50 additions & 12 deletions tests/test_in_out.py
Expand Up @@ -45,7 +45,7 @@ def test_repr(text, frames, interval):


def test_compose_out_with_color(
color_test_cases, on_color_test_cases, attrs_test_cases
monkeypatch, color_test_cases, on_color_test_cases, attrs_test_cases
pavdmyt marked this conversation as resolved.
Show resolved Hide resolved
):
color, color_exp = color_test_cases
on_color, on_color_exp = on_color_test_cases
Expand All @@ -65,6 +65,7 @@ def test_compose_out_with_color(
pytest.skip("{0} - unsupported case".format(items))

# Actual test
monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
sp = yaspin(color=color, on_color=on_color, attrs=attrs)
assert sp._color == color
assert sp._on_color == on_color
Expand All @@ -75,22 +76,47 @@ def test_compose_out_with_color(
assert isinstance(out, str)


def test_write(capsys, text):
def test_color_jupyter(monkeypatch):
monkeypatch.setattr(sys.stdout, "isatty", lambda: False)
sp = yaspin(color="red")

out = sp._compose_out(frame=u"/")
assert "\033" not in out


def test_write(monkeypatch, capsys, text, isatty):
monkeypatch.setattr(sys.stdout, "isatty", lambda: isatty)
sp = yaspin()
sp.write(text)

out, _ = capsys.readouterr()
# cleans stdout from _clear_line and \r
out = out.replace("\r\033[K", "")
# cleans stdout from _clear_line
if isatty:
out = out.replace("\r\033[K", "")
else:
out = out.replace("\r", "")

assert isinstance(out, (str, bytes))
assert out[-1] == "\n"
if text:
assert out[:-1] == text


def test_hide_show(capsys, text, request):
def test_show_jupyter(monkeypatch, capsys):
monkeypatch.setattr(sys.stdout, "isatty", lambda: False)
with yaspin(text="12345") as sp:
sp.start()
sp.write("123")

out, _ = capsys.readouterr()
# check spinner line was correctly overridden with whitespaces
# r = \r, s = spinner char, w = space, 12345 = printed chars
assert "12345\r" + " " * len("rsw12345") + "\r123" in out


def test_hide_show(monkeypatch, capsys, text, request, isatty):
# Setup
monkeypatch.setattr(sys.stdout, "isatty", lambda: isatty)
sp = yaspin()
sp.start()

Expand All @@ -110,15 +136,21 @@ def teardown():
out, _ = capsys.readouterr()

# ensure that text was cleared with the hide method
assert out[-4:] == "\r\033[K"
if isatty:
assert out[-4:] == "\r\033[K"
else:
assert out[-1:] == "\r"

# ``\n`` is required to flush stdout during
# the hidden state of the spinner
sys.stdout.write("{0}\n".format(text))
out, _ = capsys.readouterr()

# cleans stdout from _clear_line and \r
out = out.replace("\r\033[K", "")
# cleans stdout from _clear_line
if isatty:
out = out.replace("\r\033[K", "")
else:
out = out.replace("\r", "")

assert isinstance(out, (str, bytes))
assert out[-1] == "\n"
Expand All @@ -132,7 +164,10 @@ def teardown():
out, _ = capsys.readouterr()

# ensure that text was cleared before resuming the spinner
assert out[:4] == "\r\033[K"
if isatty:
assert out[:4] == "\r\033[K"
else:
assert out[:1] == "\r"


def test_spinner_write_race_condition(capsys):
Expand All @@ -154,9 +189,10 @@ def test_spinner_write_race_condition(capsys):
assert not re.search(r"aaaa[^\rb]*bbbb", out)


def test_spinner_hiding_with_context_manager(capsys):
def test_spinner_hiding_with_context_manager(monkeypatch, capsys):
HIDDEN_START = "hidden start"
HIDDEN_END = "hidden end"
monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
pavdmyt marked this conversation as resolved.
Show resolved Hide resolved
sp = yaspin(text="foo")
sp.start()

Expand All @@ -178,9 +214,10 @@ def test_spinner_hiding_with_context_manager(capsys):
assert "{}\n{}".format(HIDDEN_START, HIDDEN_END) in out


def test_spinner_nested_hiding_with_context_manager(capsys):
def test_spinner_nested_hiding_with_context_manager(monkeypatch, capsys):
HIDDEN_START = "hidden start"
HIDDEN_END = "hidden end"
monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
sp = yaspin(text="foo")
sp.start()

Expand Down Expand Up @@ -234,7 +271,8 @@ def test_spinner_hiding_with_context_manager_and_exception():
(["foo", "bar", "'", 23], """['foo', 'bar', "'", 23]"""),
],
)
def test_write_non_str_objects(capsys, obj, obj_str):
def test_write_non_str_objects(monkeypatch, capsys, obj, obj_str):
monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
sp = yaspin()
capsys.readouterr()
sp.write(obj)
Expand Down
46 changes: 26 additions & 20 deletions yaspin/core.py
Expand Up @@ -75,6 +75,7 @@ def __init__( # pylint: disable=too-many-arguments
self._last_frame = None
self._stdout_lock = threading.Lock()
self._hidden_level = 0
self._cur_line_len = 0

# Signals

Expand Down Expand Up @@ -225,9 +226,7 @@ def start(self):
if self._sigmap:
self._register_signal_handlers()

if sys.stdout.isatty():
self._hide_cursor()

self._hide_cursor()
self._start_time = time.time()
self._stop_time = None # Reset value to properly calculate subsequent spinner starts (if any) # pylint: disable=line-too-long
self._stop_spin = threading.Event()
Expand All @@ -251,11 +250,8 @@ def stop(self):
self._stop_spin.set()
self._spin_thread.join()

sys.stdout.write("\r")
self._clear_line()

if sys.stdout.isatty():
self._show_cursor()
self._show_cursor()

def hide(self):
"""Hide the spinner to allow for custom writing to the terminal."""
Expand All @@ -265,9 +261,6 @@ def hide(self):
with self._stdout_lock:
# set the hidden spinner flag
self._hide_spin.set()

# clear the current line
sys.stdout.write("\r")
self._clear_line()

# flush the stdout buffer so the current line
Expand Down Expand Up @@ -298,15 +291,13 @@ def show(self):
self._hide_spin.clear()

# clear the current line so the spinner is not appended to it
sys.stdout.write("\r")
self._clear_line()

def write(self, text):
"""Write text in the terminal without breaking the spinner."""
# similar to tqdm.write()
# https://pypi.python.org/pypi/tqdm#writing-messages
with self._stdout_lock:
sys.stdout.write("\r")
self._clear_line()

if isinstance(text, (str, bytes)):
Expand All @@ -318,6 +309,7 @@ def write(self, text):
assert isinstance(_text, str)

sys.stdout.write("{0}\n".format(_text))
self._cur_line_len = 0

def ok(self, text="OK"):
"""Set Ok (success) finalizer to a spinner."""
Expand All @@ -342,6 +334,7 @@ def _freeze(self, final_text):
self.stop()
with self._stdout_lock:
sys.stdout.write(self._last_frame)
self._cur_line_len = 0

def _spin(self):
while not self._stop_spin.is_set():
Expand All @@ -357,14 +350,19 @@ def _spin(self):

# Write
with self._stdout_lock:
sys.stdout.write(out)
self._clear_line()
sys.stdout.write(out)
sys.stdout.flush()
self._cur_line_len = max(self._cur_line_len, len(out))

# Wait
self._stop_spin.wait(self._interval)

def _compose_color_func(self):
if not sys.stdout.isatty():
# ANSI Color Control Sequences are problematic in Jupyter
return None
pavdmyt marked this conversation as resolved.
Show resolved Hide resolved

return functools.partial(
colored,
color=self._color,
Expand Down Expand Up @@ -533,14 +531,22 @@ def _set_cycle(frames):

@staticmethod
def _hide_cursor():
sys.stdout.write("\033[?25l")
sys.stdout.flush()
if sys.stdout.isatty():
# ANSI Control Sequence DECTCEM 1 does not work in Jupyter
sys.stdout.write("\033[?25l")
sys.stdout.flush()

@staticmethod
def _show_cursor():
sys.stdout.write("\033[?25h")
sys.stdout.flush()
if sys.stdout.isatty():
# ANSI Control Sequence DECTCEM 2 does not work in Jupyter
sys.stdout.write("\033[?25h")
sys.stdout.flush()

@staticmethod
def _clear_line():
sys.stdout.write("\033[K")
def _clear_line(self):
if sys.stdout.isatty():
# ANSI Control Sequence EL does not work in Jupyter
sys.stdout.write("\r\033[K")
else:
fill = " " * self._cur_line_len
pavdmyt marked this conversation as resolved.
Show resolved Hide resolved
sys.stdout.write("\r{0}\r".format(fill))