From 01adb9a9f39f79fa0d3c2ed848f0be5638a95a3a Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 2 Feb 2022 17:55:53 +0200 Subject: [PATCH 1/8] deps: use customized pubsub fork with excluded new go-datastore plumbing commit --- go.mod | 5 ++++- go.sum | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 56e2e407c0..55400d957d 100644 --- a/go.mod +++ b/go.mod @@ -45,4 +45,7 @@ require ( go.uber.org/zap v1.20.0 ) -replace github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia +replace ( + github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf => github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34 + github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia +) diff --git a/go.sum b/go.sum index cf5865bc71..b234c58980 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,8 @@ github.com/celestiaorg/go-leopard v0.1.0 h1:28z2EkvKJIez5J9CEaiiUEC+OxalRLtTGJJ1 github.com/celestiaorg/go-leopard v0.1.0/go.mod h1:NtO/rjlB8dw2aq7jr06vZFKGvryQcTDXaNHelmPNOAM= github.com/celestiaorg/go-libp2p-messenger v0.1.0 h1:rFldTa3ZWcRRn8E2bRWS94Qp1GFYXO2a0uvqpIey1B8= github.com/celestiaorg/go-libp2p-messenger v0.1.0/go.mod h1:XzNksXrH0VxuNRGOnjPL9Ck4UyQlbmMpCYg9YwSBerI= +github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34 h1:NB+H2aczLNQgPUu59MHrk9ZAzCsGOkxszpHaClUGRUo= +github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E= github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8= github.com/celestiaorg/go-verifcid v0.0.1-lazypatch/go.mod h1:kXPYu0XqTNUKWA1h3M95UHjUqBzDwXVVt/RXZDjKJmQ= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc= @@ -804,8 +806,6 @@ github.com/libp2p/go-libp2p-peerstore v0.3.0 h1:wp/G0+37+GLr7tu+wE+4GWNrA3uxKg6I github.com/libp2p/go-libp2p-peerstore v0.3.0/go.mod h1:fNX9WlOENMvdx/YD7YO/5Hkrn8+lQIk5A39BHa1HIrM= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf h1:uW9m2wQ64gkTugA9nffzoiZR1ZQ0D2/xzBQfW6sgHPk= -github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-quic-transport v0.11.2 h1:p1YQDZRHH4Cv2LPtHubqlQ9ggz4CKng/REZuXZbZMhM= github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ= From 994f4ec805613fe4d546b01b99b77c24704f24db Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 2 Feb 2022 17:57:06 +0200 Subject: [PATCH 2/8] service/header: implement custom message id function to resolve header duplicates issue --- service/header/p2p_subscriber.go | 37 +++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/service/header/p2p_subscriber.go b/service/header/p2p_subscriber.go index 602978341d..83ba3cdf7f 100644 --- a/service/header/p2p_subscriber.go +++ b/service/header/p2p_subscriber.go @@ -6,6 +6,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/minio/blake2b-simd" ) // PubSubTopic hardcodes the name of the ExtendedHeader @@ -30,7 +32,7 @@ func NewP2PSubscriber(ps *pubsub.PubSub) *P2PSubscriber { // Start starts the P2PSubscriber, registering a topic validator for the "header-sub" // topic and joining it. func (p *P2PSubscriber) Start(context.Context) (err error) { - p.topic, err = p.pubsub.Join(PubSubTopic) + p.topic, err = p.pubsub.Join(PubSubTopic, pubsub.WithTopicMessageIdFn(msgID)) return err } @@ -78,3 +80,36 @@ func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) e } return p.topic.Publish(ctx, bin) } + +// msgID computes an id for a pubsub message +// TODO(@Wondertan): This cause additional allocations per each recvd message in the topic. Find a way to avoid those. +func msgID(pmsg *pb.Message) string { + msgID := func(data []byte) string { + hash := blake2b.Sum256(data) + return string(hash[:]) + } + + h, err := UnmarshalExtendedHeader(pmsg.Data) + if err != nil { + log.Errorw("unmarshalling header while computing msg id", "err", err) + return msgID(pmsg.Data) + } + + // IMPORTANT NOTE: + // Due to the nature of the Tendermint consensus, validators don't collect all commit signatures, + // but only the minimum required amount of them(+2/3 of power). Also, signatures are collected asynchronously. + // So for, each validator may have a different set of minimally required signatures causing nondeterminism in + // the header message gossiped over the network. Subsequently, this causes message duplicates as each Bridge Node, + // connected to personal a validator, sends the validaotor's own view of commits of effectively the same header. + // + // To solve the problem above, we exclude nondeterministic value from message id calculation + h.Commit.Signatures = nil + + data, err := MarshalExtendedHeader(h) + if err != nil { + log.Errorw("unmarshalling header while computing msg id", "err", err) + return msgID(pmsg.Data) + } + + return msgID(data) +} From 20efe7def02957d82f3c5f385c41a813136e48cf Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 2 Feb 2022 17:58:04 +0200 Subject: [PATCH 3/8] log(service/header): remove log about known issue --- service/header/sync.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/service/header/sync.go b/service/header/sync.go index 4c2718bbf5..ade15c422e 100644 --- a/service/header/sync.go +++ b/service/header/sync.go @@ -221,9 +221,6 @@ func (s *Syncer) processIncoming(ctx context.Context, maybeHead *ExtendedHeader) log.Warnw("received known header", "height", maybeHead.Height, "hash", maybeHead.Hash()) - - // TODO(@Wondertan): Remove once duplicates are fully fixed - log.Warnf("Ignore the warn above - there is a known issue with duplicate headers on the network.") return pubsub.ValidationIgnore // we don't know if header is invalid so ignore } From 6866618d637100c7b1f69766b66c6189f8be34eb Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Tue, 8 Feb 2022 18:46:21 +0200 Subject: [PATCH 4/8] Update service/header/p2p_subscriber.go Co-authored-by: John Adler --- service/header/p2p_subscriber.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/service/header/p2p_subscriber.go b/service/header/p2p_subscriber.go index 83ba3cdf7f..1bbfa32a9b 100644 --- a/service/header/p2p_subscriber.go +++ b/service/header/p2p_subscriber.go @@ -96,11 +96,12 @@ func msgID(pmsg *pb.Message) string { } // IMPORTANT NOTE: - // Due to the nature of the Tendermint consensus, validators don't collect all commit signatures, - // but only the minimum required amount of them(+2/3 of power). Also, signatures are collected asynchronously. - // So for, each validator may have a different set of minimally required signatures causing nondeterminism in - // the header message gossiped over the network. Subsequently, this causes message duplicates as each Bridge Node, - // connected to personal a validator, sends the validaotor's own view of commits of effectively the same header. + // Due to the nature of the Tendermint consensus, validators don't necessarily collect commit signatures from the + // entire validator set, but only the minimum required amount of them (>2/3 of voting power). In addition, + // signatures are collected asynchronously. Therefore, each validator may have a different set of signatures that + // pass the minimum required voting power threshold, causing nondeterminism in the header message gossiped over the + // network. Subsequently, this causes message duplicates as each Bridge Node, connected to a personal validator, + // sends the validator's own view of commits of effectively the same header. // // To solve the problem above, we exclude nondeterministic value from message id calculation h.Commit.Signatures = nil From 4a6cf9ca3df6c31d6a649e3632116b88acb3b66d Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Tue, 8 Feb 2022 18:46:39 +0200 Subject: [PATCH 5/8] Update service/header/p2p_subscriber.go Co-authored-by: rene <41963722+renaynay@users.noreply.github.com> --- service/header/p2p_subscriber.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/header/p2p_subscriber.go b/service/header/p2p_subscriber.go index 1bbfa32a9b..2c989a3282 100644 --- a/service/header/p2p_subscriber.go +++ b/service/header/p2p_subscriber.go @@ -108,7 +108,7 @@ func msgID(pmsg *pb.Message) string { data, err := MarshalExtendedHeader(h) if err != nil { - log.Errorw("unmarshalling header while computing msg id", "err", err) + log.Errorw("marshaling header while computing msg id", "err", err) return msgID(pmsg.Data) } From c660c114867b8c5ff1704919bbfb23fb6aeb4ea2 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 8 Feb 2022 19:09:24 +0200 Subject: [PATCH 6/8] changelog: add entry for #409 --- CHANGELOG-PENDING.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG-PENDING.md b/CHANGELOG-PENDING.md index b3d9ca6d9f..5347ffba07 100644 --- a/CHANGELOG-PENDING.md +++ b/CHANGELOG-PENDING.md @@ -36,6 +36,7 @@ Month, DD, YYYY ### BUG FIXES +- [service/header: fix ExtendedHeader message duplicates on the network #409](https://github.com/celestiaorg/celestia-node/pull/409) [@Wondertan](https://github.com/Wondertan) - [fix(header/service): #339 race](https://github.com/celestiaorg/celestia-node/pull/343) [@Wondertan](https://github.com/Wondertan) - [core: Properly fetch Validators from Core and two more fixes #328](https://github.com/celestiaorg/celestia-node/pull/328) [@Wondertan](https://github.com/Wondertan) - [header: Added missing `err` value in ErrorW logging calls](https://github.com/celestiaorg/celestia-node/pull/282) [@jbowen93](https://github.com/jbowen93) From ce3a7ea70ad122a008be57d2b0cc7aee024e987b Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 8 Feb 2022 19:26:00 +0200 Subject: [PATCH 7/8] chore(service/header): remove logs that will be anyway duplicated during validation and document error handling --- service/header/p2p_subscriber.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/service/header/p2p_subscriber.go b/service/header/p2p_subscriber.go index 2c989a3282..926094808e 100644 --- a/service/header/p2p_subscriber.go +++ b/service/header/p2p_subscriber.go @@ -91,7 +91,8 @@ func msgID(pmsg *pb.Message) string { h, err := UnmarshalExtendedHeader(pmsg.Data) if err != nil { - log.Errorw("unmarshalling header while computing msg id", "err", err) + // There is nothing we can do about the error, and it will be anyway caught during validation. + // We also *have* to return some ID for the msg, so give the hash of even faulty msg return msgID(pmsg.Data) } @@ -108,7 +109,7 @@ func msgID(pmsg *pb.Message) string { data, err := MarshalExtendedHeader(h) if err != nil { - log.Errorw("marshaling header while computing msg id", "err", err) + // See the note under unmarshalling step return msgID(pmsg.Data) } From c1960d86b9fc427fc41c8e432a5109b8f1a780f4 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Mon, 14 Feb 2022 12:17:20 +0200 Subject: [PATCH 8/8] chore(service/header): rename inner msgID func to a shorter one --- service/header/p2p_subscriber.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/service/header/p2p_subscriber.go b/service/header/p2p_subscriber.go index 926094808e..8ff1185193 100644 --- a/service/header/p2p_subscriber.go +++ b/service/header/p2p_subscriber.go @@ -84,7 +84,7 @@ func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) e // msgID computes an id for a pubsub message // TODO(@Wondertan): This cause additional allocations per each recvd message in the topic. Find a way to avoid those. func msgID(pmsg *pb.Message) string { - msgID := func(data []byte) string { + mID := func(data []byte) string { hash := blake2b.Sum256(data) return string(hash[:]) } @@ -93,7 +93,7 @@ func msgID(pmsg *pb.Message) string { if err != nil { // There is nothing we can do about the error, and it will be anyway caught during validation. // We also *have* to return some ID for the msg, so give the hash of even faulty msg - return msgID(pmsg.Data) + return mID(pmsg.Data) } // IMPORTANT NOTE: @@ -110,8 +110,8 @@ func msgID(pmsg *pb.Message) string { data, err := MarshalExtendedHeader(h) if err != nil { // See the note under unmarshalling step - return msgID(pmsg.Data) + return mID(pmsg.Data) } - return msgID(data) + return mID(data) }