Skip to content

Commit

Permalink
enh: allow private data access for S3 file format
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Apr 24, 2024
1 parent 7ffe974 commit 501d572
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 40 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
0.58.6
- enh: for access to private S3 data, introduce the environment
variables `DCLAB_S3_ENDPOINT_URL`, `DCLAB_S3_ACCESS_KEY_ID`,
and `DCLAB_S3_SECRET_ACCESS_KEY`
- ref: reorganize the `fmt_s3` module
0.58.5
- fix: CLI methods returned exit status 1 since paths were returned
0.58.4
Expand Down
153 changes: 113 additions & 40 deletions dclab/rtdc_dataset/fmt_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# import multiprocessing BaseManager here, because there is some kind
# of circular dependency issue with s3transfer.compat and multiprocessing.
from multiprocessing.managers import BaseManager # noqa: F401
import os
import pathlib
import re
import socket
from urllib.parse import urlparse
Expand Down Expand Up @@ -32,14 +34,40 @@
r".+\/" # bucket
r".+" # key
)
REGEXP_S3_BUCKET_KEY = re.compile(r"^[0-9a-z-]+(\/[0-9a-z-]+)+$")

S3_ENDPOINT_URL = os.environ.get("DCLAB_S3_ENDPOINT_URL")
S3_ACCESS_KEY_ID = os.environ.get("DCLAB_S3_ACCESS_KEY_ID")
S3_SECRET_ACCESS_KEY = os.environ.get("DCLAB_S3_SECRET_ACCESS_KEY")


class S3File(HTTPFile):
"""Monkeypatched `HTTPFile` to support authenticated access to S3"""
def __init__(self, url, access_key_id="", secret_access_key="",
use_ssl=True, verify_ssl=True):
# Extract the bucket and object names
s3_endpoint, s3_path = parse_s3_url(url)
def __init__(self,
object_path: str,
endpoint_url: str,
access_key_id: str = "",
secret_access_key: str = "",
use_ssl: bool = True,
verify_ssl: bool = True):
"""
Parameters
----------
object_path: str
bucket/key path to object in the object store
endpoint_url: str
the explicit endpoint URL for accessing the object store
access_key_id:
S3 access key
secret_access_key:
secret S3 key matching `access_key_id`
use_ssl: bool
use SSL to connect to the endpoint, only disabled for testing
verify_ssl: bool
make sure the SSL certificate is sound, only used for testing
"""
endpoint_url = endpoint_url.strip().rstrip("/")
self.botocore_session = botocore.session.get_session()
self.s3_session = boto3.Session(
aws_access_key_id=access_key_id,
Expand All @@ -49,7 +77,7 @@ def __init__(self, url, access_key_id="", secret_access_key="",
service_name='s3',
use_ssl=use_ssl,
verify=verify_ssl,
endpoint_url=s3_endpoint,
endpoint_url=endpoint_url,
)
# Use a configuration that allows anonymous access
# https://stackoverflow.com/a/34866092
Expand All @@ -59,18 +87,20 @@ def __init__(self, url, access_key_id="", secret_access_key="",
region_name='us-east-1')
else:
config = None

self.s3_resource = self.s3_session.resource(
service_name="s3",
use_ssl=use_ssl,
verify=verify_ssl,
endpoint_url=s3_endpoint,
endpoint_url=endpoint_url,
config=config)

bucket_name, object_name = s3_path.strip("/").split("/", 1)
bucket_name, object_name = object_path.strip("/").split("/", 1)
self.s3_object = self.s3_resource.Object(
bucket_name=bucket_name,
key=object_name)
super(S3File, self).__init__(url)

super(S3File, self).__init__(f"{endpoint_url}/{object_path}")

def _parse_header(self):
if self._len is None:
Expand All @@ -93,8 +123,9 @@ def download_range(self, start, stop):
class RTDC_S3(RTDC_HDF5):
def __init__(self,
url: str,
secret_id: str = "",
secret_key: str = "",
endpoint_url: str = None,
access_key_id: str = None,
secret_access_key: str = None,
use_ssl: bool = True,
*args, **kwargs):
"""Access RT-DC measurements in an S3-compatible object store
Expand All @@ -105,10 +136,11 @@ def __init__(self,
Parameters
----------
url: str
Full URL to an object in an S3 instance
secret_id: str
URL to an object in an S3 instance; this can be either a full
URL (including the endpoint), or just `bucket/key`
access_key_id: str
S3 access identifier
secret_key: str
secret_access_key: str
Secret S3 access key
use_ssl: bool
Whether to enforce SSL (defaults to True)
Expand All @@ -125,19 +157,28 @@ def __init__(self,
if not BOTO3_AVAILABLE:
raise ModuleNotFoundError(
"Package `boto3` required for S3 format!")
self._s3file = S3File(url,
access_key_id=secret_id,
secret_access_key=secret_key,
use_ssl=use_ssl,
verify_ssl=use_ssl,
)

self._s3file = S3File(
object_path=get_object_path(url),
endpoint_url=(endpoint_url
or get_endpoint_url(url)
or S3_ENDPOINT_URL),
access_key_id=(access_key_id
or S3_ACCESS_KEY_ID
or ""),
secret_access_key=(secret_access_key
or S3_SECRET_ACCESS_KEY
or ""),
use_ssl=use_ssl,
verify_ssl=use_ssl,
)
# Initialize the HDF5 dataset
super(RTDC_S3, self).__init__(
h5path=self._s3file,
*args,
**kwargs)
# Override self.path with the actual S3 URL
self.path = url
self.path = self._s3file.url

def close(self):
super(RTDC_S3, self).close()
Expand Down Expand Up @@ -172,24 +213,25 @@ def is_available(self):


def is_s3_object_available(url: str,
secret_id: str = "",
secret_key: str = "",
access_key_id: str = None,
secret_access_key: str = None,
):
"""Check whether an S3 object is available
Parameters
----------
url: str
full URL to the object
secret_id: str
access_key_id: str
S3 access identifier
secret_key: str
secret_access_key: str
Secret S3 access key
"""
avail = False
if is_s3_url(url):
urlp = urlparse(url)
endpoint_url = get_endpoint_url(url) or S3_ENDPOINT_URL
# default to https if no scheme or port is specified
urlp = urlparse(endpoint_url)
port = urlp.port or (80 if urlp.scheme == "http" else 443)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
Expand All @@ -202,9 +244,16 @@ def is_s3_object_available(url: str,
pass
else:
# Try to access the object
s3file = S3File(url=url,
access_key_id=secret_id,
secret_access_key=secret_key)
s3file = S3File(
object_path=get_object_path(url),
endpoint_url=endpoint_url,
access_key_id=(access_key_id
or S3_ACCESS_KEY_ID
or ""),
secret_access_key=(secret_access_key
or S3_SECRET_ACCESS_KEY
or ""),
)
try:
s3file.s3_object.load()
except botocore.exceptions.ClientError:
Expand All @@ -214,21 +263,45 @@ def is_s3_object_available(url: str,
return avail


@functools.lru_cache()
def get_endpoint_url(url):
    """Extract the endpoint URL from a full S3 object URL.

    Returns None when no endpoint can be determined, e.g. because
    only a plain `bucket_name/object_path` string was given (no
    hostname present).
    """
    parsed = urlparse(url)
    if not parsed.hostname:
        return None
    # default to https when the URL carries no explicit scheme
    scheme = parsed.scheme or "https"
    # fall back to the scheme's well-known port
    default_port = 80 if scheme == "http" else 443
    return f"{scheme}://{parsed.hostname}:{parsed.port or default_port}"


@functools.lru_cache()
def get_object_path(url):
    """Return the `bucket_name/object_path` part of an S3 object URL.

    The returned path never carries a leading slash `/`.
    """
    return urlparse(url).path.lstrip("/")


@functools.lru_cache()
def is_s3_url(string):
    """Check whether `string` is a valid S3 URL.

    Accepts either a full URL (matching `REGEXP_S3_URL`) or a plain
    `bucket_name/key` path (matching `REGEXP_S3_BUCKET_KEY`), and
    always returns a bool.
    """
    if not isinstance(string, str):
        return False
    elif REGEXP_S3_URL.match(string.strip()):
        # this is pretty clear
        return True
    elif pathlib.Path(string).exists():
        # this is actually a file
        return False
    elif REGEXP_S3_BUCKET_KEY.match(string.strip()):
        # bucket_name/key
        return True
    else:
        # Previously this branch re-ran `REGEXP_S3_URL.match` (already
        # known not to match from the elif above) and returned the
        # `Match | None` result; return an explicit bool instead so
        # every branch has a consistent return type.
        return False


@functools.lru_cache()
def parse_s3_url(url):
    """Parse an S3 `url`, returning the endpoint URL and object path.

    Parameters
    ----------
    url: str
        full URL to an object in an S3 object store

    Returns
    -------
    s3_endpoint: str
        endpoint URL in the form `scheme://hostname:port` (scheme
        defaults to "https", port to the scheme's well-known port)
    s3_path: str
        the path component of `url` (leading slash retained)
    """
    urlp = urlparse(url)
    scheme = urlp.scheme or "https"
    port = urlp.port or (80 if scheme == "http" else 443)
    s3_endpoint = f"{scheme}://{urlp.hostname}:{port}"
    s3_path = urlp.path
    return s3_endpoint, s3_path
    # note: an unreachable `return False` after the tuple return was
    # removed here (dead code)

0 comments on commit 501d572

Please sign in to comment.