/
test_blackd.py
221 lines (190 loc) · 7.97 KB
/
test_blackd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import re
from typing import TYPE_CHECKING, Any, Callable, TypeVar
from unittest.mock import patch
import pytest
from click.testing import CliRunner
from tests.util import DETERMINISTIC_HEADER, read_data
try:
from aiohttp import web
from aiohttp.test_utils import AioHTTPTestCase
import blackd
except ImportError as e:
raise RuntimeError("Please install Black with the 'd' extra") from e
if TYPE_CHECKING:
F = TypeVar("F", bound=Callable[..., Any])
unittest_run_loop: Callable[[F], F] = lambda x: x
else:
try:
from aiohttp.test_utils import unittest_run_loop
except ImportError:
# unittest_run_loop is unnecessary and a no-op since aiohttp 3.8, and
# aiohttp 4 removed it. To maintain compatibility we can make our own
# no-op decorator.
def unittest_run_loop(func, *args, **kwargs):
return func
@pytest.mark.blackd
class BlackDTestCase(AioHTTPTestCase): # type: ignore[misc]
def test_blackd_main(self) -> None:
with patch("blackd.web.run_app"):
result = CliRunner().invoke(blackd.main, [])
if result.exception is not None:
raise result.exception
self.assertEqual(result.exit_code, 0)
async def get_application(self) -> web.Application:
return blackd.make_app()
@unittest_run_loop
async def test_blackd_request_needs_formatting(self) -> None:
response = await self.client.post("/", data=b"print('hello world')")
self.assertEqual(response.status, 200)
self.assertEqual(response.charset, "utf8")
self.assertEqual(await response.read(), b'print("hello world")\n')
@unittest_run_loop
async def test_blackd_request_no_change(self) -> None:
response = await self.client.post("/", data=b'print("hello world")\n')
self.assertEqual(response.status, 204)
self.assertEqual(await response.read(), b"")
@unittest_run_loop
async def test_blackd_request_syntax_error(self) -> None:
response = await self.client.post("/", data=b"what even ( is")
self.assertEqual(response.status, 400)
content = await response.text()
self.assertTrue(
content.startswith("Cannot parse"),
msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
)
@unittest_run_loop
async def test_blackd_unsupported_version(self) -> None:
response = await self.client.post(
"/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "2"}
)
self.assertEqual(response.status, 501)
@unittest_run_loop
async def test_blackd_supported_version(self) -> None:
response = await self.client.post(
"/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "1"}
)
self.assertEqual(response.status, 200)
@unittest_run_loop
async def test_blackd_invalid_python_variant(self) -> None:
async def check(header_value: str, expected_status: int = 400) -> None:
response = await self.client.post(
"/",
data=b"what",
headers={blackd.PYTHON_VARIANT_HEADER: header_value},
)
self.assertEqual(response.status, expected_status)
await check("lol")
await check("ruby3.5")
await check("pyi3.6")
await check("py1.5")
await check("2")
await check("2.7")
await check("py2.7")
await check("2.8")
await check("py2.8")
await check("3.0")
await check("pypy3.0")
await check("jython3.4")
@unittest_run_loop
async def test_blackd_pyi(self) -> None:
source, expected = read_data("miscellaneous", "stub.pyi")
response = await self.client.post(
"/", data=source, headers={blackd.PYTHON_VARIANT_HEADER: "pyi"}
)
self.assertEqual(response.status, 200)
self.assertEqual(await response.text(), expected)
@unittest_run_loop
async def test_blackd_diff(self) -> None:
diff_header = re.compile(
r"(In|Out)\t\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d \+\d\d\d\d"
)
source, _ = read_data("miscellaneous", "blackd_diff")
expected, _ = read_data("miscellaneous", "blackd_diff.diff")
response = await self.client.post(
"/", data=source, headers={blackd.DIFF_HEADER: "true"}
)
self.assertEqual(response.status, 200)
actual = await response.text()
actual = diff_header.sub(DETERMINISTIC_HEADER, actual)
self.assertEqual(actual, expected)
@unittest_run_loop
async def test_blackd_python_variant(self) -> None:
code = (
"def f(\n"
" and_has_a_bunch_of,\n"
" very_long_arguments_too,\n"
" and_lots_of_them_as_well_lol,\n"
" **and_very_long_keyword_arguments\n"
"):\n"
" pass\n"
)
async def check(header_value: str, expected_status: int) -> None:
response = await self.client.post(
"/", data=code, headers={blackd.PYTHON_VARIANT_HEADER: header_value}
)
self.assertEqual(
response.status, expected_status, msg=await response.text()
)
await check("3.6", 200)
await check("py3.6", 200)
await check("3.6,3.7", 200)
await check("3.6,py3.7", 200)
await check("py36,py37", 200)
await check("36", 200)
await check("3.6.4", 200)
await check("3.4", 204)
await check("py3.4", 204)
await check("py34,py36", 204)
await check("34", 204)
@unittest_run_loop
async def test_blackd_line_length(self) -> None:
response = await self.client.post(
"/", data=b'print("hello")\n', headers={blackd.LINE_LENGTH_HEADER: "7"}
)
self.assertEqual(response.status, 200)
@unittest_run_loop
async def test_blackd_invalid_line_length(self) -> None:
response = await self.client.post(
"/",
data=b'print("hello")\n',
headers={blackd.LINE_LENGTH_HEADER: "NaN"},
)
self.assertEqual(response.status, 400)
@unittest_run_loop
async def test_blackd_preview(self) -> None:
response = await self.client.post(
"/", data=b'print("hello")\n', headers={blackd.PREVIEW: "true"}
)
self.assertEqual(response.status, 204)
@unittest_run_loop
async def test_blackd_response_black_version_header(self) -> None:
response = await self.client.post("/")
self.assertIsNotNone(response.headers.get(blackd.BLACK_VERSION_HEADER))
@unittest_run_loop
async def test_cors_preflight(self) -> None:
response = await self.client.options(
"/",
headers={
"Access-Control-Request-Method": "POST",
"Origin": "*",
"Access-Control-Request-Headers": "Content-Type",
},
)
self.assertEqual(response.status, 200)
self.assertIsNotNone(response.headers.get("Access-Control-Allow-Origin"))
self.assertIsNotNone(response.headers.get("Access-Control-Allow-Headers"))
self.assertIsNotNone(response.headers.get("Access-Control-Allow-Methods"))
@unittest_run_loop
async def test_cors_headers_present(self) -> None:
response = await self.client.post("/", headers={"Origin": "*"})
self.assertIsNotNone(response.headers.get("Access-Control-Allow-Origin"))
self.assertIsNotNone(response.headers.get("Access-Control-Expose-Headers"))
@unittest_run_loop
async def test_preserve_newline(self) -> None:
for data in (b"c\r\nc\r\n", b"l\nl\n"):
# test preserved newlines when reformatted
response = await self.client.post("/", data=data + b" ")
self.assertEqual(await response.text(), data.decode())
# test 204 when no change
response = await self.client.post("/", data=data)
self.assertEqual(response.status, 204)