-
-
Notifications
You must be signed in to change notification settings - Fork 855
/
authentication.py
52 lines (45 loc) 路 1.75 KB
/
authentication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import typing
from starlette.authentication import (
AuthCredentials,
AuthenticationBackend,
AuthenticationError,
UnauthenticatedUser,
)
from starlette.requests import HTTPConnection
from starlette.responses import PlainTextResponse, Response
from starlette.types import ASGIApp, Receive, Scope, Send
class AuthenticationMiddleware:
def __init__(
self,
app: ASGIApp,
backend: AuthenticationBackend,
on_error: typing.Optional[
typing.Callable[[HTTPConnection, AuthenticationError], Response]
] = None,
) -> None:
self.app = app
self.backend = backend
self.on_error: typing.Callable[
[HTTPConnection, AuthenticationError], Response
] = (on_error if on_error is not None else self.default_on_error)
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] not in ["http", "websocket"]:
await self.app(scope, receive, send)
return
conn = HTTPConnection(scope)
try:
auth_result = await self.backend.authenticate(conn)
except AuthenticationError as exc:
response = self.on_error(conn, exc)
if scope["type"] == "websocket":
await send({"type": "websocket.close", "code": 1000})
else:
await response(scope, receive, send)
return
if auth_result is None:
auth_result = AuthCredentials(), UnauthenticatedUser()
scope["auth"], scope["user"] = auth_result
await self.app(scope, receive, send)
@staticmethod
def default_on_error(conn: HTTPConnection, exc: Exception) -> Response:
return PlainTextResponse(str(exc), status_code=400)