Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds new external account authorized user credentials #1160

Merged
merged 21 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions google/auth/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
_AUTHORIZED_USER_TYPE = "authorized_user"
_SERVICE_ACCOUNT_TYPE = "service_account"
_EXTERNAL_ACCOUNT_TYPE = "external_account"
_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = "external_account_authorized_user"
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
_IMPERSONATED_SERVICE_ACCOUNT_TYPE = "impersonated_service_account"
_GDCH_SERVICE_ACCOUNT_TYPE = "gdch_service_account"
_VALID_TYPES = (
_AUTHORIZED_USER_TYPE,
_SERVICE_ACCOUNT_TYPE,
_EXTERNAL_ACCOUNT_TYPE,
_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE,
_IMPERSONATED_SERVICE_ACCOUNT_TYPE,
_GDCH_SERVICE_ACCOUNT_TYPE,
)
Expand Down Expand Up @@ -158,6 +160,12 @@ def _load_credentials_from_info(
default_scopes=default_scopes,
request=request,
)

elif credential_type == _EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE:
credentials, project_Id = _get_external_account_authorized_use_credentials(
filename, info, request
)

elif credential_type == _IMPERSONATED_SERVICE_ACCOUNT_TYPE:
credentials, project_id = _get_impersonated_service_account_credentials(
filename, info, scopes
Expand Down Expand Up @@ -363,6 +371,23 @@ def _get_external_account_credentials(
return credentials, credentials.get_project_id(request=request)


def _get_external_account_authorized_use_credentials(
lsirac marked this conversation as resolved.
Show resolved Hide resolved
lsirac marked this conversation as resolved.
Show resolved Hide resolved
info, filename, scopes=None, default_scopes=None, request=None
):
try:
from google.auth import headful
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved

credentials = headful.Credentials.from_info(info)
except ValueError:
raise exceptions.DefaultCredentialsError(
"Failed to load external account authorized user credentials from {}".format(
filename
)
)

return credentials, None


def _get_authorized_user_credentials(filename, info, scopes=None):
from google.oauth2 import credentials

Expand Down
114 changes: 50 additions & 64 deletions google/auth/external_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,26 @@ def info(self):
}
return {key: value for key, value in config_info.items() if value is not None}

@property
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
def constructor_args(self):
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=None,
service_account_impersonation_options={},
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return d

@property
def service_account_email(self):
"""Returns the service account email if service account impersonation is used.
Expand Down Expand Up @@ -253,23 +273,11 @@ def project_number(self):

@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes, default_scopes=None):
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
return self.__class__(
**self.constructor_args,
scopes=scopes,
default_scopes=default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
default_scopes=default_scopes
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return self.__class__(**d)

@abc.abstractmethod
def retrieve_subject_token(self, request):
Expand Down Expand Up @@ -342,45 +350,39 @@ def refresh(self, request):
self.expiry = self._impersonated_credentials.expiry
else:
now = _helpers.utcnow()
additional_options = None
# Do not pass workforce_pool_user_project when client authentication
# is used. The client ID is sufficient for determining the user project.
if self._workforce_pool_user_project and not self._client_id:
additional_options = {"userProject": self._workforce_pool_user_project}
response_data = self._sts_client.exchange_token(
request=request,
grant_type=_STS_GRANT_TYPE,
subject_token=self.retrieve_subject_token(request),
subject_token_type=self._subject_token_type,
audience=self._audience,
scopes=scopes,
requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
additional_options=additional_options,
)
response_data = self._make_sts_request(request)
self.token = response_data.get("access_token")
lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
self.expiry = now + lifetime

def _make_sts_request(self, request):
"""This method is the default method for making STS requests.

This method is overrideable, as some credential types have alternate
methods of making this request.
"""
additional_options = None
# Do not pass workforce_pool_user_project when client authentication
# is used. The client ID is sufficient for determining the user project.
if self._workforce_pool_user_project and not self._client_id:
additional_options = {"userProject": self._workforce_pool_user_project}
return self._sts_client.exchange_token(
request=request,
grant_type=_STS_GRANT_TYPE,
subject_token=self.retrieve_subject_token(request),
subject_token_type=self._subject_token_type,
audience=self._audience,
scopes=scopes,
requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
additional_options=additional_options,
)

@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
# Return copy of instance with the provided quota project ID.
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
return self.__class__(
**self.constructor_args,
quota_project_id=quota_project_id
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return self.__class__(**d)

def _initialize_impersonated_credentials(self):
"""Generates an impersonated credentials.
Expand All @@ -398,23 +400,7 @@ def _initialize_impersonated_credentials(self):
endpoint returned an error.
"""
# Return copy of instance with no service account impersonation.
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=None,
service_account_impersonation_options={},
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
source_credentials = self.__class__(**d)
source_credentials = self.__class__(**self.constructor_args)

# Determine target_principal.
target_principal = self.service_account_email
Expand Down
136 changes: 136 additions & 0 deletions google/auth/headful.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright 2022 Google LLC
lsirac marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Headful Credentials.
Headful Credentials are …
lsirac marked this conversation as resolved.
Show resolved Hide resolved

Example headful credential:
{
"type": "external_account_authorized_user",
"audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
"refresh_token": "refreshToken",
"token_url": "https://sts.googleapis.com/v1/oauth/token",
"token_info_url": "https://sts.googleapis.com/v1/instrospect"
"client_id": "clientId",
"client_secret": "clientSecret"
}
"""

_HEADFUL_JSON_TYPE = "external_account_authorized_user"
lsirac marked this conversation as resolved.
Show resolved Hide resolved

class Credentials(external_account.Credentials):
lsirac marked this conversation as resolved.
Show resolved Hide resolved
lsirac marked this conversation as resolved.
Show resolved Hide resolved
"""
"""

def __init__(
self,
audience,
refresh_token,
token_url,
token_info_url,
client_id,
client_secret,
revoke_url="",
quota_project_id="",
):
"""Instantiates a headful credentials object."""
lsirac marked this conversation as resolved.
Show resolved Hide resolved
super(Credentials, self).__init__(
audience=audience,
subject_token_type=None,
token_url=token_url,
token_info_url=token_info_url,
revoke_url=revoke_url,
credential_source=None,
client_id=client_id,
client_secret=client_secret,
quota_project_id=quota_project_id,
)

self._refresh_token = refresh_token
self._token_info_url = token_info_url

@property
def info(self):
sai-sunder-s marked this conversation as resolved.
Show resolved Hide resolved
"""Generates the dictionary representation of the current credentials.

Returns:
Mapping: The dictionary representation of the credentials. This is the
reverse of "from_info" defined on the subclasses of this class. It is
useful for serializing the current credentials so it can deserialized
later.
"""
config_info = {
"type": _HEADFUL_JSON_TYPE,
"audience": self._audience,
"refresh_token": self._refresh_token,
"token_url": self._token_url,
"token_info_url": self._token_info_url,
"client_id": self._client_id,
"client_secret": self._client_secret,
}
return {key: value for key, value in config_info.items() if value is not None}

@property
def constructor_args(self):
return {
"audience": audience,
"refresh_token": refresh_token,
"token_url": token_url,
"token_info_url": token_info_url,
"client_id": client_id,
"client_secret": client_secret,
"revoke_url": revoke_url,
"quota_project_id": quota_project_id
)

@property
def requires_scopes(self):
"""Checks if the credentials requires scopes.

Returns:
bool: True if there are no scopes set otherwise False.
"""
return False

def _make_sts_request(self, request):
return self._sts_client.refresh_token(request, self._refresh_token)

def with_scopes(self, scopes, default_scopes=None):
raise NotImplementedError("with_scopes is not available for this class")

@classmethod
def from_info(cls, info, **kwargs):
"""Creates a Credentials instance from parsed external account info.

Args:
info (Mapping[str, str]): The external account info in Google
format.
kwargs: Additional arguments to pass to the constructor.

Returns:
google.auth.identity_pool.Credentials: The constructed
credentials.

Raises:
ValueError: For invalid parameters.
"""
return cls(
audience=info.get("audience"),
refresh_token=info.get("refresh_token"),
token_url=info.get("token_url"),
token_info_url=info.get("token_info_url"),
client_id=info.get("client_id"),
client_secret=info.get("client_secret"),
**kwargs
)