Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds new external account authorized user credentials #1160

Merged
merged 21 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions google/auth/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
_AUTHORIZED_USER_TYPE = "authorized_user"
_SERVICE_ACCOUNT_TYPE = "service_account"
_EXTERNAL_ACCOUNT_TYPE = "external_account"
_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = "external_account_authorized_user"
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
_IMPERSONATED_SERVICE_ACCOUNT_TYPE = "impersonated_service_account"
_GDCH_SERVICE_ACCOUNT_TYPE = "gdch_service_account"
_VALID_TYPES = (
_AUTHORIZED_USER_TYPE,
_SERVICE_ACCOUNT_TYPE,
_EXTERNAL_ACCOUNT_TYPE,
_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE,
_IMPERSONATED_SERVICE_ACCOUNT_TYPE,
_GDCH_SERVICE_ACCOUNT_TYPE,
)
Expand Down Expand Up @@ -158,6 +160,12 @@ def _load_credentials_from_info(
default_scopes=default_scopes,
request=request,
)

elif credential_type == _EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE:
credentials, project_Id = _get_external_account_authorized_user_credentials(
filename, info, request
)

elif credential_type == _IMPERSONATED_SERVICE_ACCOUNT_TYPE:
credentials, project_id = _get_impersonated_service_account_credentials(
filename, info, scopes
Expand Down Expand Up @@ -363,6 +371,23 @@ def _get_external_account_credentials(
return credentials, credentials.get_project_id(request=request)


def _get_external_account_authorized_user_credentials(
info, filename, scopes=None, default_scopes=None, request=None
):
try:
from google.auth import headful
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved

credentials = headful.Credentials.from_info(info)
except ValueError:
raise exceptions.DefaultCredentialsError(
"Failed to load external account authorized user credentials from {}".format(
filename
)
)

return credentials, None


def _get_authorized_user_credentials(filename, info, scopes=None):
from google.oauth2 import credentials

Expand Down
114 changes: 49 additions & 65 deletions google/auth/external_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,26 @@ def info(self):
}
return {key: value for key, value in config_info.items() if value is not None}

@property
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
def constructor_args(self):
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return d

@property
def service_account_email(self):
"""Returns the service account email if service account impersonation is used.
Expand Down Expand Up @@ -257,23 +277,9 @@ def project_number(self):

@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes, default_scopes=None):
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return self.__class__(**d)
kwargs = self.constructor_args
kwargs.update(scopes=scopes, default_scopes=default_scopes)
return self.__class__(**kwargs)

@abc.abstractmethod
def retrieve_subject_token(self, request):
Expand Down Expand Up @@ -339,52 +345,41 @@ def get_project_id(self, request):

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
scopes = self._scopes if self._scopes is not None else self._default_scopes
if self._impersonated_credentials:
self._impersonated_credentials.refresh(request)
self.token = self._impersonated_credentials.token
self.expiry = self._impersonated_credentials.expiry
else:
now = _helpers.utcnow()
additional_options = None
# Do not pass workforce_pool_user_project when client authentication
# is used. The client ID is sufficient for determining the user project.
if self._workforce_pool_user_project and not self._client_id:
additional_options = {"userProject": self._workforce_pool_user_project}
response_data = self._sts_client.exchange_token(
request=request,
grant_type=_STS_GRANT_TYPE,
subject_token=self.retrieve_subject_token(request),
subject_token_type=self._subject_token_type,
audience=self._audience,
scopes=scopes,
requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
additional_options=additional_options,
)
response_data = self._make_sts_request(request)
self.token = response_data.get("access_token")
lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
self.expiry = now + lifetime

@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
# Return copy of instance with the provided quota project ID.
d = dict(
audience=self._audience,
def _make_sts_request(self, request):
"""This method is the default method for making STS requests."""
scopes = self._scopes if self._scopes is not None else self._default_scopes
additional_options = None
# Do not pass workforce_pool_user_project when client authentication
# is used. The client ID is sufficient for determining the user project.
if self._workforce_pool_user_project and not self._client_id:
additional_options = {"userProject": self._workforce_pool_user_project}
return self._sts_client.exchange_token(
request=request,
grant_type=_STS_GRANT_TYPE,
subject_token=self.retrieve_subject_token(request),
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
audience=self._audience,
scopes=scopes,
requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
additional_options=additional_options,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return self.__class__(**d)

@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
kwargs = self.constructor_args
kwargs.update(quota_project_id=quota_project_id)
return self.__class__(**kwargs)

@_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
def with_token_uri(self, token_uri):
Expand Down Expand Up @@ -422,23 +417,12 @@ def _initialize_impersonated_credentials(self):
endpoint returned an error.
"""
# Return copy of instance with no service account impersonation.
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
kwargs = self.constructor_args
kwargs.update(
service_account_impersonation_url=None,
service_account_impersonation_options={},
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
source_credentials = self.__class__(**d)
source_credentials = self.__class__(**kwargs)

# Determine target_principal.
target_principal = self.service_account_email
Expand Down
182 changes: 182 additions & 0 deletions google/auth/external_account_authorized_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""External Account Authorized User Credentials.
This module provides credentials to access Google Cloud resources from on-prem
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
tokens) as part of a web-based 3-legged OAuth flow.

Example credential:
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
{
"type": "external_account_authorized_user",
"audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
"refresh_token": "refreshToken",
"token_url": "https://sts.googleapis.com/v1/oauth/token",
"token_info_url": "https://sts.googleapis.com/v1/instrospect"
"client_id": "clientId",
"client_secret": "clientSecret"
}
"""

import datetime
import io
import json

from google.auth import _helpers
from google.auth import credentials
from google.oauth2 import sts
from google.oauth2 import utils


ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user"


class Credentials(credentials.CredentialsWithQuotaProject):
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
"""Credentials for External Account Authorized Users.
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved

This is used to instantiate Credentials for exchanging refresh tokens from
authorized users for Google access token and authorizing requests to Google
APIs.
"""

def __init__(
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
self,
audience,
refresh_token,
token_url,
token_info_url,
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
client_id,
client_secret,
revoke_url="",
quota_project_id="",
):
"""Instantiates a external account authorized user credentials object."""
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
super(Credentials, self).__init__()

self._audience = audience
self._refresh_token = refresh_token
self._token_url = token_url
self._token_info_url = token_info_url
self._client_id = client_id
self._client_secret = client_secret
self._revoke_url = revoke_url
self._quota_project_id = quota_project_id

self._client_auth = utils.ClientAuthentication(
utils.ClientAuthType.basic, self._client_id, self._client_secret
)
self._sts_client = sts.Client(self._token_url, self._client_auth)

@property
def info(self):
"""Generates the dictionary representation of the current credentials.

Returns:
Mapping: The dictionary representation of the credentials. This is the
reverse of "from_info" defined on the subclasses of this class. It is
useful for serializing the current credentials so it can deserialized
later.
"""
config_info = {
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
"type": _HEADFUL_JSON_TYPE,
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
"audience": self._audience,
"refresh_token": self._refresh_token,
"token_url": self._token_url,
"token_info_url": self._token_info_url,
"client_id": self._client_id,
"client_secret": self._client_secret,
"revoke_url": self._revoke_url,
"quota_project_id": self._quota_project_id,
}
return {key: value for key, value in config_info.items() if value is not None}

@property
def constructor_args(self):
return {
"audience": self._audience,
"refresh_token": self._refresh_token,
"token_url": self._token_url,
"token_info_url": self._token_info_url,
"client_id": self._client_id,
"client_secret": self._client_secret,
"revoke_url": self._revoke_url,
"quota_project_id": self._quota_project_id,
}

@property
def requires_scopes(self):
""" False: OAuth 2.0 credentials have their scopes set when
the initial token is requested and can not be changed."""
return False

def _make_sts_request(self, request):
return self._sts_client.refresh_token(request, self._refresh_token)

def get_project_id(self):
return None

def refresh(self, request):
now = _helpers.utcnow()
response_data = self._make_sts_request(request)
self.token = response_data.get("access_token")
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
self.expiry = now + lifetime

@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
return self.__class__(
quota_project_id=quota_project_id, **self.constructor_args
)

@classmethod
def from_info(cls, info, **kwargs):
"""Creates a Credentials instance from parsed external account info.

Args:
info (Mapping[str, str]): The external account info in Google
format.
kwargs: Additional arguments to pass to the constructor.

Returns:
google.auth.identity_pool.Credentials: The constructed
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
credentials.

Raises:
ValueError: For invalid parameters.
"""
return cls(
audience=info.get("audience"),
refresh_token=info.get("refresh_token"),
token_url=info.get("token_url"),
token_info_url=info.get("token_info_url"),
client_id=info.get("client_id"),
client_secret=info.get("client_secret"),
**kwargs
)

@classmethod
def from_file(cls, filename, **kwargs):
"""Creates a Credentials instance from an external account json file.

Args:
filename (str): The path to the external account json file.
kwargs: Additional arguments to pass to the constructor.

Returns:
google.auth.identity_pool.Credentials: The constructed
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
credentials.
"""
with io.open(filename, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
return cls.from_info(data, **kwargs)