Skip to content

Commit

Permalink
Add type checking with Mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
adamchainz committed Nov 8, 2022
1 parent da10745 commit 6d403ed
Show file tree
Hide file tree
Showing 26 changed files with 342 additions and 118 deletions.
8 changes: 8 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,11 @@ repos:
- flake8-comprehensions
- flake8-tidy-imports
- flake8-typing-imports
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.990
hooks:
- id: mypy
additional_dependencies:
- django-stubs==1.12.0
- requests
- types-requests
15 changes: 15 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ build-backend = "setuptools.build_meta"
[tool.black]
target-version = ['py37']

[tool.django-stubs]
django_settings_module = "tests.django_settings"

[tool.mypy]
mypy_path = "src/"
namespace_packages = false
plugins = ["mypy_django_plugin.main"]
show_error_codes = true
strict = true
warn_unreachable = true

[[tool.mypy.overrides]]
module = "tests.*"
allow_untyped_defs = true

[tool.pytest.ini_options]
addopts = """\
--strict-config
Expand Down
3 changes: 2 additions & 1 deletion requirements/py37-django32.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ sqlparse==0.4.3
# via django
tomli==2.0.1
# via pytest
typing-extensions==4.4.0
typing-extensions==4.4.0 ; python_version < "3.10"
# via
# -r requirements.in
# asgiref
# importlib-metadata
urllib3==1.26.12
Expand Down
2 changes: 2 additions & 0 deletions requirements/py38-django32.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ sqlparse==0.4.3
# via django
tomli==2.0.1
# via pytest
typing-extensions==4.4.0 ; python_version < "3.10"
# via -r requirements.in
urllib3==1.26.12
# via requests
zipp==3.10.0
Expand Down
2 changes: 2 additions & 0 deletions requirements/py38-django40.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ sqlparse==0.4.3
# via django
tomli==2.0.1
# via pytest
typing-extensions==4.4.0 ; python_version < "3.10"
# via -r requirements.in
urllib3==1.26.12
# via requests
zipp==3.10.0
Expand Down
2 changes: 2 additions & 0 deletions requirements/py38-django41.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ sqlparse==0.4.3
# via django
tomli==2.0.1
# via pytest
typing-extensions==4.4.0 ; python_version < "3.10"
# via -r requirements.in
urllib3==1.26.12
# via requests
zipp==3.10.0
Expand Down
2 changes: 2 additions & 0 deletions requirements/py39-django32.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ sqlparse==0.4.3
# via django
tomli==2.0.1
# via pytest
typing-extensions==4.4.0 ; python_version < "3.10"
# via -r requirements.in
urllib3==1.26.12
# via requests
zipp==3.10.0
Expand Down
2 changes: 2 additions & 0 deletions requirements/py39-django40.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ sqlparse==0.4.3
# via django
tomli==2.0.1
# via pytest
typing-extensions==4.4.0 ; python_version < "3.10"
# via -r requirements.in
urllib3==1.26.12
# via requests
zipp==3.10.0
Expand Down
2 changes: 2 additions & 0 deletions requirements/py39-django41.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ sqlparse==0.4.3
# via django
tomli==2.0.1
# via pytest
typing-extensions==4.4.0 ; python_version < "3.10"
# via -r requirements.in
urllib3==1.26.12
# via requests
zipp==3.10.0
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ django
pytest
pytest-randomly
requests
typing-extensions ; python_version < "3.10"
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ package_dir=
=src
packages = find:
include_package_data = True
install_requires =
typing-extensions ; python_version < "3.10"
python_requires = >=3.7
zip_safe = False

Expand Down
72 changes: 47 additions & 25 deletions src/whitenoise/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import re
import warnings
from posixpath import normpath
from typing import Callable
from typing import Callable, Generator, Iterable
from wsgiref.headers import Headers
from wsgiref.util import FileWrapper

from .compat import StartResponse, WSGIApplication, WSGIEnvironment
from .media_types import MediaTypes
from .responders import IsDirectoryError
from .responders import MissingFileError
Expand All @@ -26,9 +27,9 @@ class WhiteNoise:

def __init__(
self,
application,
root=None,
prefix=None,
application: WSGIApplication,
root: str | os.PathLike[str] | None = None,
prefix: str | None = None,
*,
# Re-check the filesystem on every request so that any changes are
# automatically picked up. NOTE: For use in development only, not supported
Expand All @@ -45,7 +46,7 @@ def __init__(
mimetypes: dict[str, str] | None = None,
add_headers_function: Callable[[Headers, str, str], None] | None = None,
index_file: str | bool | None = None,
immutable_file_test: Callable | str | None = None,
immutable_file_test: Callable[[str, str], bool] | str | None = None,
):
self.autorefresh = autorefresh
self.max_age = max_age
Expand All @@ -68,12 +69,14 @@ def __init__(

self.media_types = MediaTypes(extra_types=mimetypes)
self.application = application
self.files = {}
self.directories = []
self.files: dict[str, Redirect | StaticFile] = {}
self.directories: list[tuple[str, str]] = []
if root is not None:
self.add_files(root, prefix)

def __call__(self, environ, start_response):
def __call__(
self, environ: WSGIEnvironment, start_response: StartResponse
) -> Iterable[bytes]:
path = decode_path_info(environ.get("PATH_INFO", ""))
if self.autorefresh:
static_file = self.find_file(path)
Expand All @@ -85,17 +88,23 @@ def __call__(self, environ, start_response):
return self.serve(static_file, environ, start_response)

@staticmethod
def serve(static_file, environ, start_response):
def serve(
static_file: Redirect | StaticFile,
environ: WSGIEnvironment,
start_response: StartResponse,
) -> Iterable[bytes]:
response = static_file.get_response(environ["REQUEST_METHOD"], environ)
status_line = f"{response.status} {response.status.phrase}"
start_response(status_line, list(response.headers))
if response.file is not None:
file_wrapper = environ.get("wsgi.file_wrapper", FileWrapper)
file_wrapper: type[FileWrapper] = environ.get(
"wsgi.file_wrapper", FileWrapper
)
return file_wrapper(response.file)
else:
return []

def add_files(self, root, prefix=None):
def add_files(self, root: str, prefix: str | None = None) -> None:
root = decode_if_byte_string(root, force_text=True)
root = os.path.abspath(root)
root = root.rstrip(os.path.sep) + os.path.sep
Expand All @@ -112,7 +121,7 @@ def add_files(self, root, prefix=None):
else:
warnings.warn(f"No directory at: {root}")

def update_files_dictionary(self, root, prefix):
def update_files_dictionary(self, root: str, prefix: str) -> None:
# Build a mapping from paths to the results of `os.stat` calls
# so we only have to touch the filesystem once
stat_cache = dict(scantree(root))
Expand All @@ -122,7 +131,12 @@ def update_files_dictionary(self, root, prefix):
url = prefix + relative_url
self.add_file_to_dictionary(url, path, stat_cache=stat_cache)

def add_file_to_dictionary(self, url, path, stat_cache=None):
def add_file_to_dictionary(
self,
url: str,
path: str,
stat_cache: dict[str, os.stat_result] | None = None,
) -> None:
if self.is_compressed_variant(path, stat_cache=stat_cache):
return
if self.index_file is not None and url.endswith("/" + self.index_file):
Expand All @@ -134,26 +148,27 @@ def add_file_to_dictionary(self, url, path, stat_cache=None):
static_file = self.get_static_file(path, url, stat_cache=stat_cache)
self.files[url] = static_file

def find_file(self, url):
def find_file(self, url: str) -> Redirect | StaticFile | None:
# Optimization: bail early if the URL can never match a file
if self.index_file is None and url.endswith("/"):
return
if not self.url_is_canonical(url):
return
return None
for path in self.candidate_paths_for_url(url):
try:
return self.find_file_at_path(path, url)
except MissingFileError:
pass
return None

def candidate_paths_for_url(self, url):
def candidate_paths_for_url(self, url: str) -> Generator[str, None, None]:
for root, prefix in self.directories:
if url.startswith(prefix):
path = os.path.join(root, url[len(prefix) :])
if os.path.commonprefix((root, path)) == root:
yield path

def find_file_at_path(self, path, url):
def find_file_at_path(self, path: str, url: str) -> Redirect | StaticFile:
if self.is_compressed_variant(path):
raise MissingFileError(path)

Expand All @@ -175,7 +190,7 @@ def find_file_at_path(self, path, url):
return self.get_static_file(path, url)

@staticmethod
def url_is_canonical(url):
def url_is_canonical(url: str) -> bool:
"""
Check that the URL path is in canonical format i.e. has normalised
slashes and no path traversal elements
Expand All @@ -188,7 +203,9 @@ def url_is_canonical(url):
return normalised == url

@staticmethod
def is_compressed_variant(path, stat_cache=None):
def is_compressed_variant(
path: str, stat_cache: dict[str, os.stat_result] | None = None
) -> bool:
if path[-3:] in (".gz", ".br"):
uncompressed_path = path[:-3]
if stat_cache is None:
Expand All @@ -197,7 +214,12 @@ def is_compressed_variant(path, stat_cache=None):
return uncompressed_path in stat_cache
return False

def get_static_file(self, path, url, stat_cache=None):
def get_static_file(
self,
path: str,
url: str,
stat_cache: dict[str, os.stat_result] | None = None,
) -> StaticFile:
# Optimization: bail early if file does not exist
if stat_cache is None and not os.path.exists(path):
raise MissingFileError(path)
Expand All @@ -215,30 +237,30 @@ def get_static_file(self, path, url, stat_cache=None):
encodings={"gzip": path + ".gz", "br": path + ".br"},
)

def add_mime_headers(self, headers, path, url):
def add_mime_headers(self, headers: Headers, path: str, url: str) -> None:
media_type = self.media_types.get_type(path)
if media_type.startswith("text/"):
params = {"charset": str(self.charset)}
else:
params = {}
headers.add_header("Content-Type", str(media_type), **params)

def add_cache_headers(self, headers, path, url):
def add_cache_headers(self, headers: Headers, path: str, url: str) -> None:
if self.immutable_file_test(path, url):
headers["Cache-Control"] = "max-age={}, public, immutable".format(
self.FOREVER
)
elif self.max_age is not None:
headers["Cache-Control"] = f"max-age={self.max_age}, public"

def immutable_file_test(self, path, url):
def immutable_file_test(self, path: str, url: str) -> bool:
"""
This should be implemented by sub-classes (see e.g. WhiteNoiseMiddleware)
or by setting the `immutable_file_test` config option
"""
return False

def redirect(self, from_url, to_url):
def redirect(self, from_url: str, to_url: str) -> Redirect:
"""
Return a relative 302 redirect
Expand All @@ -258,7 +280,7 @@ def redirect(self, from_url, to_url):
return Redirect(relative_url, headers=headers)


def scantree(root):
def scantree(root: str) -> Generator[tuple[str, os.stat_result], None, None]:
"""
Recurse the given directory yielding (pathname, os.stat(pathname)) pairs
"""
Expand Down

0 comments on commit 6d403ed

Please sign in to comment.