Provide some standard mechanism for splitting a stream into lines, and other basic protocol tasks #796

Open
njsmith opened this issue Dec 3, 2018 · 27 comments

Comments

@njsmith
Member

njsmith commented Dec 3, 2018

Most networking libraries provide some standard way to implement basic protocol building blocks like "split a stream into lines", "read exactly N bytes", or "split a stream into length-prefixed frames", e.g.:

We don't have anything like this currently, as I was reminded by this StackOverflow question from @basak.

Note: if you're just looking for a quick way to read lines from a trio Stream, then click on that SO link, it has an example.

Use cases

  • Simple protocols used in tutorials, to make it easy for beginners to get something working
  • A base for implementing substantial/standard protocols that happen to use one of these framing methods. This mostly applies to the line-based framing, e.g. twisted's LineReceiver and LineOnlyReceiver have subclasses implementing HTTP, IMAP, POP3, SMTP, Ident, Finger, FTP, Memcache, IRC, ... you get the idea.
  • Inventing little private mini-protocols where you don't want to have to build basic framing from scratch. I think a lot of cases that used to use this kind of thing nowadays use HTTP or WebSocket or ZeroMQ, but it still comes up occasionally. This mostly involves the length-prefixed framing variants (e.g. twisted AMP subclasses Int16Receiver), though sometimes it involves lines, e.g. newline-terminated JSON, or the log parser in linehaul.
  • If you have to script an interactive subprocess that was never designed to be scripted, then readline and read_until are pretty useful. This particular case can also benefit from more sophisticated tools, like TTY emulation and pexpect-style pattern matching.
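
As a rough illustration of the length-prefixed variant mentioned above, here is a minimal sketch. The 2-byte big-endian prefix and the names encode_frame and pop_frames are assumptions made for this example, not an existing Trio or Twisted API.

```python
import struct

# Assumed wire format for this sketch: 2-byte big-endian length, then payload.
_PREFIX = struct.Struct("!H")


def encode_frame(payload):
    """Prepend the length prefix to one payload."""
    if len(payload) > 0xFFFF:
        raise ValueError("payload too large for a 16-bit length prefix")
    return _PREFIX.pack(len(payload)) + payload


def pop_frames(buf):
    """Remove and return every complete frame at the front of bytearray `buf`.

    Incomplete trailing data is left in `buf` for the next call.
    """
    frames = []
    while len(buf) >= _PREFIX.size:
        (length,) = _PREFIX.unpack_from(buf)
        end = _PREFIX.size + length
        if len(buf) < end:
            break  # body not fully received yet
        frames.append(bytes(buf[_PREFIX.size:end]))
        del buf[:end]
    return frames
```

The caller appends whatever bytes arrive to the bytearray and calls pop_frames after each append; a frame split across several reads simply stays in the buffer until its last byte shows up.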

Considerations

Our approach shouldn't involve adding new methods to Stream, because the point of the Stream interface is to allow for lots of different implementations, and we don't want to force everyone who implements Stream to reimplement their own version of the standard frame-splitting algorithms. So this should be some helper function that acts on a Stream, or a wrapper class that has-a Stream, something like that.

For "real" protocols like HTTP, you definitely can implement them on top of explicit (async) blocking I/O operations like readline and read_exactly, but these days I'm pretty convinced that you will be happier using Sans I/O. Some of the arguments for sans-io design are kind of pure and theoretical, like "better modularity" and "higher reusability", but having done this twice now (with h11 and wsproto), I really don't feel like it's an eat-your-vegetables thing – the benefits are super practical: like, you can actually understand your protocol code, and test it, and people with totally different use cases show up to fix bugs for you. It's just a more pleasant way to do things.

OTOH, while trio is generally kind of opinionated and we should give confused users helpful nudges in the best direction we can, we don't want to be elitist. If someone's used to hacking together simple protocols using readline, and is comfortable doing that, we don't want to put up barriers to their using trio. And if the sans-I/O approach is harder to get started with, then for some people that will legitimately outweigh the long-term benefits.

There might be one way to have our cake and eat it too: if we can make the sans-I/O version so simple and easy to get started with that even beginners and folks used to readline don't find it a barrier. If we can pull this off, it'd be pretty sweet, because then we can teach the better approach from the beginning, and when they move on to implementing more complex protocols, or integrating existing libraries like h11/h2/wsproto, they're already prepared to do it right.

Alternatively, if we can't... there is really not a lot of harm in having a lines_from_stream generator, or whatever. But anything more than that is going to require exposing some kind of buffering to the user, which is the core of the sans-I/O pattern, so let's think about sans-I/O for a bit.

Can we make sans-I/O accessible and easy?

The core parts of implementing a high-quality streaming line reader, a streaming length-prefixed string reader, or an HTTP parser, are actually all kind of the same:

  • You need a buffer
  • It needs an efficient append-to-the-end operation
  • It needs an efficient extract-from-the-beginning operation
  • You need to be able to scan the buffer for a delimiter, with some cleverness to track how far you've scanned to avoid O(n^2) rescans after new data is added
  • And some kind of maximum buffer size to avoid memory DoS
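
The requirements in this list can be sketched in a few lines. This is only an illustration under assumed names (ReceiveBuffer, feed, take, take_until); it is not h11's internal buffer or any published API.

```python
class ReceiveBuffer:
    """Hypothetical sans-I/O receive buffer; all names are illustrative."""

    def __init__(self, max_size=16384):
        self._data = bytearray()
        self._searched = 0  # prefix already scanned without finding a delimiter
        self.max_size = max_size

    def feed(self, chunk):
        # Cap the buffer so a peer cannot make us hold unbounded memory.
        if len(self._data) + len(chunk) > self.max_size:
            raise ValueError("buffer limit exceeded")
        # Appending to a bytearray is amortized O(len(chunk)).
        self._data += chunk

    def take(self, count):
        # Copy out the front, then delete it in place rather than
        # rebuilding the whole buffer with a slice.
        out = bytes(self._data[:count])
        del self._data[:count]
        self._searched = 0
        return out

    def take_until(self, delimiter):
        """Return the bytes before `delimiter`, or None if it has not arrived."""
        idx = self._data.find(delimiter, self._searched)
        if idx < 0:
            # Resume just far enough back to catch a delimiter that is
            # split across two chunks.
            self._searched = max(0, len(self._data) - len(delimiter) + 1)
            return None
        frame = self.take(idx)
        del self._data[:len(delimiter)]
        return frame
```

Because take_until remembers how far it has searched, feeding the buffer in many small chunks costs about the same total scanning work as feeding it in one piece.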

h11 internally has a robust implementation of everything here except for specifying delimiters as a regex, and I need to add that anyway to fix python-hyper/h11#7. So I have a plan already to pull that out into a standalone library.

And the APIs for a sans-I/O line reader, length-prefixed string reader, HTTP parser, or websocket parser for that matter, are also all kind of the same: you wrap them around a Stream, and then call a receive method which tries to pull some "event" out of the internal buffer, while refilling the buffer as necessary.

In fact, if you had sans-I/O versions of any of these, that all followed the same interface conventions, you could even have a single generic wrapper that binds them to a Trio stream, and implements the ReceiveChannel interface! Where the objects being received are lines, or h11.Event objects, or whatever.

So if you really just wanted a way to receive and send lines on a Stream, that might be:

line_channel: trio.abc.Channel[bytes] = sansio_toolbelt.to_trio(sansio_toolbelt.LineProtocol(delimiter=b"\r\n", max_line_length=16384), my_stream)

await line_channel.send(b"hello")
response = await line_channel.receive()

That's maybe a little bit more complicated than I'd want to use in a tutorial, but it's pretty close? Maybe we can slim it down a little more?

This approach is also flexible enough to handle more complex cases, like protocols that switch between lines-oriented and bulk data (HTTP), or that enable TLS half-way through (SMTP's STARTTLS command), which in Twisted's LineReceiver requires some special hooks. You can detach the sans-I/O wrapper from the underlying stream and then wrap it again in a different protocol, so long as you have some way to hand-off the buffer between them.

But while it is flexible enough for that, and that approach is very elegant for Serious Robust Protocol implementations, it might be a lot to ask when someone really just wants to call readline twice and then read N bytes, or something like that. So maybe we'd also want something that wraps a ReceiveStream and provides read_line, read_exactly, read_until, based on the same buffering code described above but without the fancy sans-I/O event layer in between?

@oremanj
Member

oremanj commented Dec 4, 2018

I literally just wrote one of these yesterday, with the interface

@attr.s(auto_attribs=True)
class BufferedReceiveStream(trio.abc.AsyncResource):
    transport_stream: trio.abc.ReceiveStream
    chunk_size: int = 4096
    async def aclose(self) -> None: ...
    async def receive(self, num_bytes: int) -> bytes: ...
    async def receive_all_or_none(self, num_bytes: int) -> Optional[bytes]: ...
    async def receive_exactly(self, num_bytes: int) -> bytes: ...

class TextReceiveStream(trio.abc.AsyncResource):
    transport_stream: trio.abc.ReceiveStream
    encoding: str
    errors: Optional[str]
    newlines: Union[str, Tuple[str, ...], None]
    chunk_size: int
    def __init__(
        self,
        transport_stream: trio.abc.ReceiveStream,
        encoding: Optional[str] = None,
        *,
        errors: Optional[str] = None,
        newline: Optional[str] = "",
        chunk_size: int = 8192
    ): ...
    async def aclose(self) -> None: ...
    async def __aiter__(self) -> AsyncIterator[str]: ...
    async def receive_line(self, max_chars: int = -1) -> str: ...

I haven't tested it yet and I'm not totally sure about the interface, but I'll post code once I'm more convinced it works if people think either of these interfaces would be useful. The three receive methods in BufferedReceiveStream only differ in how they handle EOF; I wrote it to help with "you have a length and then that many bytes of data" type binary protocols.

@njsmith
Member Author

njsmith commented Dec 5, 2018

This may go without saying, but just in case I'll say it anyway :-): I'm being a bit cautious in the issue because if we add something directly to trio itself, then we want to make sure it's the right thing we can support for a long time. But for helpers and utilities and stuff that's outside trio, that doesn't apply, so everyone should at the least feel free to implement what they find useful, share it, whatever.

In fact it can only be useful to see examples of what people come up with...

@njsmith
Member Author

njsmith commented Dec 5, 2018

Speaking of which, @oremanj, what's your use case? (If you can share.)

@pawarren

Any movement on this? I'm looking for an equivalent for StreamReader.readuntil so I can continuously listen for special termination chars in websocket messages.

@njsmith
Member Author

njsmith commented Mar 11, 2019

@pawarren I think this is up to date at the moment. I'm a bit confused about how readuntil and websockets fit together, though – usually websocket messages are small and you read them in as single bytes or str objects? Are you using trio-websocket, or something else?

@pawarren

I'm currently using asyncio.open_connection() and am referring to https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamReader.readuntil

I connect to 58 web sockets, keep the connections permanently open, and for each web socket continuously loop over the following steps asynchronously:

  1. send one of three different types of messages asking for something
  2. listen for one of four different types of responses (one per request type plus a heartbeat)
  3. do something with response

The responses themselves are typically small XML messages or images.

It seems like there's a default EOF for most websockets, which I imagine is why libraries like websockets can do things like "for message in socket". But these messages generally have a separate end-of-message separator, b'\r\n\r\n', and normal practice for dealing with that seems to be maintaining a buffer while repeatedly reading chunks of size ~4096 bytes while looking for the separator. reader.readuntil(b'\r\n\r\n') was a nice way of avoiding that.

I'm new to async + websockets, so I might be missing an obvious solution or best practice.

I hadn't seen trio-websockets before; thank you for the reference.

@njsmith
Member Author

njsmith commented Mar 11, 2019

Ah, OK, I think you mean regular TCP sockets :-). "Web socket" is a specific somewhat complicated protocol that you can use on top of regular sockets. For some reason the people designing WebSocket decided to give it a really confusing name.

It sounds like you have your own simple ad-hoc protocol on top of sockets. You might say you're transmitting a series of "frames", and each frame is terminated by \r\n\r\n. Which is, indeed, exactly the sort of thing that this issue is about :-).

While you're waiting for us to get our ducks in a row and provide a more comprehensive solution, here's some code you can use:

import trio

_RECEIVE_SIZE = 4096  # pretty arbitrary

class TerminatedFrameReceiver:
    """Parse frames out of a Trio stream, where each frame is terminated by a
    fixed byte sequence.

    For example, you can parse newline-terminated lines by setting the
    terminator to b"\n".

    This uses some tricks to protect against denial of service attacks:

    - It puts a limit on the maximum frame size, to avoid memory overflow; you
      might want to adjust the limit for your situation.

    - It uses some algorithmic trickiness to avoid "slow loris" attacks. All
      algorithms are amortized O(n) in the length of the input.

    """
    def __init__(self, stream, terminator, max_frame_length=16384):
        self.stream = stream
        self.terminator = terminator
        self.max_frame_length = max_frame_length
        self._buf = bytearray()
        self._next_find_idx = 0

    async def receive(self):
        while True:
            terminator_idx = self._buf.find(
                self.terminator, self._next_find_idx
            )
            if terminator_idx < 0:
                # no terminator found
                if len(self._buf) > self.max_frame_length:
                    raise ValueError("frame too long")
                # next time, start the search where this one left off
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                # add some more data, then loop around
                more_data = await self.stream.receive_some(_RECEIVE_SIZE)
                if more_data == b"":
                    if self._buf:
                        raise ValueError("incomplete frame")
                    raise trio.EndOfChannel
                self._buf += more_data
            else:
                # terminator found in buf, so extract the frame
                frame = self._buf[:terminator_idx]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del self._buf[:terminator_idx+len(self.terminator)]
                # next time, start the search from the beginning
                self._next_find_idx = 0
                return frame

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.receive()
        except trio.EndOfChannel:
            raise StopAsyncIteration


################################################################
# Example
################################################################

from trio.testing import memory_stream_pair
async def main():
    sender_stream, receiver_stream = memory_stream_pair()

    async def sender():
        await sender_stream.send_all(b"hello\r\n\r\n")
        await trio.sleep(1)
        await sender_stream.send_all(b"split-up ")
        await trio.sleep(1)
        await sender_stream.send_all(b"message\r\n\r")
        await trio.sleep(1)
        await sender_stream.send_all(b"\n")
        await trio.sleep(1)
        await sender_stream.send_all(b"goodbye\r\n\r\n")
        await trio.sleep(1)
        await sender_stream.aclose()

    async def receiver():
        chan = TerminatedFrameReceiver(receiver_stream, b"\r\n\r\n")
        async for message in chan:
            print(f"Got message: {message!r}")

    async with trio.open_nursery() as nursery:
        nursery.start_soon(sender)
        nursery.start_soon(receiver)

trio.run(main)

@pawarren

pawarren commented Mar 11, 2019 via email

@njsmith
Member Author

njsmith commented Mar 11, 2019

What do you mean by "It uses some algorithmic trickiness to avoid "slow loris" attacks."? What's the trickiness?

There are two mistakes people often make, that cause this kind of code to have O(n^2) complexity:

  • They use a regular bytes object as the buffer, or instead of using del to remove stuff from the front, they copy the back (buf = buf[new_start:]). This is especially common on Python 2, because it doesn't have the optimized bytearray del method built in, so you have to implement it by hand. But even on py3, a lot of people don't realize how important it is to write this operation in exactly the right way.

  • after a failed search, they restart the next search from the beginning, instead of tracking how much of the buffer they already searched and only searching the new part

If you make either of these mistakes, then it's easy for a peer that's malicious, or just misbehaving, to burn up a ton of your CPU time doing repeated work. For example, say someone sends you a 2,000 byte frame, but they do it one byte at a time. Now a naive implementation ends up scanning through ~2,000,000 bytes (1 + 2 + ... + 2,000) looking for the \r\n\r\n, because it scans the first part of the buffer over and over and over.

It's surprisingly subtle!
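
To make the second mistake concrete, the following sketch counts how many buffer bytes each search has to cover when a 2,000-byte frame arrives one byte at a time. The helper bytes_scanned is made up for this illustration, and the count is an upper bound on the work done by find(), not a benchmark.

```python
def bytes_scanned(frame, terminator, incremental):
    """Feed `frame` one byte at a time and total the span each search covers."""
    buf = bytearray()
    next_find_idx = 0
    scanned = 0
    for i in range(len(frame)):
        buf += frame[i:i + 1]
        # The naive version always restarts from the beginning of the buffer.
        start = next_find_idx if incremental else 0
        scanned += len(buf) - start
        if buf.find(terminator, start) >= 0:
            break
        # Back up just enough to catch a terminator split across chunks.
        next_find_idx = max(0, len(buf) - len(terminator) + 1)
    return scanned


frame = b"x" * 1996 + b"\r\n\r\n"  # a 2,000-byte frame including terminator
naive = bytes_scanned(frame, b"\r\n\r\n", incremental=False)
tracked = bytes_scanned(frame, b"\r\n\r\n", incremental=True)
print(f"naive: {naive} bytes, tracked: {tracked} bytes")
```

Under this counting model the naive search covers about two million bytes, while the version that remembers where it left off covers fewer than eight thousand.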

@JefffHofffman

Here's my hack on TerminatedFrameReceiver that extends the sans I/O idea, possibly too far. It's similar to the sort/cmp split. Just as a single definition of cmp allows lots of sort implementations, a single read & write allows interchangeable frame methods. My trio implementation leaves MUCH to be desired. It's more a POC that shows how a definition like receive_some() is sufficient for framing variations without pestering the runner (nursery) too much. Comments welcomed.

import trio
import attr

@attr.s
class Framer:
    '''
    a mash-up of an accumulator with a parser for frames with a terminator
    does no I/O, no runner dependencies, only lazy evaluation via async calls
    '''
    MAX_FRAME_SIZE = 16384
    terminator = attr.ib()
    more = attr.ib()  # async f() returns next chunk for buf
    done = attr.ib()  # async f(buf) put as frame, return ignored
    _buf = attr.ib(factory=bytearray)
    _next_find_idx = attr.ib(init=False, default=0)
    
    async def deframe(self):
        while True:
            terminator_idx = self._buf.find(
                self.terminator, self._next_find_idx
            )
            if terminator_idx < 0:
                more_data = await self.more()
                # paranoid that a goof in more() may return too much 
                if len(self._buf) + len(more_data) > self.MAX_FRAME_SIZE:
                    raise ValueError("frame too long")
                if more_data == b"":
                    if self._buf:
                        raise ValueError("incomplete frame")
                    return  # clean end of stream; avoid spinning on repeated b""
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                self._buf += more_data
            else:
                frame = self._buf[:terminator_idx]
                del self._buf[:terminator_idx+len(self.terminator)]
                self._next_find_idx = 0
                await self.done(frame)

    # be nice to have this here, but need another done() dest
    # ... and another more() source to use as a pump
    async def enframe(self, msg):
        return msg + self.terminator

################################################################
# Example
################################################################
_RECEIVE_SIZE = 4096  # pretty arbitrary

from trio.testing import memory_stream_pair
async def main():    
    # so sender.aclose() ripples to receiver
    wire = trio.StapledStream(*memory_stream_pair())
    
    async def sender():
        await wire.send_all(b"split-up ")
        await trio.sleep(1)
        await wire.send_all(b"message\r\n\r")
        await trio.sleep(1)
        await wire.send_all(b"\n")
        await trio.sleep(1)
        await wire.aclose()
        
    async def trio_more():
        return await wire.receive_some(_RECEIVE_SIZE)
        
    async def trio_done(msg):
        print((f"Got message: {msg}"))
        
    pump = Framer(b"\r\n\r\n", trio_more, trio_done)

    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            nursery.start_soon(pump.deframe)
    except trio.ClosedResourceError:
        print ("done")
    except ValueError as e:
        print (f'Unrecoverable error: {e}')

trio.run(main)

@smurfix
Contributor

smurfix commented Mar 13, 2019

The trio-ish way would not use a done callback. Instead, the deframe method would be an async iterator, thus it would say yield frame instead of self.done(frame), and the caller would do

async for msg in pump.deframe():
    print(f"Got message: {msg}")

thus no need to start pump.deframe as a separate task.
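
A minimal sketch of that suggestion, written as a standalone async generator rather than a method so it stays self-contained. The signature (more, terminator, max_frame_size) is an assumption for illustration; `more` is any async callable that returns the next chunk, or b"" at end of stream.

```python
async def deframe(more, terminator, max_frame_size=16384):
    """Async generator that yields one frame per terminator found."""
    buf = bytearray()
    next_find_idx = 0
    while True:
        idx = buf.find(terminator, next_find_idx)
        if idx >= 0:
            frame = bytes(buf[:idx])
            del buf[:idx + len(terminator)]
            next_find_idx = 0
            yield frame  # hand the frame to the `async for` loop
            continue
        if len(buf) > max_frame_size:
            raise ValueError("frame too long")
        # Remember how far we searched before asking for more data.
        next_find_idx = max(0, len(buf) - len(terminator) + 1)
        chunk = await more()
        if not chunk:
            if buf:
                raise ValueError("incomplete frame")
            return  # clean end of stream ends the iteration
        buf += chunk
```

With a Trio stream, `more` could be something like lambda: stream.receive_some(4096), and the consumer is just an async for loop over deframe(more, b"\r\n\r\n") in the same task.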

@JefffHofffman

I think I missed a crucial point about sans I/O; it's synchronous. It needs to have no dependence on trio.
Here's another attempt at a utility for creating basic protocols. I've omitted the klunky implementation for deframe (closures and yield from spaghetti). I'd first like to get some feedback on this kind of approach before addressing the implementation.

It seems the buffer mgmt in wsproto, hyper-h2, et al. is just different enough to require specialized glue code for each to use in trio. A 'standard mechanism' of how to supply bytes and get out objects for simple protocols might also help uniform adoption of these heftier protocols.

from deframe import deframe
import trio
from trio.testing import memory_stream_pair

_RECEIVE_SIZE = 4096  # pretty arbitrary

def netstring_proto(rd, max_msg=9999, **kw_decode):
    ''' synchronous generator using rd() from deframe() for buffer mgmt '''
    max_hdr = len(str(max_msg)) + 1
    while True:
        size = yield from rd(b':', stop=max_hdr)
        body = yield from rd(b',', exact=int(size))
        yield body.decode(**kw_decode)

async def main():    
    wire = trio.StapledStream(*memory_stream_pair())
    
    async def sender():
        await wire.send_all(b"4:hiya,")
        await wire.send_all(b"12:two ")
        await wire.send_all(b"split-")
        await wire.send_all(b"up,9: messages,")
        await wire.send_eof()

    try:        
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            make_msgs = deframe(netstring_proto)
            while True:
                data = await wire.receive_some(_RECEIVE_SIZE)
                if not data:
                    raise trio.ClosedResourceError
                # loop through a generator of msgs, stops when no more msgs in buffer
                for msg in make_msgs(data):                    
                    print(f"Got message: {msg.upper()}")
                    
    except trio.ClosedResourceError:
        print ("done")
        
if __name__ == '__main__':
    trio.run(main)
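
Since the deframe implementation is omitted above, here is an independent sketch of the "supply bytes, get out objects" idea for netstrings. It is a plain synchronous class and not the deframe/rd design from this comment; NetstringParser and its feed method are hypothetical names.

```python
class NetstringParser:
    """Hypothetical synchronous netstring decoder: bytes in, messages out."""

    def __init__(self, max_msg=9999):
        self._buf = bytearray()
        self._max_msg = max_msg
        self._max_hdr = len(str(max_msg)) + 1  # length digits plus the ':'

    def feed(self, data):
        """Add bytes and return a list of every message that is now complete."""
        self._buf += data
        messages = []
        while True:
            # The header is bounded, so this search never covers more
            # than max_hdr bytes no matter how large the body is.
            colon = self._buf.find(b":", 0, self._max_hdr)
            if colon < 0:
                if len(self._buf) >= self._max_hdr:
                    raise ValueError("length header too long")
                break
            header = bytes(self._buf[:colon])
            if not header.isdigit():
                raise ValueError("malformed length header")
            size = int(header)
            if size > self._max_msg:
                raise ValueError("message too long")
            end = colon + 1 + size
            if len(self._buf) <= end:
                break  # body or trailing comma has not arrived yet
            if self._buf[end:end + 1] != b",":
                raise ValueError("missing trailing comma")
            messages.append(bytes(self._buf[colon + 1:end]))
            del self._buf[:end + 1]
        return messages
```

The I/O loop then reduces to reading a chunk from the stream, passing it to feed, and handling whatever messages come back, which keeps the parser itself free of any Trio dependency.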


@njsmith
Member Author

njsmith commented Jun 11, 2019

The rust tokio crate actually has a fully-worked-out version of exactly the ideas we're talking about here:

https://docs.rs/tokio/0.1.21/tokio/codec/index.html

@njsmith
Member Author

njsmith commented Jun 23, 2019

I'm having a crisis of faith about sans-io. Well, at least a partial crisis.

Here's what happened:

I was looking again at my nascent library for writing sans-io protocols. Let's call it byoio just to have some name to refer to it. (reference) The goal is to make it such a good way to write protocols, that no-one is tempted to write protocols directly against Trio's own APIs.

The core technical foundation is a receive buffer with O(n) algorithms for basic operations like consuming data and searching for delimiters. But on top of that foundation, we want to be able to write clean, readable, composable protocol code. For example, if we're defining a sans-io implementation of sending/receiving single lines, then (a) we want our implementation to be easy to read and reason about, (b) we want to make it easy to use it as a foundation for building new sans-io libraries on top, for line-oriented protocols like SMTP or FTP.

The basic pattern in defining a sans-io parser is that you have a buffer, and you have some kind of "read more from the buffer" operation that either returns an event description or says that it needs more data (e.g. by returning h11.NEED_DATA), in which case the user has to feed in more data and then retry. This leads to pretty awkward code, because you might get half-way through some complicated parsing job and realize you've run out of data and have to unwind – BUT you have to store enough information about where you were when you ran out of data that you can resume in O(1) time, without re-processing all the early data, because if each attempt is O(n) and you make O(n) attempts then now you're O(n^2) and trivially DoSable.
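
The feed-and-retry pattern just described might look like this for a line parser. It is a hand-written sketch with assumed names (LineParser, lines_from_chunks, and a local NEED_DATA sentinel standing in for something like h11.NEED_DATA), not code from h11.

```python
NEED_DATA = object()  # local sentinel; plays the role h11.NEED_DATA plays in h11


class LineParser:
    """Hypothetical sans-io line parser with feed_bytes()/next_event()."""

    def __init__(self, delimiter=b"\r\n", max_line_length=16384):
        self._buf = bytearray()
        self._delimiter = delimiter
        self._max_line_length = max_line_length
        self._searched = 0  # resume point, so retries do not rescan old data

    def feed_bytes(self, new_data):
        self._buf += new_data

    def next_event(self):
        idx = self._buf.find(self._delimiter, self._searched)
        if idx < 0:
            if len(self._buf) > self._max_line_length:
                raise ValueError("line too long")
            self._searched = max(0, len(self._buf) - len(self._delimiter) + 1)
            return NEED_DATA
        line = bytes(self._buf[:idx])
        del self._buf[:idx + len(self._delimiter)]
        self._searched = 0
        return line


def lines_from_chunks(parser, chunks):
    """Drive the parser: feed a chunk, then drain events until NEED_DATA."""
    for chunk in chunks:
        parser.feed_bytes(chunk)
        while True:
            event = parser.next_event()
            if event is NEED_DATA:
                break
            yield event
```

The only state that has to survive between attempts here is one integer, which is why this simple case stays readable; parsers with more steps have to save correspondingly more.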

The best trick I've seen for making this kind of code readable is to write the parser code itself as a generator, with a little wrapper to convert between the sans-io conventions and the generator conventions: see for example this version of wsproto's parser, which uses the convention that when you run out of data you yield, the wrapper converts that into NEED_DATA, and then after more data arrives the wrapper resumes the generator. Lomond uses a similar trick. Ohneio is a prototype for a more general sans-io parsing framework with similar ideas.
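
A compressed sketch of that generator convention, assuming a one-shot parser and made-up names (GeneratorParser, uint32_prefixed_message). It is not the wsproto or Lomond code, only the same idea in miniature: the parser yields when it runs out of data and the wrapper reports that as NEED_DATA.

```python
import struct

NEED_DATA = object()  # local sentinel for this sketch


def receive_exactly(buf, count):
    """Sub-generator: yield ("out of data") until `count` bytes are buffered."""
    while len(buf) < count:
        yield
    data = bytes(buf[:count])
    del buf[:count]
    return data


def uint32_prefixed_message(buf):
    """Parse a 4-byte little-endian length, then that many payload bytes."""
    prefix = yield from receive_exactly(buf, 4)
    (length,) = struct.unpack("<I", prefix)
    return (yield from receive_exactly(buf, length))


class GeneratorParser:
    """Wrap a one-shot parsing generator behind feed_bytes()/next_event()."""

    def __init__(self, genfunc):
        self._buf = bytearray()
        self._gen = genfunc(self._buf)  # generator shares our buffer object

    def feed_bytes(self, data):
        self._buf += data  # in-place, so the generator sees the new bytes

    def next_event(self):
        try:
            next(self._gen)  # run until the parser yields or finishes
        except StopIteration as done:
            return done.value  # the parser's return value is the event
        return NEED_DATA
```

The generator's own frame holds the "where was I" state, so resuming after more data arrives costs O(1) without any explicit bookkeeping in the parser code.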

The main downside of this approach is that it's very convenient to have yield from so you can factor out reusable logic, and Python 2 has no yield from. That's why we ended up rewriting wsproto's parser. There are ways to hack around this (lomond has some clever tricks here), but it's definitely more awkward. But, I realized, for my sans-io library... well, that's just not very important. h11 might need to keep py2 support for a while yet but no-one is starting new protocol libraries with py2 support. In my original vision, one of the design goals for the new framework was to switch h11 to using it, but it's actually not that important... h11 works fine as it is. And if we really need py2 support, then we can invent a byoio.py2compat submodule that uses the awkward hacks to implement the same API, and keep yield from as the main API, and eventually deprecate py2compat and tell everyone to move to the good py3 version.

So my sketch at this point has a generic sans-io-ish interface for users:

class ByoioParser(Protocol, Generic[EventType]):
    @abstractmethod
    def next_event(self) -> Union[NEED_DATA, EventType]:
        ...

    @abstractmethod
    def feed_bytes(self, new_data: BytesLike) -> None:
        ...

But I'm imagining you wouldn't normally implement this interface by hand, just like you don't normally implement the context manager interface by hand... instead you use a decorator:

@byoio.protocol
def receive_uint32_prefixed_message(receive_buffer):
    prefix = yield from receive_exactly(receive_buffer, 4)
    (length,) = struct.unpack("<I", prefix)
    return (yield from receive_exactly(receive_buffer, length))

proto = receive_uint32_prefixed_message()
assert isinstance(proto, ByoioParser)

And now composing protocols together is easy: either we provide some way for higher-level protocols to use the generator directly, like

@byoio.protocol
def higher_level_protocol(receive_buffer):
    message = yield from receive_uint32_prefixed_message.gen(receive_buffer)
    # ... do something with 'message' ...

Or better yet, provide some way to convert ByoioParser objects back into generators:

    message = yield from receive_uint32_prefixed_message(receive_buffer).asgen()

But there's some awkwardness here... yield from is really convenient for implementing parsers that return a single value, like receive_exactly or receive_uint32_prefixed_message. But most sans-io parsers return a stream of events – you can keep calling next_event as long as you want, or at least until EOF.

If we want a generator that returns a stream of real results, that's not too hard – that's how my old wsproto parser works. We just declare that you can yield NEED_DATA or yield event and either way that's what next_event returns. So that works on its own. But as soon as we want to build a more complex protocol on top, things get harder – like say parse_lines repeatedly yields lines or NEED_DATA. Now say we want to use parse_lines to implement an SMTP parser. How? We can't use yield from parse_lines(...), because if our convention is to use the yield channel to pass back final events, then we'll end up yielding raw lines instead of processing them. We need to write something like:

@byoio.protocol
def parse_smtp(buffer):
    subprotocol = parse_lines(buffer)
    while True:
        next_line = yield from subprotocol.gen_for_next()
        ...

Except we also need some convention to signal that there is no next item... maybe we should use an exception?

Anyway, this is the point where I realized that I'd just reinvented async iterators, except more awkward because generators can't use async for.

OK, so, new plan: instead of generators, we'll define byoio protocols using async/await. We can keep the ByoioParser abstract interface, but now we'll write something like:

# One-shot protocol is a regular async function
@byoio.protocol
async def receive_uint32_prefixed_message(receive_buffer: bytearray):
    prefix = await receive_exactly(receive_buffer, 4)
    (length,) = struct.unpack("<I", prefix)
    return await receive_exactly(receive_buffer, length)

# Multi-shot protocol is an async generator
@byoio.protocol
async def receive_lines(receive_buffer):
    while True:
        yield await receive_until(receive_buffer, b"\r\n")

Internally, byoio.protocol would implement a tiny little coroutine runner, with just a single trap that means "return NEED_DATA", and it iterates the coroutine each time someone calls .next_event().
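
One way such a runner could work is sketched below for the one-shot case only. byoio does not exist as a library here, so CoroutineParser and _need_data are hypothetical names, and types.coroutine is used to build the single trap.

```python
import struct
import types

NEED_DATA = object()  # local sentinel for this sketch


@types.coroutine
def _need_data():
    # The single trap: suspend the coroutine and hand NEED_DATA to the runner.
    yield NEED_DATA


async def receive_exactly(buf, count):
    while len(buf) < count:
        await _need_data()
    data = bytes(buf[:count])
    del buf[:count]
    return data


async def receive_uint32_prefixed_message(buf):
    prefix = await receive_exactly(buf, 4)
    (length,) = struct.unpack("<I", prefix)
    return await receive_exactly(buf, length)


class CoroutineParser:
    """Tiny coroutine runner exposing feed_bytes()/next_event() (one-shot)."""

    def __init__(self, async_fn):
        self._buf = bytearray()
        self._coro = async_fn(self._buf)  # coroutine shares our buffer object

    def feed_bytes(self, data):
        self._buf += data

    def next_event(self):
        try:
            trap = self._coro.send(None)  # step the coroutine once
        except StopIteration as done:
            return done.value  # the coroutine's return value is the event
        if trap is not NEED_DATA:
            raise RuntimeError("protocol code awaited something unsupported")
        return NEED_DATA
```

The check on the trap value is what keeps protocol code honest: awaiting anything other than the one supported primitive fails immediately instead of silently depending on a particular event loop.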

Of course we also need some way to send data, so maybe we should have a way to do that too. For convenience maybe we should bundle these primitives together on a little interface:

class ByoioStream:
    async def receive_some(self): ...
    def send_all(self, data): ...

...but OK, that's basically a subset of the trio Stream interface. send_all is synchronous here because we assume that our caller will receive backpressure when they go to actually send out the bytes we've written, but that's a pretty trivial difference.

So it turns out that my grand plan is:

  1. We write protocols as ordinary async code, but using this abstract ByoioStream interface, so you can plug in any kind of transport underneath, or test against canned data, etc.
  2. We use a coroutine runner and various machinery to wrap up that async code behind a synchronous, state-machine-flavored interface.
  3. We use Trio to wrap the synchronous, state-machine-flavored interface
  4. Our wrappers expose some nice async API to Trio users

But alternatively, we could take the exact same code we wrote in step 1, pass in any trio Stream object as our abstract ByoioStream, and bam we have a nice async Trio API. Steps 2-4 are totally superfluous. So I'm feeling kinda silly. Apparently at the end of all that, my grand sans-io plans have looped around, and turned into "just write code directly against the Trio API" (and then add some extra fluff that just makes things slower).

One of the major advantages of sans-io code is that because it has such a narrow and controllable interface with the outside world, it's easy to test. And of course the other advantage is that the public API is neutral and can be used anywhere, including in sync code. The byoio idea does have some advantages on both counts: because you only get access to a very narrow set of primitives, you're guaranteed you aren't accidentally doing something hard to test or non-portable, like using trio primitives directly instead of just calling stream.receive_some and stream.send_all. And maybe it would have some value as a way to export sync APIs for sync code? And just generally signal that these protocol implementations really are neutral, instead of looking like a Trio-only thing.

But it does suggest that we can feel free to start writing all the obvious primitives – receive_exactly, receive_until, receive_until_re, LineChannel, NetstringChannel, LengthPrefixedChannel, etc. directly against the Trio Stream API, and it would be OK to put them directly into Trio. We wouldn't lose the chance to have a byoio – we can always extract the code into a standalone library and have Trio re-export it.
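
As a sketch of what two of these primitives could look like when written against nothing but an async receive_some method: BufferedReceiver and CannedStream are hypothetical names, and EOFError stands in for whatever exception Trio would actually choose.

```python
class BufferedReceiver:
    """Hypothetical helper pairing a stream with a receive buffer."""

    def __init__(self, stream, receive_size=4096, max_buffer=16384):
        self._stream = stream  # anything with `async def receive_some(n)`
        self._receive_size = receive_size
        self._max_buffer = max_buffer
        self._buf = bytearray()

    async def _fill(self):
        chunk = await self._stream.receive_some(self._receive_size)
        if not chunk:
            # Assumption: treat EOF in the middle of a read as an error.
            raise EOFError("stream ended before the requested data arrived")
        self._buf += chunk

    async def receive_exactly(self, count):
        while len(self._buf) < count:
            await self._fill()
        data = bytes(self._buf[:count])
        del self._buf[:count]
        return data

    async def receive_until(self, delimiter):
        start = 0
        while True:
            idx = self._buf.find(delimiter, start)
            if idx >= 0:
                break
            if len(self._buf) > self._max_buffer:
                raise ValueError("delimiter not found within buffer limit")
            # Only the newly arrived tail needs searching next time.
            start = max(0, len(self._buf) - len(delimiter) + 1)
            await self._fill()
        data = bytes(self._buf[:idx])
        del self._buf[:idx + len(delimiter)]
        return data


class CannedStream:
    """In-memory stand-in for tests; ignores max_bytes for simplicity."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def receive_some(self, max_bytes=None):
        return self._chunks.pop(0) if self._chunks else b""
```

Because the helper only ever calls receive_some, the same code runs against a real Trio stream or against canned chunks in a test, which is the portability argument made above.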

One nasty issue I don't have a good answer for: the send/receive/bidi split makes everything super annoying to type, in the mypy sense. It's super annoying to have ReceiveLineChannel, SendLineChannel, LineChannel, ad infinitum. Tempting to just have LineChannel and tell people that if they're wrapping a ReceiveStream then that's fine, just don't call send, it will raise an error. Or for the basic receivebuffer+stream primitive, do I wrap those up in an object that implements Stream, or just ReceiveStream, since it's just a receive buffer? I'm half-tempted to say screw it, it's all runtime all the way down, there's just Stream and Channel and that's it. But this feels like something the type system should be able to check, so I suspect erasing it won't make people happy!


On another note: as I write receive_exactly and receive_until and receive receive receive, I'm somewhat regretting the decision to name our primitives send/receive. I do like the emphasis that these are active transfers "in the moment", versus read/write that refer to modifying passive media – and also it's good to avoid associations with Python's conventions around read, which are really inappropriate for Streams and for Channels, for different reasons. But receive is just an awkward word: kinda long, annoying to spell. I'd be tempted by get/put, except then we'd have Stream.get_some and that seems like a bad idea. give/take? put/take? put_all/get_any? Or just stick with send/receive... Maybe this should be a separate issue.

@smurfix
Contributor

smurfix commented Jun 23, 2019

You've described in a lot of words why I didn't bother with sans-io-ing any of my code yet.

There's two missing building blocks in sans-io. One is timeout handling, including the requirement to send frequent keepalives. The other is request multiplexing.

WRT put/get: Umm … thinking about it, put/get works for single messages, while send/receive makes more sense for byte/character streams. This is because I associate put/get with things like channels or queues that you can use to put some single thing in on one end and get it out the other end.
send/receive on the other hand is for bytestreams and similar where the focus of these methods is on partitioning the stream into semantic chunks.

Calling receive_exactly on four bytes (and then N bytes) to de-partition a byte-encoded message stream makes sense.
Calling get_exactly to read N HTTP requests or MQTT messages? not so much.

Yes, receive is awkward. So … use recv instead?

@njsmith
Member Author

njsmith commented Jun 23, 2019

thinking about it, put/get works for single messages, while send/receive makes more sense for byte/character streams. This is because I associate put/get with things like channels or queues that you can use to put some single thing in on one end and get it out the other end.

That's why I find them attractive :-). For single objects we currently have Channel.send and Channel.receive, and for byte streams we have Stream.send_all and Stream.receive_some. So in all cases, the verb on its own is being used to refer to single objects, and then sometimes we have a modifier to make it plural.

And get_exactly would take a Stream specifically, not a Channel, so I don't think there'd be any confusion about using it for MQTT.

@njsmith
Member Author

njsmith commented Jun 23, 2019

There's two missing building blocks in sans-io. One is timeout handling, including the requirement to send frequent keepalives. The other is request multiplexing.

Yeah, one big limitation of the whole byoio idea is that it only helps writing protocols like h11/h2/wsproto that are purely managing a single TCP connection.

There are a bunch of useful protocols that fit within this constraint – that list, plus basic protocols like line splitting, plus some others like SMTP. But there are also a lot of protocols that have some quirky extra requirement.

h2 does request multiplexing... not sure what you're missing there. But yeah, timing is a common one – protocols can specify timeouts, sometimes in odd corners. The ssh protocol doesn't seem like it cares about time, except, ssh servers need to rate-limit password checks.

Some protocols need to be able to make new connections. Think SOCKS, or anything with automatic reconnect on failure, or http redirects.

Some protocols need UDP. There's an interesting discussion happening at aiortc/aioquic#4 about how to make a sans-io interface for QUIC.

All these could in principle be fit into the sans-io paradigm: all you need to do is be extremely explicit about what your interactions with the environment are and expose them as public API. You can even imagine a library like byoio growing interfaces for things like time or UDP, where each protocol has to declare what resources it needs. That starts moving in the direction of anyio ... The main difference is that so far anyio has focused on providing maximal access to all possible functionality, and has actually avoided abstractions like Stream. But here we'd be focusing much more on abstractions and minimizing the interface.

There's not going to be a single universal answer here, but I think it's helpful to try to map out the space of options.

@JefffHofffman

I came to some of the same conclusions. recv_exact and recv_until work better as distinct calls. Nesting protocols is hard. With yield from wrappers, there's a need for boilerplate conversion from sync generators to async access.

However, the last one doesn't seem so bad. If I want one await a_stream.receive_some(BIG_CHUNK) to become one list of MsgObjs synchronously, a different conversion can do that. It seems if recv functions are async, one is forced to go through the reactor for every small field. Not sure if some benefits of cooperative scheduling are possible with such an approach. My first pass had feed and next as one function, which seemed to make different await tweaking easier outside of the protocol. Maybe this is a SAX vs DOM thing; different enough to necessitate different parsers.

Some of the headaches with nesting are a consequence of the Stream/Channel distinction. Utils for byte->obj can't be easily reused for obj->bigger_obj.

@njsmith
Member Author

njsmith commented Jun 26, 2019

So my prototype so far has the signatures:

async def receive_exactly(rstream: ReceiveStream, rbuf: bytearray, size: int): ...
async def receive_until(rstream: ReceiveStream, rbuf: bytearray, delim: BytesLike, *, max_size: int): ...

class LineChannel(Channel[bytes]):
    def __init__(self, stream: Stream, *, max_size: int, eol=b"\r\n", initial_data=b""): ...
    async def send(...): ...
    async def receive(...): ...
    def detach(self) -> Tuple[Stream, BytesLike]: ...

class LengthPrefixedChannel(Channel[bytes]):
    def __init__(self, stream: Stream, prefix_format: str, *, max_size: int, initial_data=b""): ...
    # ... rest of API the same as LineChannel ...

I'm assuming #1123 or something like it will be merged, so we don't have to pass in receive_size constants everywhere.

Please notice the rbuf argument on the free-standing functions; it's a weird part. The idea is that this has to be a mutable bytearray object. On entry, it contains any data that's been received on this stream but hasn't been parsed yet. The function updates it in place, so that on exit it holds whatever received data is still unparsed, with the data this function consumed removed. So if using these directly, you do something like:

# Start out with an empty bytearray
rbuf = bytearray()
# Pass it into every function call
header = await receive_exactly(stream, rbuf, 4)  # "<HH" is two uint16 fields = 4 bytes
length, flags = struct.unpack("<HH", header)
body = await receive_exactly(stream, rbuf, length)
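
A minimal sketch of how those two functions might be implemented under the in-place `rbuf` contract described above. This is not the prototype's actual code: the use of `EOFError` and `ValueError` for the two failure cases, the decision to drop the delimiter from the returned data, and the `FakeStream` helper that drives the example without a network are all assumptions made for illustration.

```python
import asyncio
import struct


async def receive_exactly(rstream, rbuf: bytearray, size: int) -> bytes:
    """Return exactly `size` bytes, consuming them from rbuf (refilled from rstream)."""
    while len(rbuf) < size:
        more = await rstream.receive_some()
        if not more:
            # EOF before enough data arrived; what we did get stays in rbuf.
            raise EOFError("stream ended before enough data arrived")
        rbuf += more
    data = bytes(rbuf[:size])
    del rbuf[:size]
    return data


async def receive_until(rstream, rbuf: bytearray, delim: bytes, *, max_size: int) -> bytes:
    """Return the bytes before delim (delim is consumed but not returned)."""
    search_from = 0
    while True:
        idx = rbuf.find(delim, search_from)
        if idx >= 0:
            if idx > max_size:
                raise ValueError("frame longer than max_size")
            data = bytes(rbuf[:idx])
            del rbuf[:idx + len(delim)]
            return data
        if len(rbuf) >= max_size + len(delim):
            # Any delimiter arriving later would start past max_size.
            raise ValueError("delimiter not found within max_size bytes")
        # Only rescan the tail where a delimiter could straddle old and new data.
        search_from = max(0, len(rbuf) - len(delim) + 1)
        more = await rstream.receive_some()
        if not more:
            raise EOFError("stream ended before delimiter")
        rbuf += more


class FakeStream:
    """Hypothetical test double: replays canned chunks, then reports EOF."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def receive_some(self, max_bytes=None):
        return self._chunks.pop(0) if self._chunks else b""


async def demo():
    stream = FakeStream([b"\x05\x00\x01", b"\x00hel", b"lo", b"PING\r\nextra"])
    rbuf = bytearray()
    header = await receive_exactly(stream, rbuf, 4)
    length, flags = struct.unpack("<HH", header)
    body = await receive_exactly(stream, rbuf, length)
    line = await receive_until(stream, rbuf, b"\r\n", max_size=64)
    # Whatever arrived after the line is still waiting in rbuf.
    return length, flags, body, line, bytes(rbuf)


result = asyncio.run(demo())
print(result)
```

Note that on EOF the partially received bytes are left in `rbuf`, which is what lets a caller tell a clean end of stream from a truncated frame.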

And the receive part of LineChannel would be implemented like:

class LineChannel(Channel[bytes]):
    def __init__(self, stream: Stream, *, max_size: int, eol=b"\r\n", initial_data=b""):
        self._stream = stream
        self._max_size = max_size
        self._eol = eol
        # If you already have a receive buffer, you can pass it in as initial_data
        self._rbuf = bytearray(initial_data)

    async def receive(self):
        # TODO: figure out EOF handling (discussed below)
        return await receive_until(self._stream, self._rbuf, self._eol, max_size=self._max_size)

    def detach(self):
        # Pass back out the buffer, so that others can use it
        stream, rbuf = self._stream, self._rbuf
        self._stream, self._rbuf = None, None
        return stream, rbuf

Another obvious option would be to encapsulate the stream+buffer together in an object, like a BufferedStream. When I tried to sketch it out this way, I ran into several small but annoying issues:

  • Does BufferedStream expose a send_all method? Or do we need separate BufferedReceiveStream and BufferedStream objects?
  • You can't hide the buffer and only expose BufferedStream.receive_exactly and BufferedStream.receive_until. If someone wants to implement their own reader then they need access to the raw buffer (e.g., h11 needs receive_until_re to search for b"\r?\n\r?\n"). If you want to switch protocols, e.g. to SSLStream, you need to detach any trailing data so you can feed it into the new wrapper. So it's not exactly an abstraction in the strict sense, because all the internals need to be fully exposed.
  • It's Yet Another Type, when we already have too many types to keep track of.

So I'm thinking, if the abstraction is that leaky, then maybe it's better to just get rid of it.

EOF handling is something of an open question, and related to the above. In my draft, if receive_exactly or receive_until hit EOF before finishing, then they raise an exception. receive_until can also raise an exception if it's read max_size bytes without seeing the delimiter. In either case, the unread data is left behind in the rbuf. For some cases, this is just what you want: e.g. if you do header = await receive_exactly(stream, rbuf, 4), and there are 2 bytes and then EOF, that's obviously some kind of corrupted stream and raising an exception is right. OTOH, if you go to read the header and get 0 bytes and then EOF, that might be fine. And with this design, the way you distinguish is to check rbuf afterwards and see how much data is in it. So I guess we would implement LineChannel.receive like:

    async def receive(self):
        try:
            return await receive_until(self._stream, self._rbuf, self._eol, max_size=self._max_size)
        except EOFError as exc:
            if self._rbuf:
                raise trio.BrokenResourceError from exc
            else:
                raise trio.EndOfChannel

If we had a BufferedStream quasi-abstraction, then it could potentially track the EOF state as part of its internal state, and refuse to call receive_some again after seeing an EOF. OTOH if we're just passing around a bytearray, we don't have a bool to store this info. I'm not sure if this is important in practice, or not?

There is at least one case where EOF is not inherently persistent: on a TTY, if someone hits control-D, then reading from the tty will report EOF, and also clear the EOF flag, so that future reads go back to reading new keyboard input. So, you generally don't want to rely on being able to call receive_some() repeatedly and keep getting EOF each time. But, for something like the receive code above that catches EOFError, seeing receive_some return EOF once is enough; it doesn't have any reason to store that flag anywhere.

For h11's receive buffer, it does have to store that flag because someone else is feeding in data and could feed in an EOF at any time, so we have to remember it. Here the reason we get away with potentially not needing it is that we don't call receive_some until we're sure we want more data, so we don't get an EOF until we're ready to handle it. Maybe that's generally true? I'm not 100% sure.

I also thought about having a NetstringChannel, but I'm not sure anyone actually uses these in real life? Maybe it's better to keep it as an example for building your own protocol classes (#472).
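
For reference, a minimal sketch of the netstring framing itself (`<decimal length>:<payload>,`), written against the same kind of `bytearray` receive buffer. The function names and the `max_size` parameter are hypothetical; this is a simplified illustration rather than a proposed `NetstringChannel`, and it does not reject leading zeros in the length field.

```python
from typing import Optional


def encode_netstring(payload: bytes) -> bytes:
    """Frame payload as b'<len>:<payload>,'."""
    return b"%d:%s," % (len(payload), payload)


def try_decode_netstring(rbuf: bytearray, *, max_size: int) -> Optional[bytes]:
    """Pop one complete netstring off the front of rbuf.

    Returns None if rbuf does not yet hold a whole frame, in which case
    the caller should receive more data, append it, and try again.
    """
    colon = rbuf.find(b":")
    if colon < 0:
        # No length terminator yet; refuse implausibly long length fields.
        if len(rbuf) > len(str(max_size)):
            raise ValueError("netstring length field too long")
        return None
    digits = bytes(rbuf[:colon])
    if not digits.isdigit():
        raise ValueError("netstring length is not a decimal number")
    size = int(digits)
    if size > max_size:
        raise ValueError("netstring payload larger than max_size")
    end = colon + 1 + size  # index where the trailing comma must sit
    if len(rbuf) <= end:
        return None  # payload or trailing comma not fully received yet
    if rbuf[end:end + 1] != b",":
        raise ValueError("netstring missing trailing comma")
    payload = bytes(rbuf[colon + 1:end])
    del rbuf[:end + 1]
    return payload


rbuf = bytearray(encode_netstring(b"hello") + b"3:ab")
first = try_decode_netstring(rbuf, max_size=1024)    # complete frame
second = try_decode_netstring(rbuf, max_size=1024)   # incomplete, needs more data
rbuf += b"c,"
third = try_decode_netstring(rbuf, max_size=1024)    # now complete
```

In an async wrapper, the `None` case is where one would await `receive_some()` and extend the buffer before retrying.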

@njsmith
Member Author

njsmith commented Jun 26, 2019

(I split off #1125 for the discussion about renaming send/receive)

One nasty issue I don't have a good answer for: the send/receive/bidi split makes everything super annoying to type, in the mypy sense. It's super annoying to have ReceiveLineChannel, SendLineChannel, LineChannel, ad infinitum. Tempting to just have LineChannel and tell people that if they're wrapping a ReceiveStream then that's fine, just don't call send, it will raise an error.

Another possibility: just have SendLineChannel, ReceiveLineChannel, and then StapledChannel(SendLineChannel(stream), ReceiveLineChannel(stream)). We'd want some shorthand though probably, like

# But how do we handle different constructor args?
LineChannel = StapledChannel.new_type(SendLineChannel, ReceiveLineChannel)

or

channel = make_line_channel(stream, ...)

Or maybe I should give up and add this to the very short list of cases where implementation inheritance is OK...

@smurfix
Contributor

smurfix commented Jun 27, 2019

On the receive side we do need a buffer, obviously; equally obviously that buffer needs to be part of the object's interface. In fact we need to be able to share the buffer between "line" objects: some protocols (IMAP?) send a CRLF-delimited line that states "the next bit consists of N opaque bytes". Thus yes it should be accessible, but still be part of the object so that the easy case Just Works. I'd simply use an optional argument to the constructor.

This also applies to a (buffered) sender. (The unbuffered case is easy …)

Yes, mypy and similar is a challenge, but frankly I'd rather add some restriction-propagation logic to mypy to recognize "when you call LineChannel(read_stream) then this LineChannel object is in fact a ReadonlyLineChannel", than to impose that kind of cognitive load on our users. In fact I'd assume that this kind of thing happens often enough that adding some generic way to teach mypy about object type equivalence should be on their road map, if not actually possible, anyway:

FooChannel(foo: ReadStream) -> ReadonlyChannel[Foo]
FooChannel(foo: WriteStream) -> WriteonlyChannel[Foo]
FooChannel(foo: Stream) -> Channel[Foo]

(where "Foo" is the somewhat-generic type of thing you'd transmit/receive).

All that "read+write => bidirectional" stapling stuff should be restricted to the few cases where you really have two disparate streams you need to link up. I can think of only one relevant case, namely setting up two pipes for bidirectional communication, and even that is somewhat-obsolete when you have socketpair.

@njsmith
Member Author

njsmith commented Jun 27, 2019

On the receive side we do need a buffer, obviously; equally obviously that buffer needs to be part of the object's interface. In fact we need to be able to share the buffer between "line" objects: some protocols (IMAP?) send a CRLF-delimited line that states "the next bit consists of N opaque bytes". Thus yes it should be accessible, but still be part of the object so that the easy case Just Works. I'd simply use an optional argument to the constructor.

Part of which object? Which constructor?

We don't want to add buffer management to the Stream interface, because there are many different classes that implement that interface and we don't want them all to be forced to add duplicate buffer management code. So the question above is exactly whether it's worth creating a new type of object just to hold the buffer.

This is for the lower-level tools like receive_exactly and receive_until. For the Channel objects, in the sketch above they're already holding the buffer internally, and the detach method is intended to handle those IMAP cases. (However, there is an interesting wrinkle: in the API above, the user can't directly access the buffer belonging to a live Channel object – they have to destroy the Channel object to get back the stream state, then call receive_exactly, then make a new Channel object. I don't know if that's the best choice.)

Yes, mypy and similar is a challenge, but frankly I'd rather add some restriction-propagation logic to mypy to recognize "when you call LineChannel(read_stream) then this LineChannel object is in fact a ReadonlyLineChannel", than to impose that kind of cognitive load on our users. In fact I'd assume that this kind of thing happens often enough that adding some generic way to teach mypy about object type equivalence should be on their road map, if not actually possible, anyway:

Hmm. I suppose they do have @overload. I wonder what happens today if you do something like:

@overload
def line_channel(s: Stream, ...) -> LineChannel: ...
@overload
def line_channel(s: ReceiveStream, ...) -> LineReceiveChannel: ...
@overload
def line_channel(s: SendStream, ...) -> LineSendChannel: ...
def line_channel(s, ...):
    # ... actual implementation ...

I think you do still need separate LineSendChannel, LineReceiveChannel, LineChannel types, because LineChannel has some unique functionality that isn't in the abstract Channel interface. And I suspect this won't work on class constructors right now...
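
A runnable version of that experiment might look like the sketch below. The stream classes are empty stand-ins for `trio.abc.ReceiveStream`, `SendStream` and `Stream` so that it runs without Trio, and the three channel classes are hypothetical placeholders. Only the runtime dispatch is exercised here; how mypy treats these overloads has not been verified.

```python
from typing import overload


# Stand-ins for trio.abc.ReceiveStream / SendStream / Stream.
class ReceiveStream:
    pass


class SendStream:
    pass


class Stream(ReceiveStream, SendStream):
    pass


# Hypothetical channel types; real ones would add receive()/send().
class LineReceiveChannel:
    def __init__(self, stream):
        self.stream = stream


class LineSendChannel:
    def __init__(self, stream):
        self.stream = stream


class LineChannel(LineReceiveChannel, LineSendChannel):
    pass


@overload
def line_channel(s: Stream) -> LineChannel: ...
@overload
def line_channel(s: ReceiveStream) -> LineReceiveChannel: ...
@overload
def line_channel(s: SendStream) -> LineSendChannel: ...
def line_channel(s):
    # Check the most specific type first, mirroring the overload order above.
    if isinstance(s, Stream):
        return LineChannel(s)
    if isinstance(s, ReceiveStream):
        return LineReceiveChannel(s)
    if isinstance(s, SendStream):
        return LineSendChannel(s)
    raise TypeError(f"expected a stream, got {type(s).__name__}")


bidi = line_channel(Stream())
recv_only = line_channel(ReceiveStream())
send_only = line_channel(SendStream())
```

Making `LineChannel` a subclass of both one-directional types keeps the first overload's return type compatible with the other two, which is one way to avoid contradictory overloads.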

@smurfix
Contributor

smurfix commented Jun 27, 2019

We don't want to add buffer management to the Stream interface, because there are many different classes that implement that interface and we don't want them all to be forced to add duplicate buffer management code.

Right. On the other hand we have channels that definitely need a shareable buffer for the underlying stream (LineChannel) which needs to be somewhat-accessible to the implementation (PatternDelimitedChannel).

So, well, interpose a BufferedReadStream with some low(ish)-level accessors to parse the buffer content and to read more data into it. (Same thing for a writer.) A make_buffered constructor that returns its argument if that already is a buffered stream is simple enough.

I don't think we'd need any sort of interim detaching mechanism for that buffer. A "I can use/modify the buffer" lock, or even a ConflictManager-protected accessor, should be sufficient.

@space88man

space88man commented Jun 3, 2020

Slightly tangential: What would be a good trionic pattern for switching between a line reader and an N-bytes reader? E.g., for HTTP/SIP you want to parse by lines until Content-Length then you want N bytes. So I want to mix

line = stream.readline()
...
# until I get content-length, then
stream.readbytes(N)

In the SO line reader example, the stream bytes are sent into the generator, so after reading a few lines with some data pending in the buf variable, what would be the trionic way to switch into N-bytes reader mode, bearing in mind that some bytes are in the generator?

I'm thinking of sending a tuple like

gen.send((more_data, False)) # continue with line reader mode
# or 
gen.send((None, N)) # means: give up to N bytes from buf
# If N > len(buf), buf is safely flushed and the regular 
# stream methods can be used for the rest of the data

(Actual use case: I'm looking to do a trio port (from gevent) of GreenSWITCH (https://github.com/EvoluxBR/greenswitch) which switches between line reader and N bytes reader. The application protocol is that of the PBX software FreeSWITCH which behaves like HTTP.)

@alexchamberlain
Contributor

@space88man I have a half written blog post about adapting @njsmith's TerminatedFrameReceiver for that - in my case to parse RESP3. In short, I ended up with the following:

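from typing import Optional

import trio

_RECEIVE_SIZE = 4096  # assumed value; the snippet as posted does not define it


class TerminatedFrameReceiver: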
    def __init__(
        self,
        buffer: bytes = b"",
        stream: Optional[trio.abc.ReceiveStream] = None,
        terminator: bytes = b"\r\n",
        max_frame_length: int = 16384,
    ):
        assert isinstance(buffer, bytes)
        assert not stream or isinstance(stream, trio.abc.ReceiveStream)

        self.stream = stream
        self.terminator = terminator
        self.max_frame_length = max_frame_length

        self._buf = bytearray(buffer)
        self._next_find_idx = 0

    def __bool__(self):
        return bool(self._buf)

    async def receive(self):
        while True:
            terminator_idx = self._buf.find(self.terminator, self._next_find_idx)
            if terminator_idx < 0:
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                await self._receive()
            else:
                return self._frame(terminator_idx + len(self.terminator))

    async def receive_exactly(self, n: int) -> bytes:
        while len(self._buf) < n:
            await self._receive()

        return self._frame(n)

    async def _receive(self):
        if len(self._buf) > self.max_frame_length:
            raise ValueError("frame too long")

        more_data = await self.stream.receive_some(_RECEIVE_SIZE) if self.stream is not None else b""
        if more_data == b"":
            if self._buf:
                raise ValueError("incomplete frame")
            raise trio.EndOfChannel

        self._buf += more_data

    def _frame(self, idx: int) -> bytes:
        frame = bytes(self._buf[:idx])  # copy out as immutable bytes, matching the annotation
        del self._buf[:idx]
        self._next_find_idx = 0
        return frame

Essentially, I refactored receive into separate _receive and _frame helpers, then implemented receive_exactly on top of the buffer. In short, this becomes a "buffered stream" I guess.

@arthur-tacca
Contributor

Just to add a data point (or anecdata point): For me, the receive_until function (and receive_exactly) would be extremely useful. I've been using the code on StackOverflow posted by @njsmith (thanks!) with two modifications:

  1. While that answer is a Stream-like LineReader class, I reshuffled it into a single receive_until function (with the signature suggested above in the hope that one day I can do a straight swap to the Trio-supplied version).
  2. I added a check against max_size even when the delimiter is found (as I said in the comments).

Having said that, I don't see wrapping those functions up in a Channel-like class to be especially useful. Once you have the underlying functions, there's not a great deal of difference between:

# API 1 : use receive_until directly
buffer = bytearray()
while True:
    msg = await receive_until(stream, buffer, b'x', max_size=MAX_SIZE)
    await do_something(msg)

Vs:

# API 2 : Channel interface
msg_stream = LineChannel(stream, max_size=MAX_SIZE, eol=b'x')
while True:
    msg = await msg_stream.receive()
    await do_something(msg)

I also considered wrapping receive_until in a simple generator function, but even that didn't seem a great deal simpler. In any case, I wanted the buffer accessible at the top level so I could log any unused bytes in it when an error occurred (including an error not related to reading).

I can imagine you might want a wrapper LineChannel class if you want to call receive_until in several places in your code without duplicating all the parameters, or you have a general-purpose function that accepts a ReceiveChannel and you want to reuse it for delimited streams. I'm not sure how common those uses are in practice. Otherwise, I think it's easier just to recommend using the underlying functions directly. Certainly, if you want to mix different types of call, as in the HTTP example given above, mixing the underlying functions seems simpler to me than using TerminatedFrameReceiver which is a bit like a ReceiveChannel (technically satisfies the requirements) but not quite.
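
As a sketch of that last point, an HTTP-like message (header lines, a blank line, then Content-Length bytes of body) can be read by mixing the two calls over one shared buffer. The compact `receive_until` and `receive_exactly` below are illustrative stand-ins using the signatures suggested earlier in the thread, not Trio functions; `read_message` and `FakeStream` are likewise hypothetical, and the header parsing is deliberately naive.

```python
import asyncio


async def _fill(rstream, rbuf: bytearray) -> None:
    """Append one more chunk to rbuf, or raise EOFError at end of stream."""
    more = await rstream.receive_some()
    if not more:
        raise EOFError("unexpected end of stream")
    rbuf += more


async def receive_exactly(rstream, rbuf: bytearray, size: int) -> bytes:
    while len(rbuf) < size:
        await _fill(rstream, rbuf)
    data = bytes(rbuf[:size])
    del rbuf[:size]
    return data


async def receive_until(rstream, rbuf: bytearray, delim: bytes, *, max_size: int) -> bytes:
    while True:
        idx = rbuf.find(delim)
        if idx >= 0:
            break
        if len(rbuf) > max_size:
            raise ValueError("line too long")
        await _fill(rstream, rbuf)
    if idx > max_size:
        raise ValueError("line too long")
    line = bytes(rbuf[:idx])         # the delimiter is consumed but not returned
    del rbuf[:idx + len(delim)]
    return line


async def read_message(stream, rbuf: bytearray):
    """Line mode for the headers, then exact-count mode for the body."""
    headers = {}
    while True:
        line = await receive_until(stream, rbuf, b"\r\n", max_size=8192)
        if not line:
            break  # a blank line ends the header section
        name, _, value = line.partition(b":")
        headers[name.strip().lower()] = value.strip()
    length = int(headers.get(b"content-length", b"0"))
    body = await receive_exactly(stream, rbuf, length)
    return headers, body


class FakeStream:
    """Hypothetical test double: replays canned chunks, then reports EOF."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def receive_some(self, max_bytes=None):
        return self._chunks.pop(0) if self._chunks else b""


stream = FakeStream([b"Content-Type: text/pl", b"ain\r\nContent-Length: 5\r\n\r\nhel", b"lo"])
buffer = bytearray()
headers, body = asyncio.run(read_message(stream, buffer))
```

Since both helpers consume from the same `bytearray`, the bytes that arrived together with the last header line are simply picked up by `receive_exactly`; no hand-off between reader objects is needed.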

@dolamroth

dolamroth commented Mar 25, 2023

Very interesting discussion about sans-io approach!

I'm more of an applied engineer and much less of an architect, so I may not understand the problem from all sides. However, do I understand correctly that those sans-io problems could be circumvented by simply making the sans-io interface async by default? In that case, timeouts would not be an issue and no AsyncIterator would need to be reinvented.

This is regarding comment #796 (comment)
