Skip to content

Commit

Permalink
Preserve blank form values for urlencoded forms (option) (#2439)
Browse files Browse the repository at this point in the history
* task(request.form): Add tests for blank values

* fix(request): abstract form property to implement get_form(), allow for preserving of blanks

* fix(request): hinting for parsed_form

* fix(request): typing for parsed_files

* fix(request): ignore type assumption

* fix(request): mypy typechecking caused E501 when type set to ignore

* fix(request): mypy is too stupid to parse continuations

* fix(request): formatting

* fix(request): fix annotation and return for get_form()

* fix(request): linting, hinting
  • Loading branch information
sjsadowski committed Apr 24, 2022
1 parent 3a6cc73 commit 78b6723
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 21 deletions.
54 changes: 33 additions & 21 deletions sanic/request.py
Expand Up @@ -152,8 +152,8 @@ def __init__(
self.parsed_accept: Optional[AcceptContainer] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_json = None
self.parsed_form = None
self.parsed_files = None
self.parsed_form: Optional[RequestParameters] = None
self.parsed_files: Optional[RequestParameters] = None
self.parsed_token: Optional[str] = None
self.parsed_args: DefaultDict[
Tuple[bool, bool, str, str], RequestParameters
Expand Down Expand Up @@ -426,28 +426,40 @@ def credentials(self) -> Optional[Credentials]:
pass
return self.parsed_credentials

def get_form(
self, keep_blank_values: bool = False
) -> Optional[RequestParameters]:
self.parsed_form = RequestParameters()
self.parsed_files = RequestParameters()
content_type = self.headers.getone(
"content-type", DEFAULT_HTTP_CONTENT_TYPE
)
content_type, parameters = parse_content_header(content_type)
try:
if content_type == "application/x-www-form-urlencoded":
self.parsed_form = RequestParameters(
parse_qs(
self.body.decode("utf-8"),
keep_blank_values=keep_blank_values,
)
)
elif content_type == "multipart/form-data":
# TODO: Stream this instead of reading to/from memory
boundary = parameters["boundary"].encode( # type: ignore
"utf-8"
) # type: ignore
self.parsed_form, self.parsed_files = parse_multipart_form(
self.body, boundary
)
except Exception:
error_logger.exception("Failed when parsing form")

return self.parsed_form

@property
def form(self):
if self.parsed_form is None:
self.parsed_form = RequestParameters()
self.parsed_files = RequestParameters()
content_type = self.headers.getone(
"content-type", DEFAULT_HTTP_CONTENT_TYPE
)
content_type, parameters = parse_content_header(content_type)
try:
if content_type == "application/x-www-form-urlencoded":
self.parsed_form = RequestParameters(
parse_qs(self.body.decode("utf-8"))
)
elif content_type == "multipart/form-data":
# TODO: Stream this instead of reading to/from memory
boundary = parameters["boundary"].encode("utf-8")
self.parsed_form, self.parsed_files = parse_multipart_form(
self.body, boundary
)
except Exception:
error_logger.exception("Failed when parsing form")
self.get_form()

return self.parsed_form

Expand Down
66 changes: 66 additions & 0 deletions tests/test_requests.py
Expand Up @@ -1016,6 +1016,72 @@ async def handler(request):
assert request.form.get("test") == "OK" # For request.parsed_form


def test_post_form_urlencoded_keep_blanks(app):
@app.route("/", methods=["POST"])
async def handler(request):
request.get_form(keep_blank_values=True)
return text("OK")

payload = "test="
headers = {"content-type": "application/x-www-form-urlencoded"}

request, response = app.test_client.post(
"/", data=payload, headers=headers
)

assert request.form.get("test") == ""
assert request.form.get("test") == "" # For request.parsed_form


@pytest.mark.asyncio
async def test_post_form_urlencoded_keep_blanks_asgi(app):
@app.route("/", methods=["POST"])
async def handler(request):
request.get_form(keep_blank_values=True)
return text("OK")

payload = "test="
headers = {"content-type": "application/x-www-form-urlencoded"}

request, response = await app.asgi_client.post(
"/", data=payload, headers=headers
)

assert request.form.get("test") == ""
assert request.form.get("test") == "" # For request.parsed_form



def test_post_form_urlencoded_drop_blanks(app):
@app.route("/", methods=["POST"])
async def handler(request):
return text("OK")

payload = "test="
headers = {"content-type": "application/x-www-form-urlencoded"}

request, response = app.test_client.post(
"/", data=payload, headers=headers
)

assert "test" not in request.form.keys()

@pytest.mark.asyncio
async def test_post_form_urlencoded_drop_blanks_asgi(app):
@app.route("/", methods=["POST"])
async def handler(request):
return text("OK")

payload = "test="
headers = {"content-type": "application/x-www-form-urlencoded"}

request, response = await app.asgi_client.post(
"/", data=payload, headers=headers
)

assert "test" not in request.form.keys()


@pytest.mark.parametrize(
"payload",
[
Expand Down

0 comments on commit 78b6723

Please sign in to comment.