Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop private imports on test_utils.py #2851

Closed
wants to merge 9 commits into from
12 changes: 8 additions & 4 deletions httpx/_config.py
Expand Up @@ -10,7 +10,6 @@
from ._models import Headers
from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes
from ._urls import URL
from ._utils import get_ca_bundle_from_env

DEFAULT_CIPHERS = ":".join(
[
Expand Down Expand Up @@ -102,9 +101,14 @@ def load_ssl_context_verify(self) -> ssl.SSLContext:
Return an SSL context for verified connections.
"""
if self.trust_env and self.verify is True:
ca_bundle = get_ca_bundle_from_env()
if ca_bundle is not None:
self.verify = ca_bundle
if "SSL_CERT_FILE" in os.environ:
ssl_file = Path(os.environ["SSL_CERT_FILE"])
if ssl_file.is_file():
self.verify = str(ssl_file)
if "SSL_CERT_DIR" in os.environ:
ssl_path = Path(os.environ["SSL_CERT_DIR"])
if ssl_path.is_dir():
self.verify = str(ssl_path)

if isinstance(self.verify, ssl.SSLContext):
# Allow passing in our own SSLContext object that's pre-configured.
Expand Down
2 changes: 1 addition & 1 deletion httpx/_models.py
Expand Up @@ -775,7 +775,7 @@ def links(self) -> typing.Dict[typing.Optional[str], typing.Dict[str, str]]:
"""
header = self.headers.get("link")
ldict = {}
if header:
if header is not None:
links = parse_header_links(header)
for link in links:
key = link.get("rel") or link.get("url")
Expand Down
13 changes: 0 additions & 13 deletions httpx/_utils.py
Expand Up @@ -6,7 +6,6 @@
import re
import time
import typing
from pathlib import Path
from urllib.request import getproxies

import sniffio
Expand Down Expand Up @@ -91,18 +90,6 @@ def replacer(match: typing.Match[str]) -> str:
return f'{name}="{value}"'.encode()


def get_ca_bundle_from_env() -> typing.Optional[str]:
if "SSL_CERT_FILE" in os.environ:
ssl_file = Path(os.environ["SSL_CERT_FILE"])
if ssl_file.is_file():
return str(ssl_file)
if "SSL_CERT_DIR" in os.environ:
ssl_path = Path(os.environ["SSL_CERT_DIR"])
if ssl_path.is_dir():
return str(ssl_path)
return None


def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
"""
Returns a list of parsed link headers, for more info see:
Expand Down
167 changes: 85 additions & 82 deletions tests/test_utils.py
Expand Up @@ -4,20 +4,10 @@
import random

import certifi
import httpcore
import pytest

import httpx
from httpx._utils import (
URLPattern,
get_ca_bundle_from_env,
get_environment_proxies,
is_https_redirect,
obfuscate_sensitive_headers,
parse_header_links,
same_origin,
)

from .common import TESTS_DIR


@pytest.mark.parametrize(
Expand Down Expand Up @@ -81,7 +71,8 @@ def test_guess_by_bom(encoding, expected):
),
)
def test_parse_header_links(value, expected):
assert parse_header_links(value) == expected
all_links = httpx.Response(200, headers={"link": value}).links.values()
assert all(link in all_links for link in expected)


def test_logging_request(server, caplog):
Expand Down Expand Up @@ -140,46 +131,6 @@ def test_logging_ssl(caplog):
]


def test_get_ssl_cert_file():
# Two environments is not set.
assert get_ca_bundle_from_env() is None

os.environ["SSL_CERT_DIR"] = str(TESTS_DIR)
# SSL_CERT_DIR is correctly set, SSL_CERT_FILE is not set.
ca_bundle = get_ca_bundle_from_env()
assert ca_bundle is not None and ca_bundle.endswith("tests")

del os.environ["SSL_CERT_DIR"]
os.environ["SSL_CERT_FILE"] = str(TESTS_DIR / "test_utils.py")
# SSL_CERT_FILE is correctly set, SSL_CERT_DIR is not set.
ca_bundle = get_ca_bundle_from_env()
assert ca_bundle is not None and ca_bundle.endswith("tests/test_utils.py")

os.environ["SSL_CERT_FILE"] = "wrongfile"
# SSL_CERT_FILE is set with wrong file, SSL_CERT_DIR is not set.
assert get_ca_bundle_from_env() is None

del os.environ["SSL_CERT_FILE"]
os.environ["SSL_CERT_DIR"] = "wrongpath"
# SSL_CERT_DIR is set with wrong path, SSL_CERT_FILE is not set.
assert get_ca_bundle_from_env() is None

os.environ["SSL_CERT_DIR"] = str(TESTS_DIR)
os.environ["SSL_CERT_FILE"] = str(TESTS_DIR / "test_utils.py")
# Two environments is correctly set.
ca_bundle = get_ca_bundle_from_env()
assert ca_bundle is not None and ca_bundle.endswith("tests/test_utils.py")

os.environ["SSL_CERT_FILE"] = "wrongfile"
# Two environments is set but SSL_CERT_FILE is not a file.
ca_bundle = get_ca_bundle_from_env()
assert ca_bundle is not None and ca_bundle.endswith("tests")

os.environ["SSL_CERT_DIR"] = "wrongpath"
# Two environments is set but both are not correct.
assert get_ca_bundle_from_env() is None


@pytest.mark.parametrize(
["environment", "proxies"],
[
Expand All @@ -201,9 +152,31 @@ def test_get_ssl_cert_file():
],
)
def test_get_environment_proxies(environment, proxies):
as_classes = {
pattern: None if proxy is None else httpx.Proxy(url=proxy)
for pattern, proxy in proxies.items()
}

os.environ.update(environment)
client = httpx.Client()

for pat, transport in client._mounts.items():
expected = as_classes[pat.pattern]
if transport is None:
assert expected is None
else:
assert expected is not None

assert get_environment_proxies() == proxies
assert isinstance(transport, httpx.HTTPTransport)
assert isinstance(
transport._pool, (httpcore.HTTPProxy, httpcore.SOCKSProxy)
)
proxy_url = transport._pool._proxy_url

assert proxy_url.scheme == expected.url.raw_scheme
assert proxy_url.host == expected.url.raw_host
assert proxy_url.port == expected.url.port
assert proxy_url.target == expected.url.raw_path


@pytest.mark.parametrize(
Expand All @@ -215,40 +188,65 @@ def test_get_environment_proxies(environment, proxies):
],
)
def test_obfuscate_sensitive_headers(headers, output):
bytes_headers = [(k.encode(), v.encode()) for k, v in headers]
bytes_output = [(k.encode(), v.encode()) for k, v in output]
assert list(obfuscate_sensitive_headers(headers)) == output
assert list(obfuscate_sensitive_headers(bytes_headers)) == bytes_output
as_dict = {k: v for k, v in output}
headers_class = httpx.Headers({k: v for k, v in headers})
assert repr(headers_class) == f"Headers({as_dict!r})"


def test_same_origin():
origin1 = httpx.URL("https://example.com")
origin2 = httpx.URL("HTTPS://EXAMPLE.COM:443")
assert same_origin(origin1, origin2)
origin = httpx.URL("https://example.com")
request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")

client = httpx.Client()
headers = client._redirect_headers(request, origin, "GET")

assert headers["Host"] == request.url.netloc.decode("ascii")


def test_not_same_origin():
origin1 = httpx.URL("https://example.com")
origin2 = httpx.URL("HTTP://EXAMPLE.COM")
assert not same_origin(origin1, origin2)
origin = httpx.URL("https://example.com")
request = httpx.Request("GET", "HTTP://EXAMPLE.COM:80")

client = httpx.Client()
headers = client._redirect_headers(request, origin, "GET")

assert headers["Host"] == origin.netloc.decode("ascii")


def test_is_https_redirect():
url = httpx.URL("http://example.com")
location = httpx.URL("https://example.com")
assert is_https_redirect(url, location)
url = httpx.URL("https://example.com")
request = httpx.Request(
"GET", "http://example.com", headers={"Authorization": "empty"}
)

client = httpx.Client()
headers = client._redirect_headers(request, url, "GET")

assert "Authorization" in headers


def test_is_not_https_redirect():
url = httpx.URL("http://example.com")
location = httpx.URL("https://www.example.com")
assert not is_https_redirect(url, location)
url = httpx.URL("https://www.example.com")
request = httpx.Request(
"GET", "http://example.com", headers={"Authorization": "empty"}
)

client = httpx.Client()
headers = client._redirect_headers(request, url, "GET")

assert "Authorization" not in headers


def test_is_not_https_redirect_if_not_default_ports():
url = httpx.URL("http://example.com:9999")
location = httpx.URL("https://example.com:1337")
assert not is_https_redirect(url, location)
url = httpx.URL("https://example.com:1337")
request = httpx.Request(
"GET", "http://example.com:9999", headers={"Authorization": "empty"}
)

client = httpx.Client()
headers = client._redirect_headers(request, url, "GET")

assert "Authorization" not in headers


@pytest.mark.parametrize(
Expand All @@ -269,21 +267,26 @@ def test_is_not_https_redirect_if_not_default_ports():
],
)
def test_url_matches(pattern, url, expected):
pattern = URLPattern(pattern)
client = httpx.Client(mounts={pattern: httpx.BaseTransport()})
pattern = next(iter(client._mounts))
assert pattern.matches(httpx.URL(url)) == expected


def test_pattern_priority():
matchers = [
URLPattern("all://"),
URLPattern("http://"),
URLPattern("http://example.com"),
URLPattern("http://example.com:123"),
"all://",
"http://",
"http://example.com",
"http://example.com:123",
]
random.shuffle(matchers)
assert sorted(matchers) == [
URLPattern("http://example.com:123"),
URLPattern("http://example.com"),
URLPattern("http://"),
URLPattern("all://"),

transport = httpx.BaseTransport()
client = httpx.Client(mounts={m: transport for m in matchers})

assert [pat.pattern for pat in client._mounts] == [
"http://example.com:123",
"http://example.com",
"http://",
"all://",
]