
Bidir streaming #209

Closed · wants to merge 40 commits into from

Conversation

@goodboy (Owner) commented May 2, 2021

First draft bidirectional streaming support as discussed in #53.

Further todos:

  • more extensive tests for closing down either side of the stream early
  • port _debug.py remote tty locking to context api.
  • possibly a better internal design for tracking bidir vs unidir context usage to avoid hacky if not self._ctx._portal checking inside ReceiveMsgStream.aclose()
  • tests where the consumer tasks use async for stream while sender is running independently in another trio task
  • docs on the new apis
  • should we add broadcasting support here for "We need broadcast channels, stat." #204?
  • move to a split SendMsgStream / ReceiveMsgStream type set and staple them together using a channel/messaging equivalent of trio.StapledStream?
  • example in the main readme; I'm thinking that should be a big boon compared to projects like faust and ray which (I think) are unidirectional only? (A rough sketch of the intended usage follows this list.)
  • from "Ems to bidir streaming" pikers/piker#190

    it would be nice if in tractor we could require either a ctx arg, or a named arg with ctx in its name and a type annotation of tractor.Context, instead of strictly requiring an arg named ctx.
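
As a rough sketch of the intended usage, based only on the names discussed in this PR (@tractor.context, Portal.open_context(), Context.open_stream()); exact signatures, kwargs and spawn API details may differ from what finally lands:

import trio
import tractor


@tractor.context
async def echo(ctx: tractor.Context) -> None:
    # callee side: ack the caller, then open our half of the stream
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            await stream.send(msg)


async def main():
    async with tractor.open_nursery() as n:
        # spawn a subactor exposing this module (spawn API assumed here)
        portal = await n.start_actor('echoer', enable_modules=[__name__])

        # caller side: open a context, then a bidirectional stream over it
        async with portal.open_context(echo) as (ctx, first):
            async with ctx.open_stream() as stream:
                await stream.send('hello')
                assert await stream.receive() == 'hello'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)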

Critique and suggestions very welcome from lurkers 😎.


# possible outcomes:
# - normal termination: far end returns
# - premature close: far end relays a stop message to tear down stream
Collaborator:

Near end sends a message, not far end, right?

@goodboy (Owner, Author):

It should eventually be either one; we'll need tests for both cases.

You should be able to close either side's Context.open_stream() and have the other side shut down gracefully, I'm thinking?

Collaborator:

That makes sense to me. I figured you were laying out how it might happen from either side.

# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
@goodboy (Owner, Author) commented May 12, 2021:

One outstanding question I still have is whether, instead of calling this special cancel method on the "actor runtime", we should have a special cancel message. I mention this same question in #36.

The only other internal change required for the message would basically replace hacky code in the message loop anyway. It would also get us closer to a protocol that isn't tractor-specific.
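
Purely to illustrate the idea (this is not tractor's actual protocol; the message shape and helper name here are made up), a dedicated cancel message might look something like:

import tractor


async def cancel_remote_task(chan: tractor.Channel, cid: str) -> None:
    # hypothetical: push a first-class 'cancel' msg over the transport
    # instead of RPC-ing into the far end actor's 'self' namespace
    await chan.send({'cmd': 'cancel', 'cid': cid})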

try:
    yield rchan

except trio.EndOfChannel:
@goodboy (Owner, Author) commented May 12, 2021:

So if we receive a stop msg then we don't send one, since we can assume the far end has already closed.

Should probably add a comment about this.
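
Something along these lines (just a sketch; the flag and helper names are hypothetical, not code from this PR):

async def aclose(self) -> None:
    # hypothetical flag: set when a 'stop' msg has already been received
    if self._eoc:
        # the far end already closed/sent 'stop', so its side is torn
        # down; sending another 'stop' would be redundant
        return

    # otherwise tell the far end we're done receiving
    await self._send_stop_msg()  # hypothetical helper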

except trio.EndOfChannel:
    raise

else:
@goodboy (Owner, Author):

If we're closing, just send the stop msg and move on, right?

import tractor


_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = {
@goodboy (Owner, Author):

Linking comment f9dd2ad#r50700925.

@goodboy (Owner, Author):

@gc-ss mentioned,

… but if you don't like that, what if we make _registry an argument to publisher?

Yup, can do that; I just didn't because it turns out the consumers can update via sending updates on their individual streams.

It would be perfect if publisher accepted an environment variable (or a Context?)

This can also be done, though I'm not sure what you mean by environment. A Context isn't really required explicitly here, but it could be used to control cancellation if wanted.

For an alt to _registry we could do something like,

class PublisherState:
    subscriptions: dict[str, tractor.MsgStream] = {}

kinda thing?

Comment:

I'm not sure what you mean by environment. A Context isn't really required explicitly here but could be used to control cancellation if wanted

By environment or Context, I meant the context/environment/configuration due to which the function might behave in a non-deterministic manner, but whose values don't necessarily change between function invocations; think a remote host address:port or a username/password.

In this case - think about two actors communicating with each other. The exact same actors (types) might behave in very different ways if their context/environment/configuration were different with everything else (eg: arguments) being the same.

Their context/environment/configuration, though, doesn't change between function invocations the way arguments do.

For example, the registry does not need to change between function invocations.

@goodboy (Owner, Author):

I mean, yeah, a service actor can easily create whatever objects it needs and mutate them as actor-local state.
It can go further and offer a separate api to mutate that object/state from other actors if needed.

I'm just not sure we need to show that in this example since the point is just a super basic dynamic pubsub system.
We do have a pretty terrible example in the docs that could be improved.

Bonus points for sweet PRs 😉
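
For lurkers, a rough illustration of what "actor-local state plus a separate api to mutate it" could look like (all names here are made up for the example, not code from this PR):

from typing import Dict, Set

# module-level state inside the service actor, i.e. actor-local state
_registry: Dict[str, Set[str]] = {'even': set(), 'odd': set()}


async def add_subscriber(topic: str, sub_name: str) -> None:
    # a separate endpoint other actors can invoke (e.g. via a portal)
    # to mutate the service actor's local registry
    _registry.setdefault(topic, set()).add(sub_name)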

)

# make one dynamic subscriber
await n.run_in_actor(
@goodboy (Owner, Author):

Hmm, I'm thinking we could just move the dynamic case into the immediate task here instead of sleeping?

tractor/_streaming.py: outdated review thread (resolved)

try:
    log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
    # maintain backpressure
    await send_chan.send(msg)

except trio.BrokenResourceError:
    # TODO: what is the right way to handle the case where the
Comment:

This is a very difficult decision to make. It would have been less complex with old school unidirectional streams.

We might look into using both NACKs and ACKs. This, coupled with at-least-once delivery, can ensure durable message processing.

@goodboy (Owner, Author):

This, coupled with at-least-once delivery, can ensure durable message processing

Not sure what you mean.
TCP already has reliability built in; for other protocols this might be something for a user to consider, yes.

Also, the message processing isn't the problem here; it's whether a stop/cancel message made it to the far end and what to do specifically in the case where no ack for that msg is received.
That's just the two generals problem.

I'm pretty sure (at least right now) trio takes the same naive approach of just "hoping for the best".

@goodboy (Owner, Author):

For interested lurkers:

For example, the first general could send 100 messengers, anticipating that the probability of all being captured is low. With this approach the first general will attack no matter what, and the second general will attack if any message is received. Alternatively the first general could send a stream of messages and the second general could send acknowledgments to each, with each general feeling more comfortable with every message received. As seen in the proof, however, neither can be certain that the attack will be coordinated.

TCP obviously already has built-in mechanisms for its reliability requirements, so currently we don't have to think too hard since failures should bubble up from the transport layer.

The main question was more about the cancellation race conditions that can arise where the local channel is killed after it's sent the stop, and whether or not we should wait / shield the mem chan until the msg is processed later (also presumably this is all before the sub-actor is killed).
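
To illustrate the "shield until the msg goes out" option (just a sketch; the _send_stop() helper name is hypothetical and not from this change set):

import trio


async def close_stream(stream) -> None:
    # shield just the final teardown send so an already-cancelled caller
    # doesn't race the delivery of the 'stop' msg to the far end
    with trio.CancelScope(shield=True):
        await stream._send_stop()  # hypothetical helper for sending 'stop'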

@goodboy (Owner, Author):

Also to clarify, tractor's streams can in theory be restarted, but I'm pretty sure the internal feeder mem chans should be re-created?

log.debug(f"{send_chan} was terminated at remote end")
# indicate to consumer that far end has stopped
return await send_chan.aclose()
# if 'stop' in msg:
@goodboy (Owner, Author):

Yeah, I think we can drop this now; processing is now inside the ReceiveMsgStream.receive() body.

@@ -38,7 +37,9 @@
 _in_debug = False

 # lock in root actor preventing multi-access to local tty
-_debug_lock = trio.StrictFIFOLock()
+_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
@goodboy (Owner, Author):

Might pull this debugger stuff into a new PR to keep things more separate?

The main thing was being able to use the context api for tty locking which is a lot cleaner now 🏄🏼

@goodboy (Owner, Author):

Nah, keeping it here for now; it's all internals anyway.


global _registry

# syn caller
@goodboy (Owner, Author):

s/syn/sync

@goodboy marked this pull request as ready for review June 14, 2021 00:28
@goodboy (Owner, Author) commented Jun 14, 2021:

There are still quite a few things on the todo list, but I think this is good enough to get peeps using on master, and we can always follow up.

@goodboy requested a review from ryanhiebert June 14, 2021 00:29
goodboy added 20 commits July 5, 2021 08:44
Add clear teardown semantics for `Context` such that the remote side
cancellation propagation happens only on error or if client code
explicitly requests it (either by exit flag to `Portal.open_context()`
or by manually calling `Context.cancel()`).  Add `Context.result()`
to wait on and capture the final result from a remote context function;
any lingering msg sequence will be consumed/discarded.

Changes in order to make this possible:
- pass the runtime msg loop's feeder receive channel in to the context
  on the calling (portal opening) side such that a final 'return' msg
  can be waited upon using `Context.result()` which delivers the final
  return value from the callee side `@tractor.context` async function.
- always await a final result from the target context function in
  `Portal.open_context()`'s `__aexit__()` if the context has not
  been (requested to be) cancelled by client code on block exit.
- add an internal `Context._cancel_called` for context "cancel
  requested" tracking (much like `trio`'s cancel scope).
- allow flagging a stream as terminated using an internal
  `._eoc` flag which will mark the stream as stopped for iteration.
- drop `StopAsyncIteration` catching in `.receive()`; it does
  nothing.
Revert this change since it really is poking at internals and doesn't
make a lot of sense. If the context is going to be cancelled then the
msg loop will tear down the feed memory channel when ready; we don't
need to be clobbering it and confusing the runtime machinery lol.
Another face palm that was causing serious issues for code that is using
the `.shielded` feature.

Add a bunch more detailed comments for all this subtlety and hopefully
get it right once and for all. Also aggregated the `trio` errors that
should trigger closure inside `.aclose()`, hopefully that's right too.
@goodboy changed the base branch from master to transport_cleaning July 5, 2021 16:33
@goodboy mentioned this pull request Jul 5, 2021
@goodboy (Owner, Author) commented Jul 6, 2021:

Ok so I've put up #219 as a replacement for this which pulls out all the debugger work that relies on this change set.
This will hopefully make the patch simpler to grok and also make it easier to resolve whatever strange CI stuff is going on.

I will be moving the todo list to that PR as well.

Base automatically changed from transport_cleaning to master July 6, 2021 12:20
@goodboy (Owner, Author) commented Jul 6, 2021:

Replaced by #219.

@goodboy closed this Jul 6, 2021
@goodboy mentioned this pull request Jul 31, 2021