-
-
Notifications
You must be signed in to change notification settings - Fork 175
/
junit.py
238 lines (192 loc) · 7.81 KB
/
junit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import os
from collections import defaultdict
from typing import Optional, Iterable, Union, Tuple, List, Dict
import junitparser
from junitparser import Element, JUnitXml, TestCase, TestSuite, Skipped
from junitparser.junitparser import etree
from publish.unittestresults import ParsedUnitTestResults, UnitTestCase, ParseError
try:
import lxml
lxml_available = True
except ImportError:
lxml_available = False
def get_results(results: Union[Element, List[Element]], status: Optional[str] = None) -> List[Element]:
"""
Returns the results with the most severe state.
For example: If there are failures and succeeded tests, returns only the failures.
"""
if isinstance(results, List):
d = defaultdict(list)
for result in results:
if result:
d[get_result(result)].append(result)
for state in ['error', 'failure', 'success', 'skipped', 'disabled']:
if state in d:
return d[state]
if status and status in ['disabled']:
return [Disabled()]
return []
return [results]
def get_result(results: Union[Element, List[Element]]) -> str:
"""
Returns the result of the given results.
All results are expected to be of the same state.
:param results:
:return:
"""
if isinstance(results, List):
return get_result(results[0]) if results else 'success'
return results._tag if results else 'success'
def get_message(results: Union[Element, List[Element]]) -> str:
"""
Returns an aggregated message from all given results.
:param results:
:return:
"""
if isinstance(results, List):
messages = [result.message
for result in results
if result and result.message]
message = '\n'.join(messages) if messages else None
else:
message = results.message if results else None
return message
def get_content(results: Union[Element, List[Element]]) -> str:
"""
Returns an aggregated content form all given results.
:param results:
:return:
"""
if isinstance(results, List):
contents = [result.text
for result in results
if result is not None and result.text is not None]
content = '\n'.join(contents) if contents else None
else:
content = results.text if results else None
return content
class DropTestCaseBuilder(etree.TreeBuilder):
_stack = []
def parse(self, filepath):
self._stack.clear()
parser = etree.XMLParser(target=self)
return etree.parse(filepath, parser=parser)
def start(self, tag: Union[str, bytes], attrs: Dict[Union[str, bytes], Union[str, bytes]]) -> Element:
self._stack.append(tag)
if junitparser.TestCase._tag not in self._stack:
return super().start(tag, attrs)
def end(self, tag: Union[str, bytes]) -> Element:
try:
if junitparser.TestCase._tag not in self._stack:
return super().end(tag)
finally:
if self._stack:
self._stack.pop()
def close(self) -> Element:
# when lxml is around, we have to return an ElementTree here, otherwise
# XMLParser(target=...).parse(..., parser=...)
# returns an Element, not a ElementTree, but junitparser expects an ElementTree
#
# https://lxml.de/parsing.html:
# Note that the parser does not build a tree when using a parser target. The result of the parser run is
# whatever the target object returns from its .close() method. If you want to return an XML tree here, you
# have to create it programmatically in the target object.
if lxml_available:
return lxml.etree.ElementTree(super().close())
else:
return super().close()
def parse_junit_xml_files(files: Iterable[str], drop_testcases: bool = False) -> Iterable[Tuple[str, Union[JUnitXml, BaseException]]]:
"""Parses junit xml files."""
def parse(path: str) -> Union[JUnitXml, BaseException]:
if not os.path.exists(path):
return FileNotFoundError(f'File does not exist.')
if os.stat(path).st_size == 0:
return Exception(f'File is empty.')
try:
if drop_testcases:
builder = DropTestCaseBuilder()
return JUnitXml.fromfile(path, parse_func=builder.parse)
return JUnitXml.fromfile(path)
except BaseException as e:
return e
return [(result_file, parse(result_file)) for result_file in files]
def process_junit_xml_elems(elems: Iterable[Tuple[str, Union[JUnitXml, BaseException]]],
time_factor: float = 1.0) -> ParsedUnitTestResults:
junits = [(result_file, junit)
for result_file, junit in elems
if not isinstance(junit, BaseException)]
errors = [ParseError.from_exception(result_file, exception)
for result_file, exception in elems
if isinstance(exception, BaseException)]
suites = [(result_file, suite)
for result_file, junit in junits
for suite in (junit if junit._tag == "testsuites" else [junit])]
suite_tests = sum([suite.tests for result_file, suite in suites])
suite_skipped = sum([suite.skipped + suite.disabled for result_file, suite in suites])
suite_failures = sum([suite.failures for result_file, suite in suites])
suite_errors = sum([suite.errors for result_file, suite in suites])
suite_time = int(sum([suite.time for result_file, suite in suites]) * time_factor)
def int_opt(string: Optional[str]) -> Optional[int]:
try:
return int(string) if string else None
except ValueError:
return None
def get_cases(suite: TestSuite) -> List[TestCase]:
"""
JUnit seems to allow for testsuite tags inside testsuite tags, potentially at any depth.
https://llg.cubic.org/docs/junit/
This skips all inner testsuite tags and returns a list of all contained testcase tags.
"""
suites = list(suite.iterchildren(TestSuite))
cases = list(suite.iterchildren(TestCase))
return [case
for suite in suites
for case in get_cases(suite)] + cases
cases = [
UnitTestCase(
result_file=result_file,
test_file=case._elem.get('file'),
line=int_opt(case._elem.get('line')),
class_name=case.classname,
test_name=case.name,
result=get_result(results),
message=get_message(results),
content=get_content(results),
time=case.time * time_factor if case.time is not None else case.time
)
for result_file, suite in suites
for case in get_cases(suite)
if case.classname is not None or case.name is not None
for results in [get_results(case.result, case.status)]
]
return ParsedUnitTestResults(
files=len(list(elems)),
errors=errors,
# test state counts from suites
suites=len(suites),
suite_tests=suite_tests,
suite_skipped=suite_skipped,
suite_failures=suite_failures,
suite_errors=suite_errors,
suite_time=suite_time,
# test cases
cases=cases
)
@property
def disabled(self) -> int:
disabled = self._elem.get('disabled', '0')
if disabled.isnumeric():
return int(disabled)
return 0
# add special type of test case result to TestSuite
TestSuite.disabled = disabled
@property
def status(self) -> str:
return self._elem.get('status')
# special attribute of TestCase
TestCase.status = status
class Disabled(Skipped):
"""Test result when the test is disabled."""
_tag = "disabled"
def __eq__(self, other):
return super(Disabled, self).__eq__(other)