extracting utils functions to sansio
This is to make these functions available for non-WSGI use.
rrahkola authored and pgjones committed Jul 22, 2022
1 parent b42d1a4 commit 81d2ed0
Showing 3 changed files with 200 additions and 73 deletions.
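
A minimal usage sketch, assuming this commit's module layout (src/werkzeug/sansio/utils.py) and made-up header values, showing the extracted helpers being called directly without a WSGI environ:

from werkzeug.sansio.utils import get_content_length, get_path_info, get_query_string

# values as they might arrive from a non-WSGI (e.g. ASGI-style) request; illustrative only
assert get_content_length(http_content_length="42", http_transfer_encoding="") == 42
assert get_query_string("a=1&b=hello world") == "a=1&b=hello%20world"
assert get_path_info("/caf\xc3\xa9") == "/café"   # latin1-masked UTF-8 decoded back to text
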
4 changes: 3 additions & 1 deletion CHANGES.rst
@@ -18,9 +18,11 @@ Version 2.2.0
URLs. :issue:`2388`
- The debugger shows enhanced error locations in tracebacks in Python
3.11. :issue:`2407`

- Extracted is_resource_modified and parse_cookie from http.py
to sansio/http.py. :issue:`2408`
- Extracted the get_content_length, get_query_string, and get_path_info utility
functions from wsgi.py to sansio/utils.py. :pr:`2415`


Version 2.1.2
-------------
162 changes: 162 additions & 0 deletions src/werkzeug/sansio/utils.py
@@ -1,8 +1,12 @@
import typing as t

from .._internal import _encode_idna
from .._internal import _to_str
from ..exceptions import SecurityError
from ..urls import _URLTuple
from ..urls import uri_to_iri
from ..urls import url_join
from ..urls import url_parse
from ..urls import url_quote


@@ -140,3 +144,161 @@ def get_current_url(
url.append(url_quote(query_string, safe=":&%=+$!*'(),"))

return uri_to_iri("".join(url))


def get_content_length(
http_content_length: t.Union[str, None] = None,
http_transfer_encoding: t.Union[str, None] = "",
) -> t.Optional[int]:
"""Returns the content length as an integer or ``None`` if
unavailable or chunked transfer encoding is used.
:param http_content_length: The Content-Length HTTP header.
:param http_transfer_encoding: The Transfer-Encoding HTTP header.
.. versionchanged:: 2.2
Using explicit header parameters to support ASGI.
.. versionadded:: 0.9
"""
if http_transfer_encoding == "chunked":
return None

if http_content_length is not None:
try:
return max(0, int(http_content_length))
except (ValueError, TypeError):
pass
return None
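
# --- illustrative calls against the function above (not part of this diff); values are made up ---
assert get_content_length(http_content_length="1024", http_transfer_encoding="chunked") is None
assert get_content_length(http_content_length="1024") == 1024   # plain header value
assert get_content_length(http_content_length="-5") == 0        # clamped at zero
assert get_content_length(http_content_length="abc") is None    # unparseable header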


def get_query_string(query_string: str = "") -> str:
"""Returns a sanitized query string.
:param query_string: The (potentially unsafe) query string.
.. versionchanged:: 2.2
Using explicit string parameter to support ASGI.
.. versionadded:: 0.9
"""
qs = query_string.encode("latin1")
# QUERY_STRING really should be ascii safe but some browsers
# will send us some unicode stuff (I am looking at you IE).
# In that case we want to urllib quote it badly.
return url_quote(qs, safe=":&%=+$!*'(),")
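
# --- illustrative call against the function above (not part of this diff) ---
# characters in the safe set (=, &, +, ...) pass through; the space is percent-encoded
assert get_query_string("q=hello world&lang=fr") == "q=hello%20world&lang=fr"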


def get_path_info(
path: str = "", charset: str = "utf-8", errors: str = "replace"
) -> str:
"""Return the decoded ``path`` unless ``charset`` is ``None``.
:param path: The URL path.
:param charset: The charset for the path info, or ``None`` if no
decoding should be performed.
:param errors: The decoding error handling.
.. versionchanged:: 2.2
Using explicit string parameter to support ASGI.
.. versionadded:: 0.9
"""
path = path.encode("latin1")
return _to_str(path, charset, errors, allow_none_charset=True)
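
# --- illustrative calls against the function above (not part of this diff) ---
# "/caf\xc3\xa9" is the latin1 view of the UTF-8 bytes for "/café", as WSGI hands the path over
assert get_path_info("/caf\xc3\xa9") == "/café"
# charset=None skips decoding and returns the raw bytes instead
assert get_path_info("/caf\xc3\xa9", charset=None) == b"/caf\xc3\xa9"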


def extract_path_info(
baseurl: str,
path_or_url: t.Union[str, _URLTuple],
charset: str = "utf-8",
errors: str = "werkzeug.url_quote",
collapse_http_schemes: bool = True,
) -> t.Optional[str]:
"""Extracts the path info as a string from the baseurl and path.
The URLs might also be IRIs.
If the path info could not be determined, `None` is returned.
Some examples:
>>> extract_path_info('http://example.com/app', '/app/hello')
'/hello'
>>> extract_path_info('http://example.com/app',
... 'https://example.com/app/hello')
'/hello'
>>> extract_path_info('http://example.com/app',
... 'https://example.com/app/hello',
... collapse_http_schemes=False) is None
True
:param baseurl: a base URL or base IRI.
This is the root of the application.
:param path_or_url: an absolute path from the server root, a
relative path (in which case it's the path info)
or a full URL.
:param charset: the charset for byte data in URLs
:param errors: the error handling on decode
:param collapse_http_schemes: if set to `False` the algorithm does
not assume that http and https on the
same server point to the same
resource.
.. versionchanged:: 2.2
Using explicit baseurl string parameter to support ASGI.
.. versionchanged:: 0.15
The ``errors`` parameter defaults to leaving invalid bytes
quoted instead of replacing them.
.. versionadded:: 0.6
"""

def _normalize_netloc(scheme: str, netloc: str) -> str:
parts = netloc.split("@", 1)[-1].split(":", 1)
port: t.Optional[str]

if len(parts) == 2:
netloc, port = parts
if (scheme == "http" and port == "80") or (
scheme == "https" and port == "443"
):
port = None
else:
netloc = parts[0]
port = None

if port is not None:
netloc += f":{port}"

return netloc

# make sure whatever we are working on is an IRI and parse it
path = uri_to_iri(path_or_url, charset, errors)
base_iri = uri_to_iri(baseurl, charset, errors)
base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

# normalize the network location
base_netloc = _normalize_netloc(base_scheme, base_netloc)
cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

# is that IRI even on a known HTTP scheme?
if collapse_http_schemes:
for scheme in base_scheme, cur_scheme:
if scheme not in ("http", "https"):
return None
else:
if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
return None

# are the netlocs compatible?
if base_netloc != cur_netloc:
return None

# are we below the application path?
base_path = base_path.rstrip("/")
if not cur_path.startswith(base_path):
return None

return f"/{cur_path[len(base_path) :].lstrip('/')}"
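
# --- illustrative calls against the function above (not part of this diff); hosts are made up ---
# default ports are stripped by _normalize_netloc before the netlocs are compared
assert extract_path_info("http://example.com:80/app", "http://example.com/app/hello") == "/hello"
# a different host means the path info cannot be determined
assert extract_path_info("http://example.com/app", "http://other.example/app/hello") is None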
107 changes: 35 additions & 72 deletions src/werkzeug/wsgi.py
@@ -11,10 +11,6 @@
from .sansio import utils as _sansio_utils
from .sansio.utils import host_is_trusted # noqa: F401 # Imported as part of API
from .urls import _URLTuple
from .urls import uri_to_iri
from .urls import url_join
from .urls import url_parse
from .urls import url_quote

if t.TYPE_CHECKING:
from _typeshed.wsgi import WSGIApplication
@@ -122,20 +118,17 @@ def get_content_length(environ: "WSGIEnvironment") -> t.Optional[int]:
integer. If it's not available or chunked transfer encoding is used,
``None`` is returned.
.. versionchanged:: 2.2
Extracted this to sansio/utils.py
.. versionadded:: 0.9
:param environ: the WSGI environ to fetch the content length from.
"""
if environ.get("HTTP_TRANSFER_ENCODING", "") == "chunked":
return None

content_length = environ.get("CONTENT_LENGTH")
if content_length is not None:
try:
return max(0, int(content_length))
except (ValueError, TypeError):
pass
return None
return _sansio_utils.get_content_length(
http_content_length=environ.get("CONTENT_LENGTH"),
http_transfer_encoding=environ.get("HTTP_TRANSFER_ENCODING", ""),
)
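
# --- illustrative usage of the WSGI wrapper above (not part of this diff); environ values are made up ---
from werkzeug.wsgi import get_content_length
environ = {"CONTENT_LENGTH": "17"}          # no Transfer-Encoding header present
assert get_content_length(environ) == 17    # delegates to _sansio_utils.get_content_length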


def get_input_stream(
@@ -183,13 +176,12 @@ def get_query_string(environ: "WSGIEnvironment") -> str:
:param environ: WSGI environment to get the query string from.
.. versionchanged:: 2.2
Extracted this to sansio/utils.py
.. versionadded:: 0.9
"""
qs = environ.get("QUERY_STRING", "").encode("latin1")
# QUERY_STRING really should be ascii safe but some browsers
# will send us some unicode stuff (I am looking at you IE).
# In that case we want to urllib quote it badly.
return url_quote(qs, safe=":&%=+$!*'(),")
return _sansio_utils.get_query_string(query_string=environ.get("QUERY_STRING", ""))


def get_path_info(
@@ -203,10 +195,14 @@ def get_path_info(
decoding should be performed.
:param errors: The decoding error handling.
.. versionchanged:: 2.2
Extracted this to sansio/utils.py
.. versionadded:: 0.9
"""
path = environ.get("PATH_INFO", "").encode("latin1")
return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
return _sansio_utils.get_path_info(
path=environ.get("PATH_INFO", ""), charset=charset, errors=errors
)


def get_script_name(
@@ -220,10 +216,14 @@ def get_script_name(
should be performed.
:param errors: The decoding error handling.
.. versionchanged:: 2.2
Extracted this to sansio/utils.py
.. versionadded:: 0.9
"""
path = environ.get("SCRIPT_NAME", "").encode("latin1")
return _to_str(path, charset, errors, allow_none_charset=True) # type: ignore
return _sansio_utils.get_path_info(
path=environ.get("SCRIPT_NAME", ""), charset=charset, errors=errors
)


def pop_path_info(
@@ -354,63 +354,26 @@ def extract_path_info(
same server point to the same
resource.
.. versionchanged:: 2.2
Extracted this to sansio/utils.py
.. versionchanged:: 0.15
The ``errors`` parameter defaults to leaving invalid bytes
quoted instead of replacing them.
.. versionadded:: 0.6
"""

def _normalize_netloc(scheme: str, netloc: str) -> str:
parts = netloc.split("@", 1)[-1].split(":", 1)
port: t.Optional[str]

if len(parts) == 2:
netloc, port = parts
if (scheme == "http" and port == "80") or (
scheme == "https" and port == "443"
):
port = None
else:
netloc = parts[0]
port = None

if port is not None:
netloc += f":{port}"

return netloc

# make sure whatever we are working on is a IRI and parse it
path = uri_to_iri(path_or_url, charset, errors)
if isinstance(environ_or_baseurl, dict):
environ_or_baseurl = get_current_url(environ_or_baseurl, root_only=True)
base_iri = uri_to_iri(environ_or_baseurl, charset, errors)
base_scheme, base_netloc, base_path = url_parse(base_iri)[:3]
cur_scheme, cur_netloc, cur_path = url_parse(url_join(base_iri, path))[:3]

# normalize the network location
base_netloc = _normalize_netloc(base_scheme, base_netloc)
cur_netloc = _normalize_netloc(cur_scheme, cur_netloc)

# is that IRI even on a known HTTP scheme?
if collapse_http_schemes:
for scheme in base_scheme, cur_scheme:
if scheme not in ("http", "https"):
return None
baseurl = get_current_url(environ_or_baseurl, root_only=True)
else:
if not (base_scheme in ("http", "https") and base_scheme == cur_scheme):
return None

# are the netlocs compatible?
if base_netloc != cur_netloc:
return None

# are we below the application path?
base_path = base_path.rstrip("/")
if not cur_path.startswith(base_path):
return None

return f"/{cur_path[len(base_path) :].lstrip('/')}"
baseurl = environ_or_baseurl
return _sansio_utils.extract_path_info(
baseurl=baseurl,
path_or_url=path_or_url,
charset=charset,
errors=errors,
collapse_http_schemes=collapse_http_schemes,
)


class ClosingIterator:
