Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use event bus for monitoring peer connections and protocol updates #536

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

jefft0
Copy link

@jefft0 jefft0 commented Jul 6, 2023

This fixes a bug where another peer is not added to a topic after subscribing. This bug happens, for example, when a discovery system (like MDNS) is set up before pubsub initialization. Even though the peers have been discovered, they are not added to the new pubsub topics.

The bug can be seen by running TestNotifyPeerProtocolsUpdated in the master branch:

git clone https://github.com/libp2p/go-libp2p-pubsub
cd go-libp2p-pubsub
curl https://raw.githubusercontent.com/jefft0/go-libp2p-pubsub/fix/notify-protocol-update/notify_test.go -o notify_test.go
go test -run TestNotifyPeerProtocolsUpdated .

It prints "topic1 should at least have 1 peer". In our code we had to workaround this by disabling discovery (MDNS) and deferring discovery until after the pubsub protocols are registered. However, this pull request fixes the problem so that a workaround is not needed. It uses the event bus to monitor for peer connections and protocol updates. When hosts[1] joins topic1, the other peer is automatically added and the test passes.

Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
@jefft0
Copy link
Author

jefft0 commented Jul 6, 2023

Some background if it helps: We discovered and created a fix for this issue while working on this simple example of our networking library that is built on libp2p.
https://wesh.network/posts/share-contact-and-send-message

Copy link
Collaborator

@vyzo vyzo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this pr.

This is something that we have had to do for a while, so it is a step on the right direction.

At first glance it looks good, but I will do a second more thorough pass to make sure we don't break stuff.

pubsub.go Outdated

ps.val.Start(ps)

go ps.processLoop(ctx)

(*PubSubNotif)(ps).Initialize()
if err := (*PubSubNotif)(ps).startMonitoring(); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this check before spawning the processLoop, as if it fails the system will not work at all.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @vyzo . With the help of @gfanton , I added a second commit to address this comment. It more closely follows the logic which was there before.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thumbs up. Do we need to address any other issues with this PR?

Signed-off-by: Jeff Thompson <jeff@thefirst.org>
@jefft0 jefft0 force-pushed the fix/notify-protocol-update branch from e4c5794 to 7aeb7da Compare July 7, 2023 10:36
@vyzo vyzo changed the title fix: use event bus for monitoring peer connections and protocol updates use event bus for monitoring peer connections and protocol updates Jul 7, 2023
Copy link
Collaborator

@vyzo vyzo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good start, left some comments requiring changes.

notify.go Outdated Show resolved Hide resolved
Comment on lines -48 to -53
func (p *PubSubNotif) Initialize() {
isTransient := func(pid peer.ID) bool {
for _, c := range p.host.Network().ConnsToPeer(pid) {
if !c.Stat().Transient {
return false
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to keep this code around; please don't remove it and run it right after initializing the bus and before starting the monitoring goroutine.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the initialization code is still there (below), but I believe it is racey.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments below.

notify.go Outdated
Comment on lines 37 to 38
if evt.Connectedness == network.Connected {
go p.AddPeers(evt.Peer)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should process the disconnect events as well and get rid of the nasty code that handles current peer disconnections.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latest commit handles the NotConnected event and calls RemovePeers. How does it look?


ps.val.Start(ps)

go ps.processLoop(ctx)

(*PubSubNotif)(ps).Initialize()
// add current peers to notify system
notify.AddPeers(h.Network().Peers()...)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok, it is still here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is racey; it needs to happen inside startMonitoring, after we have initialized the bus but before we have spawned the monitoring goroutine.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the latest commit, the following test passes:

go test -v -run=TestSimpleDiscovery -count=1 .

As you suggest, we could move AddPeers to inside startMonitoring before the goroutine as p.AddPeers(p.host.Network().Peers()...) . But with this change, TestSimpleDiscovery hangs and times out. What do you think?

Comment on lines -48 to -53
func (p *PubSubNotif) Initialize() {
isTransient := func(pid peer.ID) bool {
for _, c := range p.host.Network().ConnsToPeer(pid) {
if !c.Stat().Transient {
return false
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the initialization code is still there (below), but I believe it is racey.

Comment on lines -48 to -53
func (p *PubSubNotif) Initialize() {
isTransient := func(pid peer.ID) bool {
for _, c := range p.host.Network().ConnsToPeer(pid) {
if !c.Stat().Transient {
return false
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments below.

Also set eventbus.Name to "libp2p/pubsub/notify".

Signed-off-by: Jeff Thompson <jeff@thefirst.org>
@jefft0 jefft0 requested a review from vyzo May 3, 2024 09:20
@jefft0
Copy link
Author

jefft0 commented May 3, 2024

Hello @vyzo . You suggested moving AddPeers to inside startMonitoring before the goroutine as p.AddPeers(p.host.Network().Peers()...) . But with this change, TestSimpleDiscovery hangs and times out. What do you suggest to do to resolve this? (Right now we have to use a fork of this repo with the fix.)

@vyzo
Copy link
Collaborator

vyzo commented May 3, 2024

Uhm, fix the deadlock?

Maybe add an extra goroutine for startup.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants