From e941d79f83f4cef0efafc7fe956d29be1a9901a7 Mon Sep 17 00:00:00 2001
From: Tom Christie
Date: Mon, 22 Nov 2021 09:59:41 +0000
Subject: [PATCH 1/3] Cap upload chunk sizes

---
 httpx/_content.py   | 6 ++++--
 httpx/_multipart.py | 4 +++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/httpx/_content.py b/httpx/_content.py
index d7e8aa0974..6e9843901f 100644
--- a/httpx/_content.py
+++ b/httpx/_content.py
@@ -49,7 +49,8 @@ def __iter__(self) -> Iterator[bytes]:
 
         self._is_stream_consumed = True
         for part in self._stream:
-            yield part
+            for idx in range(0, len(part), 65_536):
+                yield part[idx : idx + 65_536]
 
 
 class AsyncIteratorByteStream(AsyncByteStream):
@@ -64,7 +65,8 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
 
         self._is_stream_consumed = True
         async for part in self._stream:
-            yield part
+            for idx in range(0, len(part), 65_536):
+                yield part[idx : idx + 65_536]
 
 
 class UnattachedStream(AsyncByteStream, SyncByteStream):
diff --git a/httpx/_multipart.py b/httpx/_multipart.py
index 4dfb838a68..e2b48cb797 100644
--- a/httpx/_multipart.py
+++ b/httpx/_multipart.py
@@ -143,7 +143,9 @@ def render_data(self) -> typing.Iterator[bytes]:
 
         self._consumed = True
         for chunk in self.file:
-            yield to_bytes(chunk)
+            chunk = to_bytes(chunk)
+            for idx in range(0, len(chunk), 65_536):
+                yield chunk[idx : idx + 65_536]
 
     def render(self) -> typing.Iterator[bytes]:
         yield self.render_headers()

From f7863e2848f799ecfdd1a6a1ed3d9795b84703fc Mon Sep 17 00:00:00 2001
From: Tom Christie
Date: Mon, 22 Nov 2021 12:12:24 +0000
Subject: [PATCH 2/3] Use '.read' for file streaming, where possible

---
 httpx/_content.py     | 32 ++++++++++++++++++++++++++------
 httpx/_multipart.py   | 10 ++++++----
 tests/test_content.py | 25 +++++++++++++++++++++++++
 3 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/httpx/_content.py b/httpx/_content.py
index 6e9843901f..98d1cf0fe3 100644
--- a/httpx/_content.py
+++ b/httpx/_content.py
@@ -38,6 +38,8 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
 
 
 class IteratorByteStream(SyncByteStream):
+    CHUNK_SIZE = 65_536
+
     def __init__(self, stream: Iterable[bytes]):
         self._stream = stream
         self._is_stream_consumed = False
@@ -48,12 +50,22 @@ def __iter__(self) -> Iterator[bytes]:
             raise StreamConsumed()
 
         self._is_stream_consumed = True
-        for part in self._stream:
-            for idx in range(0, len(part), 65_536):
-                yield part[idx : idx + 65_536]
+        if hasattr(self._stream, "read"):
+            # File-like interfaces should use 'read' directly.
+            chunk = self._stream.read(self.CHUNK_SIZE)  # type: ignore
+            while chunk:
+                yield chunk
+                chunk = self._stream.read(self.CHUNK_SIZE)  # type: ignore
+        else:
+            # Otherwise iterate, but enforce a maximum chunk size.
+            for part in self._stream:
+                for idx in range(0, len(part), self.CHUNK_SIZE):
+                    yield part[idx : idx + self.CHUNK_SIZE]
 
 
 class AsyncIteratorByteStream(AsyncByteStream):
+    CHUNK_SIZE = 65_536
+
     def __init__(self, stream: AsyncIterable[bytes]):
         self._stream = stream
         self._is_stream_consumed = False
@@ -64,9 +76,17 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
             raise StreamConsumed()
 
         self._is_stream_consumed = True
-        async for part in self._stream:
-            for idx in range(0, len(part), 65_536):
-                yield part[idx : idx + 65_536]
+        if hasattr(self._stream, "aread"):
+            # File-like interfaces should use 'aread' directly.
+            chunk = await self._stream.aread(self.CHUNK_SIZE)  # type: ignore
+            while chunk:
+                yield chunk
+                chunk = await self._stream.aread(self.CHUNK_SIZE)  # type: ignore
+        else:
+            # Otherwise iterate, but enforce a maximum chunk size.
+            async for part in self._stream:
+                for idx in range(0, len(part), self.CHUNK_SIZE):
+                    yield part[idx : idx + self.CHUNK_SIZE]
 
 
 class UnattachedStream(AsyncByteStream, SyncByteStream):
diff --git a/httpx/_multipart.py b/httpx/_multipart.py
index e2b48cb797..f05f02b4c5 100644
--- a/httpx/_multipart.py
+++ b/httpx/_multipart.py
@@ -71,6 +71,8 @@ class FileField:
     A single file field item, within a multipart form field.
     """
 
+    CHUNK_SIZE = 64 * 1024
+
     def __init__(self, name: str, value: FileTypes) -> None:
         self.name = name
 
@@ -142,10 +144,10 @@ def render_data(self) -> typing.Iterator[bytes]:
             self.file.seek(0)
 
         self._consumed = True
-        for chunk in self.file:
-            chunk = to_bytes(chunk)
-            for idx in range(0, len(chunk), 65_536):
-                yield chunk[idx : idx + 65_536]
+        chunk = self.file.read(self.CHUNK_SIZE)
+        while chunk:
+            yield to_bytes(chunk)
+            chunk = self.file.read(self.CHUNK_SIZE)
 
     def render(self) -> typing.Iterator[bytes]:
         yield self.render_headers()
diff --git a/tests/test_content.py b/tests/test_content.py
index 2a273a133f..afd910399e 100644
--- a/tests/test_content.py
+++ b/tests/test_content.py
@@ -60,6 +60,31 @@ async def test_bytesio_content():
     assert content == b"Hello, world!"
 
 
+@pytest.mark.asyncio
+async def test_async_bytesio_content():
+    class AsyncBytesIO:
+        def __init__(self, content):
+            self._idx = 0
+            self._content = content
+
+        async def aread(self, chunk_size: int):
+            chunk = self._content[self._idx : self._idx + chunk_size]
+            self._idx = self._idx + chunk_size
+            return chunk
+
+        async def __aiter__(self):
+            yield self._content  # pragma: nocover
+
+    headers, stream = encode_request(content=AsyncBytesIO(b"Hello, world!"))
+    assert not isinstance(stream, typing.Iterable)
+    assert isinstance(stream, typing.AsyncIterable)
+
+    content = b"".join([part async for part in stream])
+
+    assert headers == {"Transfer-Encoding": "chunked"}
+    assert content == b"Hello, world!"
+
+
 @pytest.mark.asyncio
 async def test_iterator_content():
     def hello_world():

From 71aa1cd7c84ed80cc90876a443c9b868d3862683 Mon Sep 17 00:00:00 2001
From: Tom Christie
Date: Mon, 22 Nov 2021 12:17:29 +0000
Subject: [PATCH 3/3] Direct iteration should not apply chunk sizes

---
 httpx/_content.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/httpx/_content.py b/httpx/_content.py
index 98d1cf0fe3..9032c1c056 100644
--- a/httpx/_content.py
+++ b/httpx/_content.py
@@ -57,10 +57,9 @@ def __iter__(self) -> Iterator[bytes]:
                 yield chunk
                 chunk = self._stream.read(self.CHUNK_SIZE)  # type: ignore
         else:
-            # Otherwise iterate, but enforce a maximum chunk size.
+            # Otherwise iterate.
             for part in self._stream:
-                for idx in range(0, len(part), self.CHUNK_SIZE):
-                    yield part[idx : idx + self.CHUNK_SIZE]
+                yield part
 
 
 class AsyncIteratorByteStream(AsyncByteStream):
@@ -83,10 +82,9 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
                 yield chunk
                 chunk = await self._stream.aread(self.CHUNK_SIZE)  # type: ignore
         else:
-            # Otherwise iterate, but enforce a maximum chunk size.
+            # Otherwise iterate.
             async for part in self._stream:
-                for idx in range(0, len(part), self.CHUNK_SIZE):
-                    yield part[idx : idx + self.CHUNK_SIZE]
+                yield part
 
 
 class UnattachedStream(AsyncByteStream, SyncByteStream):
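Note on the end state (illustrative, not part of the patches): after all three
commits, the sync streaming logic amounts to the standalone sketch below. The
chunk-size cap applies only where the library itself pulls data via '.read'
(or '.aread' in the async case); per PATCH 3/3, parts from direct iteration
are forwarded as-is, so the caller controls chunking there. The names
'stream_content' and the module-level 'CHUNK_SIZE' are hypothetical, and the
sketch omits the StreamConsumed guard that the real classes keep.

import io
from typing import Iterator

CHUNK_SIZE = 65_536  # hypothetical module-level constant; same value as the patches


def stream_content(source) -> Iterator[bytes]:
    if hasattr(source, "read"):
        # File-like interfaces: read fixed-size chunks until EOF, so a large
        # upload is never materialised as one huge bytes object.
        chunk = source.read(CHUNK_SIZE)
        while chunk:
            yield chunk
            chunk = source.read(CHUNK_SIZE)
    else:
        # Direct iteration: parts pass through exactly as the caller
        # produced them; no maximum chunk size is enforced (PATCH 3/3).
        for part in source:
            yield part


# A 200,000-byte file-like source is served as capped reads:
sizes = [len(c) for c in stream_content(io.BytesIO(b"x" * 200_000))]
assert sizes == [65_536, 65_536, 65_536, 3_392]

# An iterator's parts are passed through unchanged:
assert list(stream_content(iter([b"Hello, ", b"world!"]))) == [b"Hello, ", b"world!"]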