Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the device authorization endpoint #844

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions oauthlib/oauth2/rfc8628/endpoints/__init__.py
@@ -0,0 +1,8 @@
"""
oauthlib.oauth2.rfc8628
~~~~~~~~~~~~~~~~~~~~~~~

This module is an implementation of various logic needed
for consuming and providing OAuth 2.0 Device Authorization RFC8628.
"""
from .device_authorization import DeviceAuthorizationEndpoint
223 changes: 223 additions & 0 deletions oauthlib/oauth2/rfc8628/endpoints/device_authorization.py
@@ -0,0 +1,223 @@
"""
oauthlib.oauth2.rfc8628
~~~~~~~~~~~~~~~~~~~~~~~

This module is an implementation of various logic needed
for consuming and providing OAuth 2.0 RFC8628.
"""
import json
import logging

from oauthlib.common import Request, generate_token
from oauthlib.oauth2.rfc6749 import errors
from oauthlib.oauth2.rfc6749.endpoints.base import (
BaseEndpoint, catch_errors_and_unavailability,
)

log = logging.getLogger(__name__)


class DeviceAuthorizationEndpoint(BaseEndpoint):

"""DeviceAuthorization endpoint - used by the client to initiate
the authorization flow by requesting a set of verification codes
from the authorization server by making an HTTP "POST" request to
the device authorization endpoint.

The client authentication requirements of Section 3.2.1 of [RFC6749]
apply to requests on this endpoint, which means that confidential
clients (those that have established client credentials) authenticate
in the same manner as when making requests to the token endpoint, and
public clients provide the "client_id" parameter to identify
themselves.
"""

def __init__(
self,
request_validator,
verification_uri,
expires_in=1800,
interval=None,
verification_uri_complete=None,
):
"""
:param request_validator: An instance of RequestValidator.
:type request_validator: oauthlib.oauth2.rfc6749.RequestValidator.
:param verification_uri: a string containing the URL that can be polled by the client application
:param expires_in: a number that represents the lifetime of the `user_code` and `device_code`
:param interval: an option number that represents the number of seconds between each poll requests
:param verification_uri_complete: a string of a function that can be called with `user_data` as parameter
"""
self.request_validator = request_validator
self._expires_in = expires_in
self._interval = interval
self._verification_uri = verification_uri
self._verification_uri_complete = verification_uri_complete
self._interval = interval

BaseEndpoint.__init__(self)

@property
def interval(self):
"""The minimum amount of time in seconds that the client
SHOULD wait between polling requests to the token endpoint. If no
value is provided, clients MUST use 5 as the default.
"""
return self._interval

@property
def expires_in(self):
"""The lifetime in seconds of the "device_code" and "user_code"."""
return self._expires_in

@property
def verification_uri(self):
"""The end-user verification URI on the authorization
server. The URI should be short and easy to remember as end users
will be asked to manually type it into their user agent.
"""
return self._verification_uri

def verification_uri_complete(self, user_code):
if not self._verification_uri_complete:
return None
if isinstance(self._verification_uri_complete, str):
return self._verification_uri_complete.format(user_code=user_code)
if callable(self._verification_uri_complete):
return self._verification_uri_complete(user_code)
return None

@catch_errors_and_unavailability
def validate_device_authorization_request(self, request):
"""Validate the device authorization request.

The client_id is required if the client is not authenticating with the
authorization server as described in `Section 3.2.1. of [RFC6749]`_.
The client identifier as described in `Section 2.2 of [RFC6749]`_.

.. _`Section 3.2.1. of [RFC6749]`: https://www.rfc-editor.org/rfc/rfc6749#section-3.2.1
.. _`Section 2.2 of [RFC6749]`: https://www.rfc-editor.org/rfc/rfc6749#section-2.2
"""

# First check duplicate parameters
for param in ("client_id", "scope"):
try:
duplicate_params = request.duplicate_params
except ValueError:
raise errors.InvalidRequestFatalError(
description="Unable to parse query string", request=request
)
if param in duplicate_params:
raise errors.InvalidRequestFatalError(
description="Duplicate %s parameter." % param, request=request
)

# the "application/x-www-form-urlencoded" format, per Appendix B of [RFC6749]
# https://www.rfc-editor.org/rfc/rfc6749#appendix-B
if request.headers["Content-Type"] != "application/x-www-form-urlencoded":
raise errors.InvalidRequestError(
"Content-Type must be application/x-www-form-urlencoded",
request=request,
)

# REQUIRED. The client identifier as described in Section 2.2.
# https://tools.ietf.org/html/rfc6749#section-2.2
# TODO: extract client_id an helper validation function.
if not request.client_id:
raise errors.MissingClientIdError(request=request)

if not self.request_validator.validate_client_id(request.client_id, request):
raise errors.InvalidClientIdError(request=request)

# The client authentication requirements of Section 3.2.1 of [RFC6749]
# apply to requests on this endpoint, which means that confidential
# clients (those that have established client credentials) authenticate
# in the same manner as when making requests to the token endpoint, and
# public clients provide the "client_id" parameter to identify
# themselves.
self._raise_on_invalid_client(request)

@catch_errors_and_unavailability
def create_device_authorization_response(
self, uri, http_method="POST", body=None, headers=None
):
"""create_device_authorization_response - generates a unique device
verification code and an end-user code that are valid for a limited
time and includes them in the HTTP response body using the
"application/json" format [RFC8259] with a 200 (OK) status code, as
described in `Section-3.2`_.

:param uri: The full URI of the token request.
:param request: OAuthlib request.
:type request: oauthlib.common.Request
:returns: A tuple of 3 elements.
1. A dict of headers to set on the response.
2. The response body as a string.
3. The response status code as an integer.

The response contains the following parameters:

device_code
REQUIRED. The device verification code.

user_code
REQUIRED. The end-user verification code.

verification_uri
REQUIRED. The end-user verification URI on the authorization
server. The URI should be short and easy to remember as end users
will be asked to manually type it into their user agent.

verification_uri_complete
OPTIONAL. A verification URI that includes the "user_code" (or
other information with the same function as the "user_code"),
which is designed for non-textual transmission.

expires_in
REQUIRED. The lifetime in seconds of the "device_code" and
"user_code".

interval
OPTIONAL. The minimum amount of time in seconds that the client
SHOULD wait between polling requests to the token endpoint. If no
value is provided, clients MUST use 5 as the default.

For example:

HTTP/1.1 200 OK
Content-Type: application/json
Cache-Control: no-store

{
"device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS",
"user_code": "WDJB-MJHT",
"verification_uri": "https://example.com/device",
"verification_uri_complete":
"https://example.com/device?user_code=WDJB-MJHT",
"expires_in": 1800,
"interval": 5
}

.. _`Section-3.2`: https://www.rfc-editor.org/rfc/rfc8628#section-3.2
"""
request = Request(uri, http_method, body, headers)
self.validate_device_authorization_request(request)
log.debug("Pre resource owner authorization validation ok for %r.", request)

headers = {}
user_code = generate_token()
data = {
"verification_uri": self.verification_uri,
"expires_in": self.expires_in,
"user_code": user_code,
"device_code": generate_token(),
}
if self.interval is not None:
data["interval"] = self.interval

verification_uri_complete = self.verification_uri_complete(user_code)
if verification_uri_complete:
data["verification_uri_complete"] = verification_uri_complete

body = json.dumps(data)
return headers, body, 200
19 changes: 19 additions & 0 deletions oauthlib/oauth2/rfc8628/pre_configured.py
@@ -0,0 +1,19 @@
from oauthlib.oauth2.rfc8628.endpoints.device_authorization import (
DeviceAuthorizationEndpoint,
)


class DeviceApplicationServer(DeviceAuthorizationEndpoint):

"""An all-in-one endpoint featuring Authorization code grant and Bearer tokens."""

def __init__(self, request_validator, verification_uri, **kwargs):
"""Construct a new web application server.

:param request_validator: An implementation of
oauthlib.oauth2.rfc8626.RequestValidator.
:param verification_uri: the verification_uri to be send back.
"""
DeviceAuthorizationEndpoint.__init__(
self, request_validator, verification_uri=verification_uri
)
25 changes: 25 additions & 0 deletions oauthlib/oauth2/rfc8628/request_validator.py
@@ -0,0 +1,25 @@
from oauthlib.oauth2 import RequestValidator as OAuth2RequestValidator


class RequestValidator(OAuth2RequestValidator):
def client_authentication_required(self, request, *args, **kwargs):
"""Determine if client authentication is required for current request.

According to the rfc8628, client authentication is required in the following cases:
- Device Authorization Request follows the, the client authentication requirements
of Section 3.2.1 of [RFC6749] apply to requests on this endpoint, which means that
confidential clients (those that have established client credentials) authenticate
in the same manner as when making requests to the token endpoint, and
public clients provide the "client_id" parameter to identify themselves,
see `Section 3.1`_.

:param request: OAuthlib request.
:type request: oauthlib.common.Request
:rtype: True or False

Method is used by:
- Device Authorization Request

.. _`Section 3.1`: https://www.rfc-editor.org/rfc/rfc8628#section-3.1
"""
return True
2 changes: 1 addition & 1 deletion oauthlib/openid/connect/core/endpoints/pre_configured.py
Expand Up @@ -17,7 +17,7 @@
from oauthlib.oauth2.rfc6749.tokens import BearerToken

from ..grant_types import (
AuthorizationCodeGrant, HybridGrant, ImplicitGrant, RefreshTokenGrant
AuthorizationCodeGrant, HybridGrant, ImplicitGrant, RefreshTokenGrant,
)
from ..grant_types.dispatchers import (
AuthorizationCodeGrantDispatcher, AuthorizationTokenGrantDispatcher,
Expand Down
Empty file.
99 changes: 99 additions & 0 deletions tests/oauth2/rfc8628/endpoints/test_error_responses.py
@@ -0,0 +1,99 @@
import json
from unittest import TestCase, mock

from oauthlib.common import Request, urlencode
from oauthlib.oauth2.rfc6749 import errors
from oauthlib.oauth2.rfc8628.pre_configured import DeviceApplicationServer
from oauthlib.oauth2.rfc8628.request_validator import RequestValidator


class ErrorResponseTest(TestCase):
def set_client(self, request):
request.client = mock.MagicMock()
request.client.client_id = "mocked"
return True

def build_request(
self, uri="https://example.com/device_authorize", client_id="foo"
):
body = ""
if client_id:
body = f"client_id={client_id}"
return Request(
uri,
http_method="POST",
body=body,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

def assert_request_raises(self, error, request):
"""Test that the request fails similarly on the validation and response endpoint."""
self.assertRaises(
error,
self.device.validate_device_authorization_request,
request,
)
self.assertRaises(
error,
self.device.create_device_authorization_response,
uri=request.uri,
http_method=request.http_method,
body=request.body,
headers=request.headers,
)

def setUp(self):
self.validator = mock.MagicMock(spec=RequestValidator)
self.validator.get_default_redirect_uri.return_value = None
self.validator.get_code_challenge.return_value = None
self.device = DeviceApplicationServer(
self.validator, "https://example.com/verify"
)

def test_missing_client_id(self):
# Device code grant
request = self.build_request(client_id=None)
self.assert_request_raises(errors.MissingClientIdError, request)

def test_empty_client_id(self):
# Device code grant
self.assertRaises(
errors.MissingClientIdError,
self.device.create_device_authorization_response,
"https://i.l/",
"POST",
"client_id=",
{"Content-Type": "application/x-www-form-urlencoded"},
)

def test_invalid_client_id(self):
request = self.build_request(client_id="foo")
# Device code grant
self.validator.validate_client_id.return_value = False
self.assert_request_raises(errors.InvalidClientIdError, request)

def test_duplicate_client_id(self):
request = self.build_request()
request.body = "client_id=foo&client_id=bar"
# Device code grant
self.validator.validate_client_id.return_value = False
self.assert_request_raises(errors.InvalidRequestFatalError, request)

def test_unauthenticated_confidential_client(self):
self.validator.client_authentication_required.return_value = True
self.validator.authenticate_client.return_value = False
request = self.build_request()
self.assert_request_raises(errors.InvalidClientError, request)

def test_unauthenticated_public_client(self):
self.validator.client_authentication_required.return_value = False
self.validator.authenticate_client_id.return_value = False
request = self.build_request()
self.assert_request_raises(errors.InvalidClientError, request)

def test_duplicate_scope_parameter(self):
request = self.build_request()
request.body = "client_id=foo&scope=foo&scope=bar"
# Device code grant
self.validator.validate_client_id.return_value = False
self.assert_request_raises(errors.InvalidRequestFatalError, request)