
Bidirectional streaming? #53

Open · goodboy opened this issue Jan 14, 2019 · 8 comments

Labels: api, discussion, enhancement (New feature or request), experiment (Exploratory design and testing), help wanted (Extra attention is needed), IPC and transport

goodboy (Owner) commented Jan 14, 2019

I've already left some notes in the code about how we could do two-way streaming using the native `received = yield sent` generator semantics, but it's probably worth looking at how other projects are approaching it and whether it's even a good idea.

Some projects I found through different communities recently:

  • bidirectional streaming in google's gRPC
  • streaming in faust (though I think this is more a demonstration of the need for a proper asyncitertools)

Another question is how to accomplish this with traditional messaging patterns like those found in nng. There were some suggestions in gitter about combining protocols/socket types.

More to come...

Update

The API I'm currently most convinced of is at this comment. Afaict it would also satisfy the needs of #122 to some degree, since cancelling a "context" would effectively be like cancelling a cross-actor scope.

goodboy changed the title from "Two way streaming?" to "Bidirectional streaming?" on Jan 14, 2019
ryanhiebert (Collaborator) commented

This is a really interesting idea. I'm not totally sure what to make of it, but it's very interesting. It seems like it would enable some kind of reactive-programming model across machines, which is a trippy thought. Like CycleJS for distributed systems. I'm not sure what the target application would be for such a system, because I'm remarkably uncreative.

goodboy added the api, enhancement (New feature or request), experiment (Exploratory design and testing) and help wanted (Extra attention is needed) labels on Mar 4, 2021
goodboy (Owner, Author) commented Mar 4, 2021

The more I think about this, the more I think we should be adopting the multi-task style considerations being discussed in the following trio-related issues:

The entire "channels" related issue list is also worth keeping an eye on:
https://github.com/python-trio/trio/issues?q=is%3Aopen+channels

We already have this API to run an async func and have it stream, but currently there's no way to have the caller send values to the callee. In essence we want the same streaming behavior as if you had two actors call each other and receive stream results, but in a more compact, single-context-channel API.

The thought I had was something like the following, based on our context API:

from typing import List

import tractor


@tractor.stream
async def streamer(
    ctx: tractor.Context,
) -> None:
    """A simple web response streaming server.
    """
    tx, rx = ctx.attach_stream()
    while True:

        url = await rx.recv()
        val = await web_request(url)

        # this is the same as ``yield`` in the async gen case
        await tx.send(val)


async def caller(urls: List[str]):
    async with tractor.open_nursery() as n:

        portal = await n.run_in_actor(streamer)

        tx, rx = portal.attach_stream()
        for url in urls:
            await tx.send(url)
            resp = await rx.recv()

I guess this would give us a more mem-chan style, "select statement considered harmful" kind of API?

We can then have per-side shutdown signalling just like trio's memory channels?
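For reference, trio's memory channels already give exactly that per-side shutdown behavior; closing the send side ends the receiver's iteration (a minimal, runnable demo):

import trio


async def main():
    send_chan, recv_chan = trio.open_memory_channel(0)

    async def producer():
        # exiting this block calls `.aclose()` on the send side,
        # signalling end-of-stream to the receiver
        async with send_chan:
            for i in range(3):
                await send_chan.send(i)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer)
        async with recv_chan:
            # the loop ends cleanly once the send side is closed
            async for value in recv_chan:
                print(value)


trio.run(main)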

I dunno, what do the lurkers think?

goodboy (Owner, Author) commented Mar 24, 2021

Ok, so I thought some more about this while designing some fairly involved persistent-service-system stuff, and I think we probably want a bit more stringency in this API.

second actor: the sub

@tractor.context
async def streamer(
    ctx: tractor.Context,
    *args,
) -> None:
    """A simple web response streaming server.
    """
    # do setup stuff

    # signal to the calling actor that we are synced and ready;
    # the value passed to `.started()` is returned from the calling
    # side's ``.__aenter__()``
    await ctx.started(first_message_relayed_back_to_caller)

    # this will error if we haven't yet called ``.started()`` above;
    # this entry blocks until the other side also opens a stream
    async with ctx.open_stream() as (send, recv):

        while True:

            url = await recv()  # should this be clonable?
            val = await web_request(url)

            # this is the same as ``yield`` in the async gen case
            await send(val)

first actor: the parent / caller

async def caller(urls: List[str]):

    async with tractor.open_nursery() as n:
        portal = await n.start_actor('streamer')

        # `.open_context()` here returns some kind of caller-end `Context`
        ctx = await portal.open_context(streamer)

        async with ctx, ctx.open_stream() as (first_msg_from_started_call, (send, recv)):
            for url in urls:
                await send(url)
                resp = await recv()  # could be clonable?

The nice part of enforcing a context-manager style "session" on the calling side is that we guarantee setup/teardown sync stages with the remote task. In particular, we can raise appropriate errors if, for whatever reason, either end has "terminated" its block, without having to wrangle it through channel/connection state and messages.

Obviously this API also allows for just calling a function that acts like a context manager but doesn't have to initialize any streaming. In that case I'm not sure if the caller should also be required to `async with ctx.open_stream()`? In theory the `Context` type can provide the async context manager protocol methods and then just be used as a cross-actor cancel scope like we mentioned in #122?
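A rough sketch of that no-streaming case, purely to illustrate (`setup_only_task` and `do_other_work` are made-up placeholders, and this API is still hypothetical at this point in the thread):

async def caller():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor('worker')

        # no `.open_stream()` at all: the context acts only as a
        # cross-actor session / cancel scope
        ctx = await portal.open_context(setup_only_task)
        async with ctx:
            await do_other_work()
            # cancelling tears down the remote task, much like
            # cancelling a local `trio.CancelScope`
            await ctx.cancel()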

goodboy (Owner, Author) commented Mar 25, 2021

Yah, so the main question is how many nested `async with`s you gotta do:

ctx = await portal.open_context(streamer)
async with ctx, ctx.open_stream() as (send, recv):
    # start streaming
    ...
   # start streaming

Might be the more explicit, correct API since it distinguishes the pre-started sync point with the callee, versus it being implied by with-ing `.open_stream()`?

Alternatively the stream could be opened without using `async with`, and instead provide objects for send, recv which support it, much like trio's mem chans?

goodboy (Owner, Author) commented Mar 25, 2021

Interesting simple example from arq:

import asyncio
from aiohttp import ClientSession
from arq import create_pool
from arq.connections import RedisSettings

async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        content = await response.text()
        print(f'{url}: {content:.80}...')
    return len(content)

async def startup(ctx):
    ctx['session'] = ClientSession()

async def shutdown(ctx):
    await ctx['session'].close()

async def main():
    redis = await create_pool(RedisSettings())
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await redis.enqueue_job('download_content', url)

# WorkerSettings defines the settings to use when creating the worker;
# it's used by the arq cli
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Basically what I've proposed, but with setup/teardown inside the target func.

We should probably mock this up exactly using httpx.
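A quick sketch of that httpx mock-up, wired into the context API proposed above (the tractor side is still the hypothetical proposal; `httpx.AsyncClient` plays the role of arq's startup/shutdown session hooks):

import httpx
import tractor


@tractor.context
async def url_streamer(
    ctx: tractor.Context,
) -> None:
    """Stream back response lengths for urls sent by the caller.
    """
    # setup stage, equivalent to arq's `startup()` hook
    async with httpx.AsyncClient() as session:

        await ctx.started('ready')

        async with ctx.open_stream() as (send, recv):
            while True:
                url = await recv()
                resp = await session.get(url)
                await send(len(resp.text))

    # exiting the ``AsyncClient`` block is the `shutdown()` equivalent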

goodboy (Owner, Author) commented Mar 25, 2021

Alternatively the stream could be opened without using `async with`, and instead provide objects for send, recv which support it, much like trio's mem chans?

Yeah, I'm starting to think this might be the way to go: we should build out a send-side equivalent of our existing `ReceiveStream` type and then offer the same interface as trio's send chan.
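A minimal sketch of what such a send-side type might look like, mirroring `trio.abc.SendChannel`'s interface (the class name, message keys and internals here are all assumptions, loosely based on the `'yield'`/`'stop'` proto messages that show up in the commit summary further down):

import trio


class SendMsgStream:
    """Hypothetical send-side IPC stream offering the same
    interface as a `trio.abc.SendChannel`.
    """
    def __init__(self, channel, cid):
        self._channel = channel  # underlying inter-actor transport
        self._cid = cid          # stream/context id
        self._closed = False

    async def send(self, value) -> None:
        if self._closed:
            raise trio.ClosedResourceError
        # wrap the value in a 'yield' msg, same as the recv side's proto
        await self._channel.send({'yield': value, 'cid': self._cid})

    async def aclose(self) -> None:
        if not self._closed:
            self._closed = True
            # tell the other end no more values are coming
            await self._channel.send({'stop': True, 'cid': self._cid})

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        await self.aclose()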

goodboy (Owner, Author) commented Mar 25, 2021

The last thing I have questions about is whether we need to support individual send vs. recv stream cloning.

Cloning on a recv-side stream makes obvious sense, since it'll copy messages/packets to each consumer receiving from that stream (versus only one task getting the value and the others missing it), but with a send side I'm not so sure it's useful, given the nature of an IPC-oriented "channel".

The following comes from some rubber ducking on the trio channel.

The many-to-many example in the trio docs makes clear why having an implicit task semaphore built in to the memory channel is handy, since it avoids having to use more than one nursery to make sure bookkeeping is done correctly by the user.
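For reference, that many-to-many example has roughly this shape (an abridged sketch; the `producer`/`consumer` bodies are elided and each task gets its own private clone):

import trio


async def main():
    send_chan, recv_chan = trio.open_memory_channel(0)
    async with send_chan, recv_chan, trio.open_nursery() as nursery:
        # two producers, each closing only its own clone of the send side
        nursery.start_soon(producer, "A", send_chan.clone())
        nursery.start_soon(producer, "B", send_chan.clone())
        # two consumers, each receiving a share of the messages
        nursery.start_soon(consumer, "X", recv_chan.clone())
        nursery.start_soon(consumer, "Y", recv_chan.clone())


trio.run(main)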

We could avoid this by using some complicated bookkeeping to make sure that only the last producer and the last consumer close their channel endpoints… but that would be tiresome and fragile.

This is kinda true. It definitely requires more stringent task bookkeeping, since you need one set of tasks (consumers) to be distinctly separate from another set (producers) in terms of closing each set's end of the channel. But, arguably, you lose some supervision control: if you're interested in knowing which task set was the source of a failure, you can't really tell.

For example, if you used the following code you don't really need cloning, but it requires 2 extra nurseries and distinctly concurrent task-set allocations, such that either set can terminate and signal closing of its side of the channel without interfering with the other, or requiring any task ordering or channel bookkeeping whatsoever:

async def sender(tx):
    async with tx:
        async with trio.open_nursery() as t_n:
            t_n.start_soon(send_stuff_with_tx, tx)


tx, rx = trio.open_memory_channel(0)


async with trio.open_nursery() as root_n:
    root_n.start_soon(sender, tx)

    async with rx:
        async with trio.open_nursery() as r_n:
            r_n.start_soon(recv_stuff_with_rx, rx)

Following this example in detail you'll see it's the same result as with using `.clone()` on each side: all consumers and producers appropriately close off and signal to the other end when complete, due to the sub-task tree arrangement.

Thanks to with statement nesting this can be further simplified to:

async def sender(tx):
    async with tx, trio.open_nursery() as t_n:
        t_n.start_soon(send_stuff_with_tx, tx)


tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as root_n:
    root_n.start_soon(sender, tx)
    async with rx, trio.open_nursery() as r_n:
        r_n.start_soon(recv_stuff_with_rx, rx)

which, to me, is definitely no more fragile, and I'm not convinced it's that much less simple than the `.clone()` version:

tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as n:
    n.start_soon(recv_stuff_with_rx, rx.clone())
    n.start_soon(send_stuff_with_tx, tx.clone())

but obviously now the `async with tx` / `async with rx` is embedded in the task funcs, which means you actually have no less code taking the `.clone()` approach, since you'll have the two `async with` blocks in each task function, used to signal the close of each individual clone.

Yes, you have fewer nurseries and a flat task hierarchy, but you also lose lifetime control / error handling on each set individually. For example, if one of the producer tasks fails for a reason that can be handled, then in the flat, single-nursery case both sets must be restarted, versus possibly recovering one or the other side with supervision code. So afaict from a LOC perspective it's not really different, minus the `root_n.start_soon(sender, tx)` call to spawn the task which spawns the second task set. Again, yes, it's more nurseries, but we like nurseries, and with more nurseries comes more supervision control.

Further, once we start thinking about multiple actors (processes), where the send vs. recv sides of the channel API are actually in completely separate memory spaces, I'm not sure the argument for fewer nurseries applies at all. If you have tasks in 2 separate actors, they're already running under distinct actor-scoped nurseries, and there should be no way to have a single-direction side (meaning the send or recv side attached in separate processes) be oriented such that you can "close one side" without also closing some top-level task or nursery.

Let's take the one example from above:

# code which will be mostly duplicated in both processes
async with ctx.open_stream() as (send, recv):

    # yes, you could spawn a bunch of tasks which send on clones,
    # but closing all of these will tear down only one side of the 2-way stream
    async with recv, trio.open_nursery() as n:

        # the main question is: when will you need it such that the above
        # nursery **does not also close** when all these tasks are complete?
        n.start_soon(send_stuff_with_tx, send.clone())

        # remember this is different because each task will receive
        # **a copy of each message** every time something new is sent
        # from the other side
        n.start_soon(recv_stuff_with_rx, recv.clone())

The only case I can think of to justify `send.clone()` semantics is if you want:

  • one nursery for the senders and consumers of the IPC stream in each actor
  • to allow one direction of the stream to be closed without the other direction being closed

But, when or why would you need that?

goodboy (Owner, Author) commented Mar 27, 2021

Thanks to @nimaje for examples that might provide further incentive for send-side cloning.

Quoting from trio gitter:

async def run_pair(send_channel, …):
    async def partner(…):
        …
        try:
            while some_condition:
                …
                await send_channel.send(something)
                …
                if some_other_condition:
                    await send_channel.aclose()
        except trio.ClosedResourceError:
            # stop that stuff when the channel was closed
            pass

        try:
            # maybe send something if possible
            await send_channel.send(something)
        except trio.ClosedResourceError:
            pass

    async with trio.open_nursery() as nursery:
        nursery.start_soon(partner, …)
        nursery.start_soon(partner, …)


send_channel = …
async with trio.open_nursery() as nursery:
    # each of the pairs should independently stop using the channel
    # without closing them for the others
    nursery.start_soon(run_pair, send_channel.clone(), …)
    nursery.start_soon(run_pair, send_channel.clone(), …)
    nursery.start_soon(run_pair, send_channel.clone(), …)

If you don't have `.clone()` with separate closing, you would need to build some communication between the partner tasks yourself; and what if you want to allow closing the channel for those partners from outside?

And the other case:

A long-running task allocated by the same nursery that creates tasks which will use and eventually close the channel when complete.

# maybe every case of that is solvable with a second nursery
send_channel = …
async with send_channel, trio.open_nursery() as nursery:
    # scenario:
    # to signal the receiving side, send_channel should be closed as soon
    # as possible, but some_long_running_task will run longer than that in
    # most cases (you can't swap send_channel and nursery as that would
    # close the send_channel directly after this body)
    nursery.start_soon(some_task, send_channel, …)
    nursery.start_soon(some_other_task, send_channel, …)
    nursery.start_soon(some_long_running_task, …)

ah, here's the variant with `.clone()` (the tasks would have their own `async with send_channel:`):

send_channel = …
async with trio.open_nursery() as nursery, send_channel:
    nursery.start_soon(some_task, send_channel.clone(), …)
    nursery.start_soon(some_other_task, send_channel.clone(), …)
    nursery.start_soon(some_long_running_task, …)

So I guess I'll try to summarize these two examples above (a distilled, runnable sketch follows the list):

  1. a single nursery which creates N tasks that all plan to call await send_channel.aclose() once terminated, and you don't want any one task to destruct the channel before sibling tasks are complete
  2. a single nursery which creates N tasks, where one of the tasks is long-lived and prevents the nursery block from exiting when all other producer tasks are complete
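Here's that distilled sketch of case 1 under plain trio: each producer owns and closes only its own clone, and the receive loop ends once every clone is closed (the producer count and message payloads are made up for illustration):

import trio


async def producer(tx, n):
    # each task closes only its own clone on exit
    async with tx:
        for i in range(3):
            await tx.send(f'task{n}-msg{i}')


async def main():
    tx, rx = trio.open_memory_channel(0)

    async with trio.open_nursery() as nursery:
        # hand each producer a private clone, then close the original
        async with tx:
            for n in range(3):
                nursery.start_soon(producer, tx.clone(), n)

        # this loop ends only once *all* clones are closed
        async with rx:
            async for msg in rx:
                print(msg)


trio.run(main)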

goodboy added a commit that referenced this issue May 2, 2021
This mostly adds the API described in
#53 (comment)

The first draft summary:
- formalize bidir streaming using the `trio.Channel` style interface
  which we derive as a `MsgStream` type.
- add `Portal.open_context()` which provides a `trio.Nursery.start()`
  remote task invocation style for setting up and tearing down task
  contexts in remote actors.
- add a distinct `'started'` message to the ipc protocol to facilitate
  `Context.start()` with a first return value.
- for our `ReceiveMsgStream` type, don't cancel the remote task in
  `.aclose()`; this is now done explicitly by the surrounding `Context`
  usage: `Context.cancel()`.
- streams in either direction still use a `'yield'` message, keeping the
  proto mostly symmetric without having to worry about which side is the
  caller / portal opener.
- subtlety: only allow sending a `'stop'` message during a 2-way
  streaming context from `ReceiveStream.aclose()`; a detailed comment
  with explanation is included.

Relates to #53
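For orientation, the landed first-draft API reads roughly like this (a sketch pieced together from the summary above; exact call signatures may differ):

async def caller(url):
    async with tractor.open_nursery() as n:
        portal = await n.start_actor('streamer')

        # runs the remote task and waits for its `.started()` value,
        # `trio.Nursery.start()` style
        async with portal.open_context(streamer) as (ctx, first):

            async with ctx.open_stream() as stream:
                await stream.send(url)
                resp = await stream.receive()

            # the remote task is cancelled explicitly via the context,
            # not implicitly by the stream's `.aclose()`
            await ctx.cancel()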
goodboy mentioned this issue May 2, 2021
goodboy added further commits referencing this issue on May 6, May 7, May 31, Jun 14, Jul 2, Jul 5 and Jul 6, 2021, each carrying the same draft summary as above.