Skip to content

Commit

Permalink
[s3] Fix newline handling for text-mode files
Browse files Browse the repository at this point in the history
  • Loading branch information
jschneier committed Apr 21, 2024
1 parent 0765aa8 commit 0560ab1
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 24 deletions.
6 changes: 3 additions & 3 deletions storages/backends/s3.py
Expand Up @@ -123,7 +123,6 @@ def __init__(self, name, mode, storage, buffer_size=None):
self._storage = storage
self.name = name[len(self._storage.location) :].lstrip("/")
self._mode = mode
self._force_mode = (lambda b: b) if "b" in mode else (lambda b: b.decode())
self.obj = storage.bucket.Object(name)
if "w" not in mode:
# Force early RAII-style exception if object does not exist
Expand Down Expand Up @@ -172,6 +171,7 @@ def _get_file(self):
max_size=self._storage.max_memory_size,
suffix=".S3File",
dir=setting("FILE_UPLOAD_TEMP_DIR"),
mode=self._mode,
)
if "r" in self._mode:
self._is_dirty = False
Expand All @@ -195,12 +195,12 @@ def _set_file(self, value):
def read(self, *args, **kwargs):
if "r" not in self._mode:
raise AttributeError("File was not opened in read mode.")
return self._force_mode(super().read(*args, **kwargs))
return super().read(*args, **kwargs)

def readline(self, *args, **kwargs):
if "r" not in self._mode:
raise AttributeError("File was not opened in read mode.")
return self._force_mode(super().readline(*args, **kwargs))
return super().readline(*args, **kwargs)

def readlines(self):
return list(self)
Expand Down
65 changes: 44 additions & 21 deletions tests/test_s3.py
Expand Up @@ -305,25 +305,6 @@ def test_storage_open_read_string(self):
self.assertEqual(content_str, "")
file.close()

def test_storage_open_readlines(self):
"""
Test readlines with file opened in "r" and "rb" modes
"""
name = "test_open_readlines_string.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2")
file = self.storage.open(name, "r")
file._file = temp_file

content_lines = file.readlines()
self.assertEqual(content_lines, ["line1\n", "line2"])

temp_file.seek(0)
file = self.storage.open(name, "rb")
file._file = temp_file
content_lines = file.readlines()
self.assertEqual(content_lines, [b"line1\n", b"line2"])

def test_storage_open_write(self):
"""
Test opening a file in write mode
Expand All @@ -338,7 +319,7 @@ def test_storage_open_write(self):
"ACL": "public-read",
}

file = self.storage.open(name, "w")
file = self.storage.open(name, "wb")
self.storage.bucket.Object.assert_called_with(name)
obj = self.storage.bucket.Object.return_value
# Set the name of the mock object
Expand Down Expand Up @@ -449,7 +430,7 @@ def test_storage_write_beyond_buffer_size(self):
"StorageClass": "REDUCED_REDUNDANCY",
}

file = self.storage.open(name, "w")
file = self.storage.open(name, "wb")
self.storage.bucket.Object.assert_called_with(name)
obj = self.storage.bucket.Object.return_value
# Set the name of the mock object
Expand Down Expand Up @@ -1145,6 +1126,48 @@ def test_content_type_not_detectable(self):
s3.S3Storage.default_content_type,
)

def test_storage_open_read_with_newlines(self):
"""
Test opening a file in "r" and "rb" mode with various newline characters
"""
name = "test_storage_open_read_with_newlines.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext\n")
self.storage.save(name, temp_file)
file = self.storage.open(name, "r")
content_str = file.read()
file.close()
self.assertEqual(content_str, "line1\nline2\nmore\ntext\n")

with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext\n")
self.storage.save(name, temp_file)
file = self.storage.open(name, "rb")
content_str = file.read()
file.close()
self.assertEqual(content_str, b"line1\nline2\r\nmore\rtext\n")

def test_storage_open_readlines_with_newlines(self):
"""
Test readlines with file opened in "r" and "rb" mode with various newline chars
"""
name = "test_storage_open_readlines_with_newlines.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext")
self.storage.save(name, temp_file)
file = self.storage.open(name, "r")
content_lines = file.readlines()
file.close()
self.assertEqual(content_lines, ["line1\n", "line2\n", "more\n", "text"])

with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext")
self.storage.save(name, temp_file)
file = self.storage.open(name, "rb")
content_lines = file.readlines()
file.close()
self.assertEqual(content_lines, [b"line1\n", b"line2\r\n", b"more\r", b"text"])


class TestBackwardsNames(TestCase):
def test_importing(self):
Expand Down

0 comments on commit 0560ab1

Please sign in to comment.