Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service/header: fix ExtendedHeader message duplicates on the network #409

Merged
merged 8 commits into from Feb 14, 2022
1 change: 1 addition & 0 deletions CHANGELOG-PENDING.md
Expand Up @@ -36,6 +36,7 @@ Month, DD, YYYY

### BUG FIXES

- [service/header: fix ExtendedHeader message duplicates on the network #409](https://github.com/celestiaorg/celestia-node/pull/409) [@Wondertan](https://github.com/Wondertan)
- [fix(header/service): #339 race](https://github.com/celestiaorg/celestia-node/pull/343) [@Wondertan](https://github.com/Wondertan)
- [core: Properly fetch Validators from Core and two more fixes #328](https://github.com/celestiaorg/celestia-node/pull/328) [@Wondertan](https://github.com/Wondertan)
- [header: Added missing `err` value in ErrorW logging calls](https://github.com/celestiaorg/celestia-node/pull/282) [@jbowen93](https://github.com/jbowen93)
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Expand Up @@ -45,4 +45,7 @@ require (
go.uber.org/zap v1.20.0
)

replace github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia
replace (
github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf => github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
github.com/tendermint/tendermint v0.34.14 => github.com/celestiaorg/celestia-core v0.34.14-celestia
)
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -148,6 +148,8 @@ github.com/celestiaorg/go-leopard v0.1.0 h1:28z2EkvKJIez5J9CEaiiUEC+OxalRLtTGJJ1
github.com/celestiaorg/go-leopard v0.1.0/go.mod h1:NtO/rjlB8dw2aq7jr06vZFKGvryQcTDXaNHelmPNOAM=
github.com/celestiaorg/go-libp2p-messenger v0.1.0 h1:rFldTa3ZWcRRn8E2bRWS94Qp1GFYXO2a0uvqpIey1B8=
github.com/celestiaorg/go-libp2p-messenger v0.1.0/go.mod h1:XzNksXrH0VxuNRGOnjPL9Ck4UyQlbmMpCYg9YwSBerI=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34 h1:NB+H2aczLNQgPUu59MHrk9ZAzCsGOkxszpHaClUGRUo=
github.com/celestiaorg/go-libp2p-pubsub v0.5.7-0.20220202152246-c33ecdf03b34/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch/go.mod h1:kXPYu0XqTNUKWA1h3M95UHjUqBzDwXVVt/RXZDjKJmQ=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
Expand Down Expand Up @@ -804,8 +806,6 @@ github.com/libp2p/go-libp2p-peerstore v0.3.0 h1:wp/G0+37+GLr7tu+wE+4GWNrA3uxKg6I
github.com/libp2p/go-libp2p-peerstore v0.3.0/go.mod h1:fNX9WlOENMvdx/YD7YO/5Hkrn8+lQIk5A39BHa1HIrM=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf h1:uW9m2wQ64gkTugA9nffzoiZR1ZQ0D2/xzBQfW6sgHPk=
github.com/libp2p/go-libp2p-pubsub v0.5.7-0.20211029175501-5c90105738cf/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
github.com/libp2p/go-libp2p-quic-transport v0.11.2 h1:p1YQDZRHH4Cv2LPtHubqlQ9ggz4CKng/REZuXZbZMhM=
github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ=
Expand Down
39 changes: 38 additions & 1 deletion service/header/p2p_subscriber.go
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/minio/blake2b-simd"
)

// PubSubTopic hardcodes the name of the ExtendedHeader
Expand All @@ -30,7 +32,7 @@ func NewP2PSubscriber(ps *pubsub.PubSub) *P2PSubscriber {
// Start starts the P2PSubscriber, registering a topic validator for the "header-sub"
// topic and joining it.
func (p *P2PSubscriber) Start(context.Context) (err error) {
p.topic, err = p.pubsub.Join(PubSubTopic)
p.topic, err = p.pubsub.Join(PubSubTopic, pubsub.WithTopicMessageIdFn(msgID))
return err
}

Expand Down Expand Up @@ -78,3 +80,38 @@ func (p *P2PSubscriber) Broadcast(ctx context.Context, header *ExtendedHeader) e
}
return p.topic.Publish(ctx, bin)
}

// msgID computes an id for a pubsub message
// TODO(@Wondertan): This cause additional allocations per each recvd message in the topic. Find a way to avoid those.
func msgID(pmsg *pb.Message) string {
msgID := func(data []byte) string {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
hash := blake2b.Sum256(data)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
return string(hash[:])
}

h, err := UnmarshalExtendedHeader(pmsg.Data)
if err != nil {
// There is nothing we can do about the error, and it will be anyway caught during validation.
// We also *have* to return some ID for the msg, so give the hash of even faulty msg
return msgID(pmsg.Data)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

// IMPORTANT NOTE:
// Due to the nature of the Tendermint consensus, validators don't necessarily collect commit signatures from the
// entire validator set, but only the minimum required amount of them (>2/3 of voting power). In addition,
// signatures are collected asynchronously. Therefore, each validator may have a different set of signatures that
// pass the minimum required voting power threshold, causing nondeterminism in the header message gossiped over the
// network. Subsequently, this causes message duplicates as each Bridge Node, connected to a personal validator,
// sends the validator's own view of commits of effectively the same header.
//
// To solve the problem above, we exclude nondeterministic value from message id calculation
h.Commit.Signatures = nil

data, err := MarshalExtendedHeader(h)
if err != nil {
// See the note under unmarshalling step
return msgID(pmsg.Data)
}

return msgID(data)
}
3 changes: 0 additions & 3 deletions service/header/sync.go
Expand Up @@ -221,9 +221,6 @@ func (s *Syncer) processIncoming(ctx context.Context, maybeHead *ExtendedHeader)
log.Warnw("received known header",
"height", maybeHead.Height,
"hash", maybeHead.Hash())

// TODO(@Wondertan): Remove once duplicates are fully fixed
log.Warnf("Ignore the warn above - there is a known issue with duplicate headers on the network.")
return pubsub.ValidationIgnore // we don't know if header is invalid so ignore
}

Expand Down