Skip to content

Commit

Permalink
Add custom storage backend for GCS
Browse files Browse the repository at this point in the history
This is a workaround, to be able to sign blob URLs without a credential
file/private key.

References:

jschneier/django-storages#941
https://googleapis.dev/python/storage/latest/blobs.html#google.cloud.storage.blob.Blob.generate_signed_url
  • Loading branch information
np5 committed Sep 6, 2022
1 parent fae369a commit 129a511
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
67 changes: 67 additions & 0 deletions zentral/utils/gcs_storage.py
@@ -0,0 +1,67 @@
import threading
import google.auth
from storages.backends.gcloud import _quote, clean_name, GoogleCloudStorage


class ZentralGoogleCloudStorage(GoogleCloudStorage):
"""A subclass to force the use of the IAM signBlob API
This allows the signing of blob URLs without having to use a credential file.
The service account must have the iam.serviceAccounts.signBlob permission."""

def __init__(self, **settings):
super().__init__(**settings)
self._signing_credentials = None
self._signing_credentials_lock = threading.Lock()

def url(self, name):
"""
Return public url or a signed url for the Blob.
This DOES NOT check for existance of Blob - that makes codes too slow
for many use cases.
Overridden to force the use of the IAM signBlob API.
See https://github.com/googleapis/python-storage/blob/519074112775c19742522158f612b467cf590219/google/cloud/storage/_signing.py#L628 # NOQA
"""
name = self._normalize_name(clean_name(name))
blob = self.bucket.blob(name)
blob_params = self.get_object_parameters(name)
no_signed_url = (
blob_params.get('acl', self.default_acl) == 'publicRead' or not self.querystring_auth)

if not self.custom_endpoint and no_signed_url:
return blob.public_url
elif no_signed_url:
return '{storage_base_url}/{quoted_name}'.format(
storage_base_url=self.custom_endpoint,
quoted_name=_quote(name, safe=b"/~"),
)
elif not self.custom_endpoint:
return blob.generate_signed_url(
expiration=self.expiration,
version="v4",
**self._get_signing_kwargs()
)
else:
return blob.generate_signed_url(
bucket_bound_hostname=self.custom_endpoint,
expiration=self.expiration,
version="v4",
**self._get_signing_kwargs()
)

def _get_signing_credentials(self):
with self._signing_credentials_lock:
if self._signing_credentials is None or self._signing_credentials.expired:
credentials, _ = google.auth.default()
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)
self._signing_credentials = credentials
return self._signing_credentials

def _get_signing_kwargs(self):
credentials = self._get_signing_credentials()
return {
"service_account_email": credentials.service_account_email,
"access_token": credentials.token
}
2 changes: 1 addition & 1 deletion zentral/utils/storage.py
Expand Up @@ -3,4 +3,4 @@

def file_storage_has_signed_urls():
# TODO better detection!
return get_storage_class().__name__ in ('S3Boto3Storage', 'GoogleCloudStorage')
return get_storage_class().__name__ in ('S3Boto3Storage', 'GoogleCloudStorage', 'ZentralGoogleCloudStorage')

0 comments on commit 129a511

Please sign in to comment.