-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into fix/gh-2644
- Loading branch information
Showing
46 changed files
with
1,034 additions
and
486 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ omit = | |
sanic/simple.py | ||
sanic/utils.py | ||
sanic/cli | ||
sanic/pages | ||
|
||
[html] | ||
directory = coverage | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,5 +23,6 @@ module = [ | |
"trustme.*", | ||
"sanic_routing.*", | ||
"aioquic.*", | ||
"html5tagger.*", | ||
] | ||
ignore_missing_imports = true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from .content_range import ContentRangeHandler | ||
from .directory import DirectoryHandler | ||
from .error import ErrorHandler | ||
|
||
|
||
__all__ = ( | ||
"ContentRangeHandler", | ||
"DirectoryHandler", | ||
"ErrorHandler", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
from __future__ import annotations | ||
|
||
from sanic.exceptions import ( | ||
HeaderNotFound, | ||
InvalidRangeType, | ||
RangeNotSatisfiable, | ||
) | ||
|
||
|
||
class ContentRangeHandler: | ||
""" | ||
A mechanism to parse and process the incoming request headers to | ||
extract the content range information. | ||
:param request: Incoming api request | ||
:param stats: Stats related to the content | ||
:type request: :class:`sanic.request.Request` | ||
:type stats: :class:`posix.stat_result` | ||
:ivar start: Content Range start | ||
:ivar end: Content Range end | ||
:ivar size: Length of the content | ||
:ivar total: Total size identified by the :class:`posix.stat_result` | ||
instance | ||
:ivar ContentRangeHandler.headers: Content range header ``dict`` | ||
""" | ||
|
||
__slots__ = ("start", "end", "size", "total", "headers") | ||
|
||
def __init__(self, request, stats): | ||
self.total = stats.st_size | ||
_range = request.headers.getone("range", None) | ||
if _range is None: | ||
raise HeaderNotFound("Range Header Not Found") | ||
unit, _, value = tuple(map(str.strip, _range.partition("="))) | ||
if unit != "bytes": | ||
raise InvalidRangeType( | ||
"%s is not a valid Range Type" % (unit,), self | ||
) | ||
start_b, _, end_b = tuple(map(str.strip, value.partition("-"))) | ||
try: | ||
self.start = int(start_b) if start_b else None | ||
except ValueError: | ||
raise RangeNotSatisfiable( | ||
"'%s' is invalid for Content Range" % (start_b,), self | ||
) | ||
try: | ||
self.end = int(end_b) if end_b else None | ||
except ValueError: | ||
raise RangeNotSatisfiable( | ||
"'%s' is invalid for Content Range" % (end_b,), self | ||
) | ||
if self.end is None: | ||
if self.start is None: | ||
raise RangeNotSatisfiable( | ||
"Invalid for Content Range parameters", self | ||
) | ||
else: | ||
# this case represents `Content-Range: bytes 5-` | ||
self.end = self.total - 1 | ||
else: | ||
if self.start is None: | ||
# this case represents `Content-Range: bytes -5` | ||
self.start = self.total - self.end | ||
self.end = self.total - 1 | ||
if self.start >= self.end: | ||
raise RangeNotSatisfiable( | ||
"Invalid for Content Range parameters", self | ||
) | ||
self.size = self.end - self.start + 1 | ||
self.headers = { | ||
"Content-Range": "bytes %s-%s/%s" | ||
% (self.start, self.end, self.total) | ||
} | ||
|
||
def __bool__(self): | ||
return self.size > 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from __future__ import annotations | ||
|
||
from datetime import datetime | ||
from operator import itemgetter | ||
from pathlib import Path | ||
from stat import S_ISDIR | ||
from typing import Dict, Iterable, Optional, Sequence, Union, cast | ||
|
||
from sanic.exceptions import NotFound | ||
from sanic.pages.directory_page import DirectoryPage, FileInfo | ||
from sanic.request import Request | ||
from sanic.response import file, html, redirect | ||
|
||
|
||
class DirectoryHandler: | ||
def __init__( | ||
self, | ||
uri: str, | ||
directory: Path, | ||
directory_view: bool = False, | ||
index: Optional[Union[str, Sequence[str]]] = None, | ||
) -> None: | ||
if isinstance(index, str): | ||
index = [index] | ||
elif index is None: | ||
index = [] | ||
self.base = uri.strip("/") | ||
self.directory = directory | ||
self.directory_view = directory_view | ||
self.index = tuple(index) | ||
|
||
async def handle(self, request: Request, path: str): | ||
current = path.strip("/")[len(self.base) :].strip("/") # noqa: E203 | ||
for file_name in self.index: | ||
index_file = self.directory / current / file_name | ||
if index_file.is_file(): | ||
return await file(index_file) | ||
|
||
if self.directory_view: | ||
return self._index( | ||
self.directory / current, path, request.app.debug | ||
) | ||
|
||
if self.index: | ||
raise NotFound("File not found") | ||
|
||
raise IsADirectoryError(f"{self.directory.as_posix()} is a directory") | ||
|
||
def _index(self, location: Path, path: str, debug: bool): | ||
# Remove empty path elements, append slash | ||
if "//" in path or not path.endswith("/"): | ||
return redirect( | ||
"/" + "".join([f"{p}/" for p in path.split("/") if p]) | ||
) | ||
|
||
# Render file browser | ||
page = DirectoryPage(self._iter_files(location), path, debug) | ||
return html(page.render()) | ||
|
||
def _prepare_file(self, path: Path) -> Dict[str, Union[int, str]]: | ||
stat = path.stat() | ||
modified = ( | ||
datetime.fromtimestamp(stat.st_mtime) | ||
.isoformat()[:19] | ||
.replace("T", " ") | ||
) | ||
is_dir = S_ISDIR(stat.st_mode) | ||
icon = "📁" if is_dir else "📄" | ||
file_name = path.name | ||
if is_dir: | ||
file_name += "/" | ||
return { | ||
"priority": is_dir * -1, | ||
"file_name": file_name, | ||
"icon": icon, | ||
"file_access": modified, | ||
"file_size": stat.st_size, | ||
} | ||
|
||
def _iter_files(self, location: Path) -> Iterable[FileInfo]: | ||
prepared = [self._prepare_file(f) for f in location.iterdir()] | ||
for item in sorted(prepared, key=itemgetter("priority", "file_name")): | ||
del item["priority"] | ||
yield cast(FileInfo, item) |
Oops, something went wrong.