-
Notifications
You must be signed in to change notification settings - Fork 0
/
log.py
141 lines (127 loc) · 4.17 KB
/
log.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from sys import stderr
import os
from io import UnsupportedOperation
from warnings import warn
import time
class TerminalLog:
def __init__(self):
# Defaults
self.tty = False
self.curses = None
if not stderr:
return
try:
if not stderr.buffer.isatty():
raise UnsupportedOperation()
except (AttributeError, UnsupportedOperation):
return
self.tty = True
try:
import curses
except ImportError:
return
self.write(str())
self.flush()
termstr = os.getenv("TERM", "")
fd = stderr.buffer.fileno()
try:
curses.setupterm(termstr, fd)
except curses.error as err:
warn(err)
self.curses = curses
def write(self, text):
if stderr:
stderr.write(text)
self.flushed = False
def flush(self):
if stderr and not self.flushed:
stderr.flush()
self.flushed = True
def carriage_return(self):
if self.curses:
self.tput("cr")
elif self.tty:
self.write("\r")
else:
self.write("\n")
def clear_eol(self):
return self.tput("el")
def tput(self, capname):
if not self.curses:
return
string = self.curses.tigetstr(capname)
segs = string.split(b"$<")
string = segs[0]
string += bytes().join(s.split(b">", 1)[1] for s in segs[1:])
self.flush()
stderr.buffer.write(string)
def format_size(size):
"""
format_size(0) -> "0 B" # Only instance of leading zero
format_size(33) -> "33 B" # No extra significant digits
format_size(10150) -> "10.2 kB" # Round half up to even; three digits
format_size(222500) -> "222 kB" # Half down to even; no decimal point
format_size(9876543) -> "9876 kB" # Four digits
format_size(9999500) -> "10.0 MB" # Rounding forces prefix change
format_size(9999499999) -> "9999 MB" # Only round off once
"""
ndigits = 0
multiple = 1
prefix = ""
prefixes = iter("kMGTPEZY")
while True:
rounded = round(size, ndigits)
if rounded < 10000 * multiple:
break
p = next(prefixes, None)
if not p:
break
prefix = p
multiple *= 1000
ndigits -= 2
rounded = round(size, ndigits)
if rounded < 100 * multiple:
[int, fract] = divmod(rounded, multiple)
return "{}.{} {}B".format(int, fract * 10 // multiple, prefix)
ndigits -= 1
return "{} {}B".format(rounded // multiple, prefix)
class Progress:
SAMPLES = 30
# TODO: try keeping samples for say up to 10 s, but drop samples
# that are older than 10 s rather than having a fixed # of samples
def __init__(self, log, total, progress=0):
self.log = log
self.total = total
self.last = time.monotonic()
self.samples = [(self.last, progress)] * self.SAMPLES
self.sample = 0
def update(self, progress):
now = time.monotonic()
interval = now - self.last
if interval < 0.1:
return
self.last = now
[then, prev] = self.samples[self.sample]
rate = (progress - prev) / (now - then)
self.samples[self.sample] = (now, progress)
self.sample = (self.sample + 1) % self.SAMPLES
# TODO: detect non terminal, including IDLE; allow this determination to be overridden
if rate:
eta = (self.total - progress) / rate
if rate and eta < 9999 * 60 + 59:
[min, sec] = divmod(eta, 60)
else:
min = 9999
sec = 99
progress /= self.total
# TODO: round progress pc down so 100% means exactly done
# Flexible units for rate
self.log.carriage_return()
self.log.clear_eol()
self.log.write("{:5.1%}{:6.0f}kB/s{:5}m{:02}s".format(
progress, rate / 1000, -int(min), int(sec)))
self.log.flush()
@classmethod
def close(cls, log):
log.carriage_return()
log.clear_eol()