Add distinct connections over Io #18
Conversation
Real-world servers often use concurrency to process requests. Protocols also lean on per-connection message ordering, e.g. TCP. Neither of these features is available with the current `Io` implementation. This change overlays connection support on top of `Io` by multiplexing through an actor. A simple TCP-like state machine is used to initiate and accept connections.
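The TCP-like handshake mentioned above can be sketched as a small state machine. The names below (`HandshakeState`, `Seg` and its variants) are illustrative stand-ins, not this crate's actual types:

```rust
// Illustrative sketch of a TCP-like handshake state machine; the names
// here are hypothetical, not the crate's actual API.
#[derive(Debug, Clone, Copy, PartialEq)]
enum HandshakeState {
    Closed,
    SynSent,
    SynReceived,
    Established,
}

#[derive(Debug, Clone, Copy)]
enum Seg {
    Syn,
    SynAck,
    Ack,
}

impl HandshakeState {
    // Advance the state machine on an incoming segment; anything
    // unexpected is simply ignored in this sketch.
    fn on_segment(self, seg: Seg) -> HandshakeState {
        match (self, seg) {
            (HandshakeState::Closed, Seg::Syn) => HandshakeState::SynReceived,
            (HandshakeState::SynSent, Seg::SynAck) => HandshakeState::Established,
            (HandshakeState::SynReceived, Seg::Ack) => HandshakeState::Established,
            (state, _) => state,
        }
    }
}
```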
I'm looking for specific feedback on:
LGTM, left a few questions as I am still learning the codebase. My biggest nit is the number of `M: Message` bounds — I think you only need it in the impl for `Segment`, and you can remove a lot of the rest.
src/connection.rs
Outdated
```rust
use tokio_stream::wrappers::ReceiverStream;

/// A connection between two hosts.
pub struct Connection<M: Message> {
```
Suggested change:
```diff
-pub struct Connection<M: Message> {
+pub struct Connection<M> {
```
You don't need anything from the trait here except the generic type on the struct.
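A minimal sketch of the point, with a stand-in `Message` trait (not the crate's actual definition): the struct can stay unbounded, and the bound lives only on the impl that actually uses the trait.

```rust
// Stand-in for the crate's `Message` trait (illustrative only).
trait Message: Clone {}

impl Message for String {}

// No bound needed here: the struct only stores values of type M.
struct Connection<M> {
    inbox: Vec<M>,
}

// The bound belongs on the impl that actually relies on the trait.
impl<M: Message> Connection<M> {
    fn new() -> Self {
        Connection { inbox: Vec::new() }
    }

    fn deliver(&mut self, msg: M) {
        self.inbox.push(msg);
    }
}
```

This keeps the bound out of every type that merely mentions `Connection<M>`, which is what allows removing most of the `M: Message` repetition.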
TIL. Applying this suggestion throughout.
```rust
async fn event_loop(mut inner: Inner<M>) {
    loop {
        futures::select_biased! {
```
Why are we using the `futures` one?
Purely cosmetic: it avoids the inline `biased;` on the branches. I can link a TODO once something like this lands: tokio-rs/tokio#4910
I swapped to `tokio::select!` with `biased`, but I end up busy looping due to something with the fused nature of the unicycle component. I need to investigate this further.
```rust
by_idx: IndexMap<usize, ConnectionInfo>,

/// Connection message receivers
receivers: unicycle::IndexedStreamsUnordered<ReceiverStream<(ConnectionInfo, M)>>,
```
Could we use tokio's `StreamMap` here?
I looked at this earlier, but I can't recall why I didn't choose it. I'll swap it in and see how it looks... it could simplify the idx lookup bit.
I see randomness in the polling order:
https://docs.rs/tokio-stream/latest/src/tokio_stream/stream_map.rs.html#493
```rust
fn connect(&mut self, id: u64, dst: SocketAddr, notify: oneshot::Sender<Connection<M>>) {
    let info = ConnectionInfo { id, peer: dst };

    let (tx, rx) = mpsc::channel(1);
```
why size 1?
I always default to no buffering for consistent behavior. There is an interesting question here in that a slow consumer can back up the whole host. The tests I generally write don't exhibit behavior like that; rather, we might see a slow host due to conditions we created in the network topology. Thoughts?
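The backpressure behavior in question can be shown with a tiny sketch. This uses std's `sync_channel` as a stand-in for tokio's bounded `mpsc::channel(1)`; the shape of the behavior is the same: `send` blocks once the single slot is full, so a slow consumer paces the producer instead of letting a queue grow.

```rust
use std::sync::mpsc;
use std::thread;

// A bounded channel backpressures the producer: `send` blocks while the
// buffer is full. (std's sync_channel stands in for tokio's bounded
// mpsc::channel here.)
fn run() -> Vec<i32> {
    let (tx, rx) = mpsc::sync_channel(1); // capacity 1, like mpsc::channel(1)

    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap(); // blocks while the single slot is occupied
        }
        // dropping tx here closes the channel
    });

    // Drain until the channel closes; the producer is paced by our reads.
    let received: Vec<i32> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```

With `channel(1)` the host's actor can hold at most one in-flight message per connection, which is what makes a stalled consumer visible upstream.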
src/lib.rs
Outdated
```rust
mod connection;
pub use connection::Connection;
pub use connection::ConnectionIo;
pub use connection::Segment;
```
Not a blocker, but `pub use connection::{Connection, ConnectionIo, Segment};` is more idiomatic here.
sg
If it works, merge it 👍