Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ANSI control sequences in Jupyter notebooks #193

Merged
merged 11 commits into from May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions tests/test_in_out.py
Expand Up @@ -81,7 +81,7 @@ def test_write(capsys, text):

out, _ = capsys.readouterr()
# cleans stdout from _clear_line and \r
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")

assert isinstance(out, (str, bytes))
assert out[-1] == "\n"
Expand Down Expand Up @@ -110,15 +110,15 @@ def teardown():
out, _ = capsys.readouterr()

# ensure that text was cleared with the hide method
assert out[-4:] == "\r\033[K"
assert out[-5:] == "\r\033[0K"

# ``\n`` is required to flush stdout during
# the hidden state of the spinner
sys.stdout.write("{0}\n".format(text))
out, _ = capsys.readouterr()

# cleans stdout from _clear_line and \r
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")

assert isinstance(out, (str, bytes))
assert out[-1] == "\n"
Expand All @@ -132,7 +132,7 @@ def teardown():
out, _ = capsys.readouterr()

# ensure that text was cleared before resuming the spinner
assert out[:4] == "\r\033[K"
assert out[:5] == "\r\033[0K"


def test_spinner_write_race_condition(capsys):
Expand Down Expand Up @@ -174,7 +174,7 @@ def test_spinner_hiding_with_context_manager(capsys):

# make sure no spinner text was printed while the spinner was hidden
out, _ = capsys.readouterr()
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")
assert "{}\n{}".format(HIDDEN_START, HIDDEN_END) in out


Expand Down Expand Up @@ -202,7 +202,7 @@ def test_spinner_nested_hiding_with_context_manager(capsys):

# make sure no spinner text was printed while the spinner was hidden
out, _ = capsys.readouterr()
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")
assert "{}\n{}".format(HIDDEN_START, HIDDEN_END) in out


Expand Down Expand Up @@ -239,4 +239,4 @@ def test_write_non_str_objects(capsys, obj, obj_str):
capsys.readouterr()
sp.write(obj)
out, _ = capsys.readouterr()
assert out == "\r\033[K{}\n".format(obj_str)
assert out == "\r\033[0K{}\n".format(obj_str)
29 changes: 11 additions & 18 deletions yaspin/core.py
Expand Up @@ -225,9 +225,7 @@ def start(self):
if self._sigmap:
self._register_signal_handlers()

if sys.stdout.isatty():
self._hide_cursor()

self._hide_cursor()
self._start_time = time.time()
self._stop_time = None # Reset value to properly calculate subsequent spinner starts (if any) # pylint: disable=line-too-long
self._stop_spin = threading.Event()
Expand All @@ -251,11 +249,8 @@ def stop(self):
self._stop_spin.set()
self._spin_thread.join()

sys.stdout.write("\r")
self._clear_line()

if sys.stdout.isatty():
self._show_cursor()
self._show_cursor()

def hide(self):
"""Hide the spinner to allow for custom writing to the terminal."""
Expand All @@ -265,9 +260,6 @@ def hide(self):
with self._stdout_lock:
# set the hidden spinner flag
self._hide_spin.set()

# clear the current line
sys.stdout.write("\r")
self._clear_line()

# flush the stdout buffer so the current line
Expand Down Expand Up @@ -298,15 +290,13 @@ def show(self):
self._hide_spin.clear()

# clear the current line so the spinner is not appended to it
sys.stdout.write("\r")
self._clear_line()

def write(self, text):
"""Write text in the terminal without breaking the spinner."""
# similar to tqdm.write()
# https://pypi.python.org/pypi/tqdm#writing-messages
with self._stdout_lock:
sys.stdout.write("\r")
self._clear_line()

if isinstance(text, (str, bytes)):
Expand Down Expand Up @@ -357,8 +347,8 @@ def _spin(self):

# Write
with self._stdout_lock:
sys.stdout.write(out)
self._clear_line()
sys.stdout.write(out)
sys.stdout.flush()

# Wait
Expand Down Expand Up @@ -533,14 +523,17 @@ def _set_cycle(frames):

@staticmethod
def _hide_cursor():
sys.stdout.write("\033[?25l")
sys.stdout.flush()
if sys.stdout.isatty():
sys.stdout.write("\033[?25l")
sys.stdout.flush()

@staticmethod
def _show_cursor():
sys.stdout.write("\033[?25h")
sys.stdout.flush()
if sys.stdout.isatty():
sys.stdout.write("\033[?25h")
sys.stdout.flush()

@staticmethod
def _clear_line():
sys.stdout.write("\033[K")
sys.stdout.write("\r")
sys.stdout.write("\033[0K")