Skip to content

Commit

Permalink
Inject current request in security handlers (#1883)
Browse files Browse the repository at this point in the history
Fixes #1881
Fixes #1880
Fixes #1876

Alternative to #1750

This PR makes the current request available to the security handlers by
injecting it as a keyword. I think this is a proper alternative to
#1750, since this is the only place in the default middleware stack
where I expect this to be needed.
  • Loading branch information
RobbeSneyders committed Mar 20, 2024
1 parent 994f53f commit b3dd986
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 6 deletions.
14 changes: 8 additions & 6 deletions connexion/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
class AbstractSecurityHandler:

required_scopes_kw = "required_scopes"
request_kw = "request"
client = None
security_definition_key: str
"""The key which contains the value for the function name to resolve."""
Expand Down Expand Up @@ -106,12 +107,12 @@ def _get_function(
return default

def _generic_check(self, func, exception_msg):
need_to_add_required_scopes = self._need_to_add_scopes(func)

async def wrapper(request, *args, required_scopes=None):
kwargs = {}
if need_to_add_required_scopes:
if self._accepts_kwarg(func, self.required_scopes_kw):
kwargs[self.required_scopes_kw] = required_scopes
if self._accepts_kwarg(func, self.request_kw):
kwargs[self.request_kw] = request
token_info = func(*args, **kwargs)
while asyncio.iscoroutine(token_info):
token_info = await token_info
Expand Down Expand Up @@ -140,10 +141,11 @@ def get_auth_header_value(request):
raise OAuthProblem(detail="Invalid authorization header")
return auth_type.lower(), value

def _need_to_add_scopes(self, func):
@staticmethod
def _accepts_kwarg(func: t.Callable, keyword: str) -> bool:
"""Check if the function accepts the provided keyword argument."""
arguments, has_kwargs = inspect_function_arguments(func)
need_required_scopes = has_kwargs or self.required_scopes_kw in arguments
return need_required_scopes
return has_kwargs or keyword in arguments

def _resolve_func(self, security_scheme):
"""
Expand Down
5 changes: 5 additions & 0 deletions docs/security.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ The function should accept the following arguments:
- username
- password
- required_scopes (optional)
- request (optional)

You can find a `minimal Basic Auth example application`_ in Connexion's "examples" folder.

Expand All @@ -85,6 +86,7 @@ The function should accept the following arguments:

- token
- required_scopes (optional)
- request (optional)

You can find a `minimal Bearer example application`_ in Connexion's "examples" folder.

Expand All @@ -100,6 +102,7 @@ The function should accept the following arguments:

- apikey
- required_scopes (optional)
- request (optional)

You can find a `minimal API Key example application`_ in Connexion's "examples" folder.

Expand All @@ -115,6 +118,7 @@ The function should accept the following arguments:

- token
- required_scopes (optional)
- request (optional)

As alternative to an ``x-tokenInfoFunc`` definition, you can set an ``x-tokenInfoUrl`` definition or
``TOKENINFO_URL`` environment variable, and connexion will call the url instead of a local
Expand All @@ -132,6 +136,7 @@ The function should accept the following arguments:

- required_scopes
- token_scopes
- request (optional)

and return a boolean indicating if the validation was successful.

Expand Down
49 changes: 49 additions & 0 deletions tests/decorators/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,52 @@ def test_raise_most_specific(errors, most_specific):
security_handler_factory = SecurityHandlerFactory()
with pytest.raises(most_specific):
security_handler_factory._raise_most_specific(errors)


async def test_optional_kwargs_injected():
"""Test that optional keyword arguments 'required_scopes' and 'request' are injected when
defined as arguments in the user security function. This test uses the ApiKeySecurityHandler,
but the tested behavior is generic across handlers."""
security_handler_factory = ApiKeySecurityHandler()

request = ConnexionRequest(
scope={"type": "http", "headers": [[b"x-auth", b"foobar"]]}
)

def apikey_info_no_kwargs(key):
"""Will fail if additional keywords are injected."""
return {"sub": "no_kwargs"}

wrapped_func_no_kwargs = security_handler_factory._get_verify_func(
apikey_info_no_kwargs, "header", "X-Auth"
)
assert await wrapped_func_no_kwargs(request) == {"sub": "no_kwargs"}

def apikey_info_request(key, request):
"""Will fail if request is not injected."""
return {"sub": "request"}

wrapped_func_request = security_handler_factory._get_verify_func(
apikey_info_request, "header", "X-Auth"
)
assert await wrapped_func_request(request) == {"sub": "request"}

def apikey_info_scopes(key, required_scopes):
"""Will fail if required_scopes is not injected."""
return {"sub": "scopes"}

wrapped_func_scopes = security_handler_factory._get_verify_func(
apikey_info_scopes, "header", "X-Auth"
)
assert await wrapped_func_scopes(request) == {"sub": "scopes"}

def apikey_info_kwargs(key, **kwargs):
"""Will fail if request and required_scopes are not injected."""
assert "request" in kwargs
assert "required_scopes" in kwargs
return {"sub": "kwargs"}

wrapped_func_kwargs = security_handler_factory._get_verify_func(
apikey_info_kwargs, "header", "X-Auth"
)
assert await wrapped_func_kwargs(request) == {"sub": "kwargs"}

0 comments on commit b3dd986

Please sign in to comment.