diff --git a/CHANGELOG-PENDING.md b/CHANGELOG-PENDING.md index b3d9ca6d9f..5347ffba07 100644 --- a/CHANGELOG-PENDING.md +++ b/CHANGELOG-PENDING.md @@ -36,6 +36,7 @@ Month, DD, YYYY ### BUG FIXES +- [service/header: fix ExtendedHeader message duplicates on the network #409](https://github.com/celestiaorg/celestia-node/pull/409) [@Wondertan](https://github.com/Wondertan) - [fix(header/service): #339 race](https://github.com/celestiaorg/celestia-node/pull/343) [@Wondertan](https://github.com/Wondertan) - [core: Properly fetch Validators from Core and two more fixes #328](https://github.com/celestiaorg/celestia-node/pull/328) [@Wondertan](https://github.com/Wondertan) - [header: Added missing `err` value in ErrorW logging calls](https://github.com/celestiaorg/celestia-node/pull/282) [@jbowen93](https://github.com/jbowen93) diff --git a/go.mod b/go.mod index 56e2e407c0..55400d957d 100644 --- a/go.mod +++ b/go.mod @@ -45,4 +45,7 @@ require ( go.uber.org/zap v1.20.0 ) -replace github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia +replace ( + github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf => github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34 + github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia +) diff --git a/go.sum b/go.sum index cf5865bc71..b234c58980 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,8 @@ github.com/celestiaorg/go-leopard v0.1.0 h1:28z2EkvKJIez5J9CEaiiUEC+OxalRLtTGJJ1 github.com/celestiaorg/go-leopard v0.1.0/go.mod h1:NtO/rjlB8dw2aq7jr06vZFKGvryQcTDXaNHelmPNOAM= github.com/celestiaorg/go-libp2p-messenger v0.1.0 h1:rFldTa3ZWcRRn8E2bRWS94Qp1GFYXO2a0uvqpIey1B8= github.com/celestiaorg/go-libp2p-messenger v0.1.0/go.mod h1:XzNksXrH0VxuNRGOnjPL9Ck4UyQlbmMpCYg9YwSBerI= +github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34 h1:NB+H2aczLNQgPUu59MHrk9ZAzCsGOkxszpHaClUGRUo= 
+github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E= github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8= github.com/celestiaorg/go-verifcid v0.0.1-lazypatch/go.mod h1:kXPYu0XqTNUKWA1h3M95UHjUqBzDwXVVt/RXZDjKJmQ= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc= @@ -804,8 +806,6 @@ github.com/libp2p/go-libp2p-peerstore v0.3.0 h1:wp/G0+37+GLr7tu+wE+4GWNrA3uxKg6I github.com/libp2p/go-libp2p-peerstore v0.3.0/go.mod h1:fNX9WlOENMvdx/YD7YO/5Hkrn8+lQIk5A39BHa1HIrM= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf h1:uW9m2wQ64gkTugA9nffzoiZR1ZQ0D2/xzBQfW6sgHPk= -github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-quic-transport v0.11.2 h1:p1YQDZRHH4Cv2LPtHubqlQ9ggz4CKng/REZuXZbZMhM= github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ= diff --git a/service/header/p2p_subscriber.go b/service/header/p2p_subscriber.go index 602978341d..8ff1185193 100644 --- a/service/header/p2p_subscriber.go +++ b/service/header/p2p_subscriber.go @@ -6,6 +6,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/minio/blake2b-simd" ) // PubSubTopic hardcodes the name of the ExtendedHeader @@ -30,7 +32,7 @@ func NewP2PSubscriber(ps *pubsub.PubSub) *P2PSubscriber { // Start starts the P2PSubscriber, registering a topic validator for the 
"header-sub" // topic and joining it. func (p *P2PSubscriber) Start(context.Context) (err error) { - p.topic, err = p.pubsub.Join(PubSubTopic) + p.topic, err = p.pubsub.Join(PubSubTopic, pubsub.WithTopicMessageIdFn(msgID)) return err } @@ -78,3 +80,38 @@ func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) e } return p.topic.Publish(ctx, bin) } + +// msgID computes an id for a pubsub message +// TODO(@Wondertan): This causes additional allocations for each received message in the topic. Find a way to avoid those. +func msgID(pmsg *pb.Message) string { + mID := func(data []byte) string { + hash := blake2b.Sum256(data) + return string(hash[:]) + } + + h, err := UnmarshalExtendedHeader(pmsg.Data) + if err != nil { + // There is nothing we can do about the error, and it will be caught during validation anyway. + // We also *have* to return some ID for the msg, so give the hash of even a faulty msg + return mID(pmsg.Data) + } + + // IMPORTANT NOTE: + // Due to the nature of the Tendermint consensus, validators don't necessarily collect commit signatures from the + // entire validator set, but only the minimum required amount of them (>2/3 of voting power). In addition, + // signatures are collected asynchronously. Therefore, each validator may have a different set of signatures that + // pass the minimum required voting power threshold, causing nondeterminism in the header message gossiped over the + // network. Subsequently, this causes message duplicates as each Bridge Node, connected to a personal validator, + // sends the validator's own view of commits of effectively the same header. 
+ // + // To solve the problem above, we exclude the nondeterministic value from the message id calculation + h.Commit.Signatures = nil + + data, err := MarshalExtendedHeader(h) + if err != nil { + // See the note under the unmarshalling step + return mID(pmsg.Data) + } + + return mID(data) +} diff --git a/service/header/sync.go b/service/header/sync.go index 4c2718bbf5..ade15c422e 100644 --- a/service/header/sync.go +++ b/service/header/sync.go @@ -221,9 +221,6 @@ func (s *Syncer) processIncoming(ctx context.Context, maybeHead *ExtendedHeader) log.Warnw("received known header", "height", maybeHead.Height, "hash", maybeHead.Hash()) - - // TODO(@Wondertan): Remove once duplicates are fully fixed - log.Warnf("Ignore the warn above - there is a known issue with duplicate headers on the network.") return pubsub.ValidationIgnore // we don't know if header is invalid so ignore }