Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pubsub and add default validator #9684

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 56 additions & 7 deletions core/node/libp2p/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,75 @@
package libp2p

import (
"context"
"errors"
"fmt"

datastore "github.com/ipfs/go-datastore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx"

"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)

type P2PPubSubIn struct {
fx.In

Repo repo.Repo
Host host.Host
Discovery discovery.Discovery
}
Comment on lines +19 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of P2PPubSubIn ? Couldn't you use arguments on the anonymous functions ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its 3 of them, this is more tidy and it costs nothing.


func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PPubSubIn) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(
helpers.LifecycleCtx(mctx, lc),
params.Host,
append(pubsubOptions,
pubsub.WithDiscovery(params.Discovery),
pubsub.WithDefaultValidator(pubsub.NewBasicSeqnoValidator(makePubSubMetadataStore(params.Repo.Datastore()))))...,
)
}
}

func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(
pubsubOptions,
pubsub.WithDiscovery(disc),
pubsub.WithFloodPublish(true))...,
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PPubSubIn) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(
helpers.LifecycleCtx(mctx, lc),
params.Host,
append(
pubsubOptions,
pubsub.WithDiscovery(params.Discovery),
pubsub.WithFloodPublish(true),
pubsub.WithDefaultValidator(pubsub.NewBasicSeqnoValidator(makePubSubMetadataStore(params.Repo.Datastore()))))...,
)
}
}

func makePubSubMetadataStore(ds datastore.Datastore) pubsub.PeerMetadataStore {
return &pubsubMetadataStore{ds: ds}
}

type pubsubMetadataStore struct {
ds datastore.Datastore
}

func (m *pubsubMetadataStore) Get(ctx context.Context, p peer.ID) ([]byte, error) {
k := datastore.NewKey(fmt.Sprintf("/pubsub/seqno/%s", p))

v, err := m.ds.Get(ctx, k)
if err != nil && errors.Is(err, datastore.ErrNotFound) {
return nil, nil
}

return v, err
}

func (m *pubsubMetadataStore) Put(ctx context.Context, p peer.ID, v []byte) error {
k := datastore.NewKey(fmt.Sprintf("/pubsub/seqno/%s", p))
return m.ds.Put(ctx, k, v)
}
3 changes: 1 addition & 2 deletions docs/examples/kubo-as-a-library/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down Expand Up @@ -122,7 +121,7 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.21.1 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.2 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.6.1 // indirect
Expand Down
6 changes: 2 additions & 4 deletions docs/examples/kubo-as-a-library/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,6 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -801,8 +799,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.9.0 h1:mcLb4WzwhUG4OKb0rp1/bYMd/DYhvMyzJheQH3LMd1s=
github.com/libp2p/go-libp2p-pubsub v0.9.0/go.mod h1:OEsj0Cc/BpkqikXRTrVspWU/Hx7bMZwHP+6vNMd+c7I=
github.com/libp2p/go-libp2p-pubsub v0.9.2 h1:CoWrvqtIbk+8iTLk1yCN8zODMgBSCqRgyVCvHaGJx8Y=
github.com/libp2p/go-libp2p-pubsub v0.9.2/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ require (
github.com/libp2p/go-libp2p-http v0.4.0
github.com/libp2p/go-libp2p-kad-dht v0.21.1
github.com/libp2p/go-libp2p-kbucket v0.5.0
github.com/libp2p/go-libp2p-pubsub v0.9.0
github.com/libp2p/go-libp2p-pubsub v0.9.2
github.com/libp2p/go-libp2p-pubsub-router v0.6.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.6.1
Expand Down Expand Up @@ -130,7 +130,6 @@ require (
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302 h1:QV0ZrfBLpFc2KDk+a4LJefDczXnonRwrYrQJY/9L4dA=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWeVx9BjvqBsnC/8RUlAYpIFmPvgROcw0n1scE=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -835,8 +833,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.9.0 h1:mcLb4WzwhUG4OKb0rp1/bYMd/DYhvMyzJheQH3LMd1s=
github.com/libp2p/go-libp2p-pubsub v0.9.0/go.mod h1:OEsj0Cc/BpkqikXRTrVspWU/Hx7bMZwHP+6vNMd+c7I=
github.com/libp2p/go-libp2p-pubsub v0.9.2 h1:CoWrvqtIbk+8iTLk1yCN8zODMgBSCqRgyVCvHaGJx8Y=
github.com/libp2p/go-libp2p-pubsub v0.9.2/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
Expand Down
1 change: 1 addition & 0 deletions test/integration/pubsub_msg_seen_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
)

func TestMessageSeenCacheTTL(t *testing.T) {
t.Skip("the behaviour of seen cache has changed wrt to timing and this test cannot capture this behaviour now; it also has unit tests in go-libp2p-pubsub.")
if err := RunMessageSeenCacheTTLTest(t, "10s"); err != nil {
t.Fatal(err)
}
Expand Down