Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[s3] switch to clean_name util #1198

Merged
merged 1 commit into from Dec 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 10 additions & 23 deletions storages/backends/s3boto3.py
Expand Up @@ -23,6 +23,7 @@
from storages.compress import CompressedFileMixin
from storages.compress import CompressStorageMixin
from storages.utils import check_location
from storages.utils import clean_name
from storages.utils import get_available_overwrite_name
from storages.utils import is_seekable
from storages.utils import lookup_env
Expand Down Expand Up @@ -406,20 +407,6 @@ def _get_security_token(self):
security_token = self.security_token or lookup_env(self.security_token_names)
return security_token

def _clean_name(self, name):
"""
Cleans the name so that Windows style paths work
"""
# Normalize Windows style paths
clean_name = posixpath.normpath(name).replace('\\', '/')

# os.path.normpath() can strip trailing slashes so we implement
# a workaround here.
if name.endswith('/') and not clean_name.endswith('/'):
# Add a trailing slash as it was stripped.
clean_name += '/'
return clean_name

def _normalize_name(self, name):
"""
Normalizes the name so that paths like /path/to/ignored/../something.txt
Expand All @@ -432,7 +419,7 @@ def _normalize_name(self, name):
raise SuspiciousOperation("Attempted access to '%s' denied." % name)

def _open(self, name, mode='rb'):
name = self._normalize_name(self._clean_name(name))
name = self._normalize_name(clean_name(name))
try:
f = S3Boto3StorageFile(name, mode, self)
except ClientError as err:
Expand All @@ -442,7 +429,7 @@ def _open(self, name, mode='rb'):
return f

def _save(self, name, content):
cleaned_name = self._clean_name(name)
cleaned_name = clean_name(name)
name = self._normalize_name(cleaned_name)
params = self._get_write_parameters(name, content)

Expand All @@ -459,11 +446,11 @@ def _save(self, name, content):
return cleaned_name

def delete(self, name):
name = self._normalize_name(self._clean_name(name))
name = self._normalize_name(clean_name(name))
self.bucket.Object(name).delete()

def exists(self, name):
name = self._normalize_name(self._clean_name(name))
name = self._normalize_name(clean_name(name))
try:
self.connection.meta.client.head_object(Bucket=self.bucket_name, Key=name)
return True
Expand All @@ -475,7 +462,7 @@ def exists(self, name):
raise

def listdir(self, name):
path = self._normalize_name(self._clean_name(name))
path = self._normalize_name(clean_name(name))
# The path needs to end with a slash, but if the root is empty, leave it.
if path and not path.endswith('/'):
path += '/'
Expand All @@ -494,7 +481,7 @@ def listdir(self, name):
return directories, files

def size(self, name):
name = self._normalize_name(self._clean_name(name))
name = self._normalize_name(clean_name(name))
return self.bucket.Object(name).content_length

def _get_write_parameters(self, name, content=None):
Expand Down Expand Up @@ -531,7 +518,7 @@ def get_modified_time(self, name):
Returns an (aware) datetime object containing the last modified time if
USE_TZ is True, otherwise returns a naive datetime in the local timezone.
"""
name = self._normalize_name(self._clean_name(name))
name = self._normalize_name(clean_name(name))
entry = self.bucket.Object(name)
if setting('USE_TZ'):
# boto3 returns TZ aware timestamps
Expand Down Expand Up @@ -569,7 +556,7 @@ def _strip_signing_parameters(self, url):

def url(self, name, parameters=None, expire=None, http_method=None):
# Preserve the trailing slash after normalizing the path.
name = self._normalize_name(self._clean_name(name))
name = self._normalize_name(clean_name(name))
params = parameters.copy() if parameters else {}
if expire is None:
expire = self.querystring_expire
Expand Down Expand Up @@ -598,7 +585,7 @@ def url(self, name, parameters=None, expire=None, http_method=None):

def get_available_name(self, name, max_length=None):
"""Overwrite existing file with the same name."""
name = self._clean_name(name)
name = clean_name(name)
if self.file_overwrite:
return get_available_overwrite_name(name, max_length)
return super().get_available_name(name, max_length)
Expand Down
28 changes: 0 additions & 28 deletions tests/test_s3boto3.py
Expand Up @@ -30,34 +30,6 @@ def setUp(self):
self.storage = s3boto3.S3Boto3Storage()
self.storage._connections.connection = mock.MagicMock()

def test_clean_name(self):
"""
Test the base case of _clean_name
"""
path = self.storage._clean_name("path/to/somewhere")
self.assertEqual(path, "path/to/somewhere")

def test_clean_name_normalize(self):
"""
Test the normalization of _clean_name
"""
path = self.storage._clean_name("path/to/../somewhere")
self.assertEqual(path, "path/somewhere")

def test_clean_name_trailing_slash(self):
"""
Test the _clean_name when the path has a trailing slash
"""
path = self.storage._clean_name("path/to/somewhere/")
self.assertEqual(path, "path/to/somewhere/")

def test_clean_name_windows(self):
"""
Test the _clean_name when the path has a trailing slash
"""
path = self.storage._clean_name("path\\to\\somewhere")
self.assertEqual(path, "path/to/somewhere")

def test_s3_session(self):
settings.AWS_S3_SESSION_PROFILE = "test_profile"
with mock.patch('boto3.Session') as mock_session:
Expand Down