Skip to content

Commit

Permalink
fix: Fix BlobReader handling of interleaved reads and seeks (#721)
Browse files Browse the repository at this point in the history
* tests (fileio): add tarfile based test case for reader seek

Background info: #462

* fix: Patch blob reader to return correct seek/tell values (#462)

Co-authored-by: Andrew Gorcester <gorcester@google.com>
  • Loading branch information
aabbccddeeffgghhii1438 and andrewsg committed Mar 11, 2022
1 parent e0b3b35 commit 5d1cfd2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
38 changes: 22 additions & 16 deletions google/cloud/storage/fileio.py
Expand Up @@ -123,9 +123,12 @@ def read(self, size=-1):
# If the read request demands more bytes than are buffered, fetch more.
remaining_size = size - len(result)
if remaining_size > 0 or size < 0:
self._pos += self._buffer.tell()
read_size = len(result)

self._buffer.seek(0)
self._buffer.truncate(0) # Clear the buffer to make way for new data.
fetch_start = self._pos + len(result)
fetch_start = self._pos
if size > 0:
# Fetch the larger of self._chunk_size or the remaining_size.
fetch_end = fetch_start + max(remaining_size, self._chunk_size)
Expand Down Expand Up @@ -154,9 +157,8 @@ def read(self, size=-1):
self._buffer.write(result[size:])
self._buffer.seek(0)
result = result[:size]

self._pos += len(result)

# Increment relative offset by true amount read.
self._pos += len(result) - read_size
return result

def read1(self, size=-1):
Expand All @@ -174,29 +176,33 @@ def seek(self, pos, whence=0):
if self._blob.size is None:
self._blob.reload(**self._download_kwargs)

initial_pos = self._pos
initial_offset = self._pos + self._buffer.tell()

if whence == 0:
self._pos = pos
target_pos = pos
elif whence == 1:
self._pos += pos
target_pos = initial_offset + pos
elif whence == 2:
self._pos = self._blob.size + pos
target_pos = self._blob.size + pos
if whence not in {0, 1, 2}:
raise ValueError("invalid whence value")

if self._pos > self._blob.size:
self._pos = self._blob.size
if target_pos > self._blob.size:
target_pos = self._blob.size

# Seek or invalidate buffer as needed.
difference = self._pos - initial_pos
new_buffer_pos = self._buffer.seek(difference, 1)
if new_buffer_pos != difference: # Buffer does not contain new pos.
# Invalidate buffer.
if target_pos < self._pos:
# Target position < relative offset <= true offset.
# As data is not in buffer, invalidate buffer.
self._buffer.seek(0)
self._buffer.truncate(0)

return self._pos
new_pos = target_pos
self._pos = target_pos
else:
# relative offset <= target position <= size of file.
difference = target_pos - initial_offset
new_pos = self._pos + self._buffer.seek(difference, 1)
return new_pos

def close(self):
self._buffer.close()
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/test_fileio.py
Expand Up @@ -247,6 +247,41 @@ def initialize_size(**_):

reader.close()

def test_advanced_seek(self):
blob = mock.Mock()

def read_from_fake_data(start=0, end=None, **_):
return TEST_BINARY_DATA[start:end] * 1024

blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data)
blob.size = None
download_kwargs = {"if_metageneration_match": 1}
reader = self._make_blob_reader(blob, chunk_size=1024, **download_kwargs)

# Seek needs the blob size to work and should call reload() if the size
# is not known. Set a mock to initialize the size if reload() is called.
def initialize_size(**_):
blob.size = len(TEST_BINARY_DATA) * 1024

blob.reload = mock.Mock(side_effect=initialize_size)

self.assertEqual(reader.tell(), 0)
# Mimic tarfile access pattern. Read tarinfo block.
reader.read(512)
self.assertEqual(reader.tell(), 512)
self.assertEqual(reader.seek(512), 512)
# Mimic read actual tar content.
reader.read(400)
self.assertEqual(reader.tell(), 912)
# Tarfile offsets are rounded up by block size
# A sanity seek/read is used to check for unexpected ends.
reader.seek(1023)
reader.read(1)
self.assertEqual(reader.tell(), 1024)
reader.read(512)
self.assertEqual(reader.tell(), 1536)
reader.close()

def test_close(self):
blob = mock.Mock()
reader = self._make_blob_reader(blob)
Expand Down

0 comments on commit 5d1cfd2

Please sign in to comment.