Drop private imports on test_utils.py
T-256 authored and Tester committed Oct 30, 2023
1 parent e63b659 commit c97f840
Showing 4 changed files with 130 additions and 141 deletions.
12 changes: 8 additions & 4 deletions httpx/_config.py
@@ -10,7 +10,6 @@
 from ._models import Headers
 from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes
 from ._urls import URL
-from ._utils import get_ca_bundle_from_env
 
 DEFAULT_CIPHERS = ":".join(
     [
@@ -102,9 +101,14 @@ def load_ssl_context_verify(self) -> ssl.SSLContext:
         Return an SSL context for verified connections.
         """
         if self.trust_env and self.verify is True:
-            ca_bundle = get_ca_bundle_from_env()
-            if ca_bundle is not None:
-                self.verify = ca_bundle
+            if "SSL_CERT_FILE" in os.environ:
+                ssl_file = Path(os.environ["SSL_CERT_FILE"])
+                if ssl_file.is_file():
+                    self.verify = str(ssl_file)
+            if "SSL_CERT_DIR" in os.environ:
+                ssl_path = Path(os.environ["SSL_CERT_DIR"])
+                if ssl_path.is_dir():
+                    self.verify = str(ssl_path)
 
         if isinstance(self.verify, ssl.SSLContext):
             # Allow passing in our own SSLContext object that's pre-configured.
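For reference, a standalone sketch of the lookup this hunk inlines into SSLConfig.load_ssl_context_verify. resolve_verify_from_env is a hypothetical name used only here for illustration; note that, unlike the removed get_ca_bundle_from_env (which returned as soon as SSL_CERT_FILE matched), the inlined code also checks SSL_CERT_DIR afterwards, so a valid directory takes precedence when both variables are set.

import os
import typing
from pathlib import Path


def resolve_verify_from_env(verify: typing.Union[bool, str] = True) -> typing.Union[bool, str]:
    # Only consult the environment when verification is the plain `True`
    # default, mirroring the trust_env branch in load_ssl_context_verify.
    if verify is not True:
        return verify
    if "SSL_CERT_FILE" in os.environ:
        ssl_file = Path(os.environ["SSL_CERT_FILE"])
        if ssl_file.is_file():
            verify = str(ssl_file)
    if "SSL_CERT_DIR" in os.environ:
        ssl_path = Path(os.environ["SSL_CERT_DIR"])
        if ssl_path.is_dir():
            verify = str(ssl_path)  # checked second, so it overrides SSL_CERT_FILE
    return verify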
34 changes: 31 additions & 3 deletions httpx/_models.py
@@ -1,3 +1,4 @@
+import codecs
 import datetime
 import email.message
 import json as jsonlib
@@ -43,7 +44,6 @@
 )
 from ._urls import URL
 from ._utils import (
-    guess_json_utf,
     is_known_encoding,
     normalize_header_key,
     normalize_header_value,
@@ -758,9 +758,37 @@ def raise_for_status(self) -> "Response":
         message = message.format(self, error_type=error_type)
         raise HTTPStatusError(message, request=request, response=self)
 
+    def _guess_content_json_utf(self) -> typing.Optional[str]:
+        # JSON always starts with two ASCII characters, so detection is as
+        # easy as counting the nulls and from their location and count
+        # determine the encoding. Also detect a BOM, if present.
+        sample = self.content[:4]
+        if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
+            return "utf-32"  # BOM included
+        if sample[:3] == codecs.BOM_UTF8:
+            return "utf-8-sig"  # BOM included, MS style (discouraged)
+        if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
+            return "utf-16"  # BOM included
+        nullcount = sample.count(b"\0")
+        if nullcount == 0:
+            return "utf-8"
+        if nullcount == 2:
+            if sample[::2] == b"\0\0":  # 1st and 3rd are null
+                return "utf-16-be"
+            if sample[1::2] == b"\0\0":  # 2nd and 4th are null
+                return "utf-16-le"
+            # Did not detect 2 valid UTF-16 ascii-range characters
+        if nullcount == 3:
+            if sample[:3] == b"\0\0\0":
+                return "utf-32-be"
+            if sample[1:] == b"\0\0\0":
+                return "utf-32-le"
+            # Did not detect a valid UTF-32 ascii-range character
+        return None
+
     def json(self, **kwargs: typing.Any) -> typing.Any:
         if self.charset_encoding is None and self.content and len(self.content) > 3:
-            encoding = guess_json_utf(self.content)
+            encoding = self._guess_content_json_utf()
             if encoding is not None:
                 return jsonlib.loads(self.content.decode(encoding), **kwargs)
         return jsonlib.loads(self.text, **kwargs)
@@ -779,7 +807,7 @@ def links(self) -> typing.Dict[typing.Optional[str], typing.Dict[str, str]]:
         """
         header = self.headers.get("link")
         ldict = {}
-        if header:
+        if header is not None:
             links = parse_header_links(header)
             for link in links:
                 key = link.get("rel") or link.get("url")
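A minimal standalone sketch of the null-counting heuristic that the new Response._guess_content_json_utf method applies to the first four bytes of the body (same logic as the hunk above, lifted out here purely for illustration; the local guess_json_utf name and the sample values are not part of httpx):

import codecs
import json
import typing


def guess_json_utf(sample: bytes) -> typing.Optional[str]:
    # `sample` is the first four bytes of a JSON document. BOMs are checked
    # first; otherwise the position and count of null bytes around the two
    # leading ASCII characters identify the UTF flavour.
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"
    nullcount = sample.count(b"\0")
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == b"\0\0":  # nulls at offsets 0 and 2
            return "utf-16-be"
        if sample[1::2] == b"\0\0":  # nulls at offsets 1 and 3
            return "utf-16-le"
    if nullcount == 3:
        if sample[:3] == b"\0\0\0":
            return "utf-32-be"
        if sample[1:] == b"\0\0\0":
            return "utf-32-le"
    return None


body = json.dumps({"hello": "world"}).encode("utf-16-le")
assert guess_json_utf(body[:4]) == "utf-16-le"  # b'{\x00"\x00'
assert guess_json_utf(b'{"a"') == "utf-8"       # all-ASCII start, no nulls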
48 changes: 0 additions & 48 deletions httpx/_utils.py
@@ -6,7 +6,6 @@
 import re
 import time
 import typing
-from pathlib import Path
 from urllib.request import getproxies
 
 import sniffio
@@ -91,53 +90,6 @@ def replacer(match: typing.Match[str]) -> str:
     return f'{name}="{value}"'.encode()
 
 
-# Null bytes; no need to recreate these on each call to guess_json_utf
-_null = b"\x00"
-_null2 = _null * 2
-_null3 = _null * 3
-
-
-def guess_json_utf(data: bytes) -> typing.Optional[str]:
-    # JSON always starts with two ASCII characters, so detection is as
-    # easy as counting the nulls and from their location and count
-    # determine the encoding. Also detect a BOM, if present.
-    sample = data[:4]
-    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
-        return "utf-32"  # BOM included
-    if sample[:3] == codecs.BOM_UTF8:
-        return "utf-8-sig"  # BOM included, MS style (discouraged)
-    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
-        return "utf-16"  # BOM included
-    nullcount = sample.count(_null)
-    if nullcount == 0:
-        return "utf-8"
-    if nullcount == 2:
-        if sample[::2] == _null2:  # 1st and 3rd are null
-            return "utf-16-be"
-        if sample[1::2] == _null2:  # 2nd and 4th are null
-            return "utf-16-le"
-        # Did not detect 2 valid UTF-16 ascii-range characters
-    if nullcount == 3:
-        if sample[:3] == _null3:
-            return "utf-32-be"
-        if sample[1:] == _null3:
-            return "utf-32-le"
-        # Did not detect a valid UTF-32 ascii-range character
-    return None
-
-
-def get_ca_bundle_from_env() -> typing.Optional[str]:
-    if "SSL_CERT_FILE" in os.environ:
-        ssl_file = Path(os.environ["SSL_CERT_FILE"])
-        if ssl_file.is_file():
-            return str(ssl_file)
-    if "SSL_CERT_DIR" in os.environ:
-        ssl_path = Path(os.environ["SSL_CERT_DIR"])
-        if ssl_path.is_dir():
-            return str(ssl_path)
-    return None
-
-
 def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
     """
     Returns a list of parsed link headers, for more info see:
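The point of moving this logic is that the behaviour previously covered by importing these private helpers in tests/test_utils.py stays reachable through the public API. A hedged sketch of how the same cases can be exercised without touching httpx._utils (illustrative only, not the test code added in this commit):

import json

import httpx

# Link-header parsing, via Response.links instead of parse_header_links().
response = httpx.Response(
    200,
    headers={"link": '<https://example.com/next>; rel="next"'},
)
assert response.links["next"]["url"] == "https://example.com/next"

# An empty Link header still yields an empty mapping.
assert httpx.Response(200, headers={"link": ""}).links == {}

# JSON encoding detection, via Response.json() instead of guess_json_utf():
# a UTF-16 body with no declared charset is decoded by the null-count heuristic.
utf16_body = json.dumps({"greeting": "hello"}).encode("utf-16-be")
assert httpx.Response(200, content=utf16_body).json() == {"greeting": "hello"}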
