forked from ClearcodeHQ/mirakuru
-
Notifications
You must be signed in to change notification settings - Fork 0
/
output.py
154 lines (123 loc) · 5.54 KB
/
output.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Copyright (C) 2014 by Clearcode <http://clearcode.cc>
# and associates (see AUTHORS).
# This file is part of mirakuru.
# mirakuru is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# mirakuru is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with mirakuru. If not, see <http://www.gnu.org/licenses/>.
"""Executor that awaits for appearance of a predefined banner in output."""
import platform
import re
import select
from typing import Union, List, Any, TypeVar, Tuple, IO, Optional
from mirakuru.base import SimpleExecutor
IS_DARWIN = platform.system() == 'Darwin'
OutputExecutorType = TypeVar("OutputExecutorType", bound="OutputExecutor")
class OutputExecutor(SimpleExecutor):
"""Executor that awaits for string output being present in output."""
def __init__(self,
command: Union[str, List[str], Tuple[str, ...]],
banner: str,
**kwargs: Any) -> None:
"""
Initialize OutputExecutor executor.
:param (str, list) command: command to be run by the subprocess
:param str banner: string that has to appear in process output -
should compile to regular expression.
:param bool shell: same as the `subprocess.Popen` shell definition
:param int timeout: number of seconds to wait for the process to start
or stop. If None or False, wait indefinitely.
:param float sleep: how often to check for start/stop condition
:param int sig_stop: signal used to stop process run by the executor.
default is `signal.SIGTERM`
:param int sig_kill: signal used to kill process run by the executor.
default is `signal.SIGKILL` (`signal.SIGTERM` on Windows)
"""
super(OutputExecutor, self).__init__(command, **kwargs)
self._banner = re.compile(banner)
if not any((self._stdout, self._stderr)):
raise TypeError(
'At least one of stdout or stderr has to be initialized'
)
def start(self: OutputExecutorType) -> OutputExecutorType:
"""
Start process.
:returns: itself
:rtype: OutputExecutor
.. note::
Process will be considered started, when defined banner will appear
in process output.
"""
super(OutputExecutor, self).start()
if not IS_DARWIN:
polls: List[Tuple[select.poll, IO[Any]]] = []
for output_handle, output_method in (
(self._stdout, self.output),
(self._stderr, self.err_output)
):
if output_handle is not None:
# get a polling object
std_poll = select.poll()
output_file = output_method()
if output_file is None:
raise ValueError(
"The process is started but "
"the output file is None"
)
# register a file descriptor
# POLLIN because we will wait for data to read
std_poll.register(output_file, select.POLLIN)
polls.append((std_poll, output_file))
try:
def await_for_output() -> bool:
return self._wait_for_output(*polls)
self.wait_for(await_for_output)
for poll, output in polls:
# unregister the file descriptor
# and delete the polling object
poll.unregister(output)
finally:
for poll_and_output in polls:
del poll_and_output
else:
outputs = []
for output_handle, output_method in (
(self._stdout, self.output),
(self._stderr, self.err_output)
):
if output_handle is not None:
outputs.append(output_method())
def await_for_output() -> bool:
return self._wait_for_darwin_output(*outputs)
self.wait_for(await_for_output)
return self
def _wait_for_darwin_output(self, *fds: Optional[IO[Any]]) -> bool:
"""Select implementation to be used on MacOSX"""
rlist, _, _ = select.select(fds, [], [], 0)
for output in rlist:
line = output.readline()
if self._banner.match(line):
return True
return False
def _wait_for_output(self, *polls: Tuple['select.poll', IO[Any]]) -> bool:
"""
Check if output matches banner.
.. warning::
Waiting for I/O completion. It does not work on Windows. Sorry.
"""
for poll, output in polls:
# Here we should get an empty list or list with a tuple
# [(fd, event)]. When we get list with a tuple we can use readline
# method on the file descriptor.
poll_result = poll.poll(0)
if poll_result:
line = output.readline()
if self._banner.match(line):
return True
return False