Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kad): introduce AsyncBehaviour #5294

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

stormshield-frb
Copy link
Contributor

Description

The more things our software is doing, the more it is complicated to track Kademlia queries. We find ourselves needing to have distinct HashMap to keep track of every queries we are doing so we can link them back to where and why there were triggered and we needed them. This began to be very messy and that is why we have implemented a wrapper of the kad::Behaviour with the goal of simplifying the tracking of Kademlia queries.

This wrapper is pretty simple and has one method for each of the possible Kademlia queries (bootstrap, get_closest_peers, etc) respectively suffixed _async (bootstrap_async, get_closest_peers_async, etc). Those methods, instead of returning a QueryId, return a typed UnboundedQueryResultReceiver that will receive every Event::OutboundQueryProgressed corresponding to the QueryId. The only purpose of this wrapper is to map an OutboundQueryProgressed to the corresponding sender and send it.

Doing so, it is very easy for the developer to track his queries and keep a correct state in his application specific code.

Notes & open questions

I have chosen to use UnboundedChannel considering that the data transmitted is already in memory since this we obtain it when receiving an OutboundQueryProgressed. That is why I don't think there is a particular risk of running out of memory. If there is, I can rework my wrapper to work with bounded channels.

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

@drHuangMHT
Copy link
Contributor

drHuangMHT commented Apr 5, 2024

I think it's OK to drop some messages though, they are not that important.
I like the idea of directing query result to the query-er but the event system is designed in such a way so that everyone can listen on any event emitted by anyone. However I do encounter times when swarm events overwhelm consumers and that forced me to implement backpressure so this is to-be-discussed.

Copy link
Member

@jxs jxs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi François, thanks for starting this! I think this is a great idea for kademlia to offer async await friendly primitives, left some notes to help move this forward

/// with an [`Event::OutboundQueryProgressed`] like nothing happen.
///
/// For more information, see [`Behaviour`].
pub struct AsyncBehaviour<TStore> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of following the design introduced with stream, i.e. introducing a clonable Control
handle which implements the methods, that way we don't need to keep a reference to the Swarm to call the methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it would probably be great !!

I'm thinking about it but I really don't see how I could implement it since I need to capture the events emitted by the kad::Behaviour. Do you have an idea ?

}
}

type UnboundedQueryResultSender<T> = mpsc::UnboundedSender<AsyncQueryResult<T>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of using a wrapper structure that impl's Future and abstracts UnboundedSender (we shouldn't expose third party types as it introduces possible breaking change see here for a discussion on it).
Like AsyncQueryResult<T> where T could be BootstrapResult, GetClosestPeersResult etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you're right. I'm going to introduce a wrapper AsyncQueryResultStream around UnboundedQueryResultReceiver which implement futures::Stream. It will still depends on the definition of futures::Stream but I don't think it is possible to remove it and since it is done in protocols/stream/src/control.rs for IncomingStreams I think it is ok.

impl futures::Stream for IncomingStreams {
type Item = (PeerId, Stream);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_next_unpin(cx)
}
}

Is it ok ?

@@ -387,6 +387,43 @@ impl<TOutEvent, THandlerIn> ToSwarm<TOutEvent, THandlerIn> {
},
}
}

/// Map the event the swarm will return to an optional one.
pub fn map_out_opt<E>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this? I see it's only called on poll so why not do it there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah indeed. It is just that we use it in our fork because we have several Behaviour wrapper and we often need to map an event into a different optional event. Currently there is no method like this on ToSwarm. I thought it was a good idea to upstream it. But if you prefer I can make it private in the async_behaviour.rs file.

@stormshield-frb
Copy link
Contributor Author

@jxs like I mentioned in this PR description (in section Notes and open questions), I have used unbounded channels thinking that this could not cause a memory issue more than what there is already since all the sent events are already in memory. Still, clippy does not agree with me and the CI fails. What do you prefer ? I put an allow(clippy::disallowed_method) or I try to use bounded channels ?

I really don't have an opinion on this.

@dariusc93
Copy link
Contributor

@jxs like I mentioned in this PR description (in section Notes and open questions), I have used unbounded channels thinking that this could not cause a memory issue more than what there is already since all the sent events are already in memory. Still, clippy does not agree with me and the CI fails. What do you prefer ? I put an allow(clippy::disallowed_method) or I try to use bounded channels ?

I really don't have an opinion on this.

It would be preferred to use bounded channels so we dont introduce that vector.

Copy link

mergify bot commented Apr 19, 2024

This pull request has merge conflicts. Could you please resolve them @stormshield-frb? 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants