Skip to content

Commit

Permalink
Implemented readuntil in StreamResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
WisdomPill committed Nov 1, 2020
1 parent 7ebc07a commit 538b3e7
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 10 deletions.
29 changes: 19 additions & 10 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,34 +310,41 @@ async def _wait(self, func_name: str) -> None:
self._waiter = None

async def readline(self) -> bytes:
return await self.readuntil()

async def readuntil(self, separator: bytes=b'\n') -> bytes:
seplen = len(separator)
if seplen == 0:
raise ValueError('Separator should be at least one-byte string')

if self._exception is not None:
raise self._exception

line = []
line_size = 0
chunk = b''
chunk_size = 0
not_enough = True

while not_enough:
while self._buffer and not_enough:
offset = self._buffer_offset
ichar = self._buffer[0].find(b"\n", offset) + 1
# Read from current offset to found b'\n' or to the end.
ichar = self._buffer[0].find(separator, offset) + 1
# Read from current offset to found separator or to the end.
data = self._read_nowait_chunk(ichar - offset if ichar else -1)
line.append(data)
line_size += len(data)
chunk += data
chunk_size += len(data)
if ichar:
not_enough = False

if line_size > self._high_water:
raise ValueError("Line is too long")
if chunk_size > self._high_water:
raise ValueError('Chunk too big')

if self._eof:
break

if not_enough:
await self._wait("readline")
await self._wait('readuntil')

return b"".join(line)
return chunk

async def read(self, n: int = -1) -> bytes:
if self._exception is not None:
Expand Down Expand Up @@ -517,6 +524,8 @@ async def readline(self) -> bytes:
async def read(self, n: int = -1) -> bytes:
return b""

# TODO add async def readuntil

async def readany(self) -> bytes:
return b""

Expand Down
109 changes: 109 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,115 @@ async def test_readline_exception(self) -> None:
with pytest.raises(ValueError):
await stream.readline()

async def test_readuntil(self) -> None:
loop = asyncio.get_event_loop()
# Read one chunk. 'readuntil' will need to wait for the data
# to come from 'cb'
stream = self._make_one()
stream.feed_data(b'chunk1 ')
read_task = loop.create_task(stream.readuntil(b'*'))

def cb():
stream.feed_data(b'chunk2 ')
stream.feed_data(b'chunk3 ')
stream.feed_data(b'* chunk4')
loop.call_soon(cb)

line = await read_task
assert b'chunk1 chunk2 chunk3 *' == line

stream.feed_eof()
data = await stream.read()
assert b' chunk4' == data

async def test_readuntil_limit_with_existing_data(self) -> None:
# Read one chunk. The data is in StreamReader's buffer
# before the event loop is run.

stream = self._make_one(limit=2)
stream.feed_data(b'li')
stream.feed_data(b'ne1&line2&')

with pytest.raises(ValueError):
await stream.readuntil(b'&')
# The buffer should contain the remaining data after exception
stream.feed_eof()
data = await stream.read()
assert b'line2&' == data

async def test_readuntil_limit(self) -> None:
loop = asyncio.get_event_loop()
# Read one chunk. StreamReaders are fed with data after
# their 'readuntil' methods are called.
stream = self._make_one(limit=4)

def cb():
stream.feed_data(b'chunk1')
stream.feed_data(b'chunk2$')
stream.feed_data(b'chunk3#')
stream.feed_eof()
loop.call_soon(cb)

with pytest.raises(ValueError):
await stream.readuntil(b'$')
data = await stream.read()
assert b'chunk3#' == data

async def test_readuntil_nolimit_nowait(self) -> None:
# All needed data for the first 'readuntil' call will be
# in the buffer.
stream = self._make_one()
data = b'line1!line2!line3!'
stream.feed_data(data[:6])
stream.feed_data(data[6:])

line = await stream.readuntil(b'!')
assert b'line1!' == line

stream.feed_eof()
data = await stream.read()
assert b'line2!line3!' == data

async def test_readuntil_eof(self) -> None:
stream = self._make_one()
stream.feed_data(b'some data')
stream.feed_eof()

line = await stream.readuntil(b'@')
assert b'some data' == line

async def test_readuntil_empty_eof(self) -> None:
stream = self._make_one()
stream.feed_eof()

line = await stream.readuntil(b'@')
assert b'' == line

async def test_readuntil_read_byte_count(self) -> None:
stream = self._make_one()
data = b'line1!line2!line3!'
stream.feed_data(data)

await stream.readuntil(b'!')

data = await stream.read(7)
assert b'line2!l' == data

stream.feed_eof()
data = await stream.read()
assert b'ine3!' == data

async def test_readuntil_exception(self) -> None:
stream = self._make_one()
stream.feed_data(b'line#')

data = await stream.readuntil(b'#')
assert b'line#' == data

stream.set_exception(ValueError())
with pytest.raises(ValueError):
await stream.readuntil(b'#')

async def test_readexactly_zero_or_less(self) -> None:
# Read exact number of bytes (zero or less).
stream = self._make_one()
Expand Down

0 comments on commit 538b3e7

Please sign in to comment.