diff --git a/network/p2p/libp2pConnector.go b/network/p2p/libp2pConnector.go index 985bd117f20..71c5b550635 100644 --- a/network/p2p/libp2pConnector.go +++ b/network/p2p/libp2pConnector.go @@ -42,7 +42,7 @@ func (e UnconvertibleIdentitiesError) Error() string { for id, err := range e.errs { multierr = multierror.Append(multierr, fmt.Errorf("failed to connect to %s: %w", id.String(), err)) } - return multierr.GoString() + return multierr.Error() } // IsUnconvertibleIdentitiesError returns whether the given error is an UnconvertibleIdentitiesError error diff --git a/network/p2p/middleware.go b/network/p2p/middleware.go index 9a29d9c803a..d4d85919f2e 100644 --- a/network/p2p/middleware.go +++ b/network/p2p/middleware.go @@ -13,6 +13,7 @@ import ( libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/rs/zerolog" + "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" @@ -334,7 +335,7 @@ func (m *Middleware) Subscribe(channel network.Channel) error { } // create a new readSubscription with the context of the middleware - rs := newReadSubscription(m.ctx, s, m.processMessage, m.log, m.metrics) + rs := newReadSubscription(m.ctx, s, m.processAuthenticatedMessage, m.log, m.metrics) m.wg.Add(1) // kick off the receive loop to continuously receive messages @@ -360,6 +361,32 @@ func (m *Middleware) Unsubscribe(channel network.Channel) error { return nil } +// processAuthenticatedMessage processes a message and a source (indicated by its PublicKey) and eventually passes it to the overlay +func (m *Middleware) processAuthenticatedMessage(msg *message.Message, originKey crypto.PublicKey) { + identities, err := m.ov.Identity() + if err != nil { + m.log.Error().Err(err).Msg("failed to retrieve identities list while delivering a message") + return + } + + // check the origin of the message corresponds to the one claimed in the OriginID + originID := flow.HashToID(msg.OriginID) + + originIdentity, found := identities[originID] + if !found { + m.log.Warn().Msgf("received message with claimed originID %x, which is not known by this node, and was dropped", originID) + return + } else if originIdentity.NetworkPubKey == nil { + m.log.Warn().Msgf("received message with claimed originID %x, wich has no network identifiers at this node, and was dropped", originID) + return + } else if !originIdentity.NetworkPubKey.Equals(originKey) { + m.log.Warn().Msgf("received message claiming to be from nodeID %v with key %x was actually signed by %x and dropped", originID, originIdentity.NetworkPubKey, originKey) + return + } + + m.processMessage(msg) +} + // processMessage processes a message and eventually passes it to the overlay func (m *Middleware) processMessage(msg *message.Message) { diff --git a/network/p2p/peerManager.go b/network/p2p/peerManager.go index 031732caadb..71049031237 100644 --- a/network/p2p/peerManager.go +++ b/network/p2p/peerManager.go @@ -118,15 +118,8 @@ func (pm *PeerManager) updatePeers() { // ask the connector to connect to all peers in the list err = pm.connector.UpdatePeers(pm.unit.Ctx(), ids) - if err == nil { - return - } - - if IsUnconvertibleIdentitiesError(err) { - // log conversion error as fatal since it indicates a bad identity table - pm.logger.Fatal().Err(err).Msg("failed to connect to peers") - return + if err != nil { + // one of more identities in the identity table could not be connected to + pm.logger.Error().Err(err).Msg("failed to connect to one or more peers") } - - pm.logger.Error().Err(err).Msg("failed to connect to peers") } diff --git a/network/p2p/readSubscription.go b/network/p2p/readSubscription.go index f81756e3def..3b1b66f8e69 100644 --- a/network/p2p/readSubscription.go +++ b/network/p2p/readSubscription.go @@ -2,12 +2,17 @@ package p2p import ( "context" + "fmt" "strings" "sync" + lcrypto "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/network/message" _ "github.com/onflow/flow-go/utils/binstat" @@ -20,13 +25,13 @@ type readSubscription struct { log zerolog.Logger sub *pubsub.Subscription metrics module.NetworkMetrics - callback func(msg *message.Message) + callback func(msg *message.Message, pubKey crypto.PublicKey) } // newReadSubscription reads the messages coming in on the subscription func newReadSubscription(ctx context.Context, sub *pubsub.Subscription, - callback func(msg *message.Message), + callback func(msg *message.Message, pubKey crypto.PublicKey), log zerolog.Logger, metrics module.NetworkMetrics) *readSubscription { @@ -77,6 +82,19 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) { return } + // if pubsub.WithMessageSigning(true) and pubsub.WithStrictSignatureVerification(true), + // the emitter is authenticated + emitterKey, err := messagePubKey(rawMsg) + if err != nil { + r.log.Err(err).Msg("failed to extract libp2p public key of message") + return + } + flowKey, err := FlowPublicKeyFromLibP2P(emitterKey) + if err != nil { + r.log.Err(err).Msg("failed to extract flow public key of libp2p key") + return + } + var msg message.Message // convert the incoming raw message payload to Message type //bs := binstat.EnterTimeVal(binstat.BinNet+":wire>1protobuf2message", int64(len(rawMsg.Data))) @@ -91,6 +109,44 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) { r.metrics.NetworkMessageReceived(msg.Size(), msg.ChannelID, msg.Type) // call the callback - r.callback(&msg) + r.callback(&msg, flowKey) + } +} + +// messagePubKey extracts the public key of the envelope signer from a libp2p message. +// The location of that key depends on the type of the key, see: +// https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md +// This reproduces the exact logic of the private function doing the same decoding in libp2p: +// https://github.com/libp2p/go-libp2p-pubsub/blob/ba28f8ecfc551d4d916beb748d3384951bce3ed0/sign.go#L77 +func messagePubKey(m *pubsub.Message) (lcrypto.PubKey, error) { + var pubk lcrypto.PubKey + + // m.From is the original sender of the message (versus `m.ReceivedFrom` which is the last hop which sent us this message) + pid, err := peer.IDFromBytes(m.From) + if err != nil { + return nil, err } + + if m.Key == nil { + // no attached key, it must be extractable from the source ID + pubk, err = pid.ExtractPublicKey() + if err != nil { + return nil, fmt.Errorf("cannot extract signing key: %s", err.Error()) + } + if pubk == nil { + return nil, fmt.Errorf("cannot extract signing key") + } + } else { + pubk, err = lcrypto.UnmarshalPublicKey(m.Key) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal signing key: %s", err.Error()) + } + + // verify that the source ID matches the attached key + if !pid.MatchesPublicKey(pubk) { + return nil, fmt.Errorf("bad signing key; source ID %s doesn't match key", pid) + } + } + + return pubk, nil } diff --git a/network/test/middleware_test.go b/network/test/middleware_test.go index 62715056af9..6a5ebebc75c 100644 --- a/network/test/middleware_test.go +++ b/network/test/middleware_test.go @@ -28,6 +28,11 @@ import ( const testChannel = "test-channel" +// libp2p emits a call to `Protect` with a topic-specific tag upon establishing each peering connection in a GossipSUb mesh, see: +// https://github.com/libp2p/go-libp2p-pubsub/blob/master/tag_tracer.go +// One way to make sure such a mesh has formed, asynchronously, in unit tests, is to wait for libp2p.GossipSubD such calls, +// and that's what we do with tagsObserver. +// type tagsObserver struct { tags chan string log zerolog.Logger @@ -268,6 +273,54 @@ func (m *MiddlewareTestSuite) TestEcho() { } } +// TestSpoofedPubSubHello evaluates checking the originID of the message w.r.t. its libp2p network ID on PubSub +// we check a pubsub message with a spoofed OriginID does not get delivered +// This would be doubled with cryptographic verification of the libp2p network ID in production (see message signing options in pubSub initialization) +func (m *MiddlewareTestSuite) TestSpoofedPubSubHello() { + first := 0 + last := m.size - 1 + lastNode := m.ids[last].NodeID + + // initially subscribe the nodes to the channel + for _, mw := range m.mws { + err := mw.Subscribe(testChannel) + require.NoError(m.Suite.T(), err) + } + + // set up waiting for m.size pubsub tags indicating a mesh has formed + for i := 0; i < m.size; i++ { + select { + case <-m.obs: + case <-time.After(2 * time.Second): + assert.FailNow(m.T(), "could not receive pubsub tag indicating mesh formed") + } + } + + spoofedID := unittest.IdentifierFixture() + + message1 := createMessage(spoofedID, lastNode, "hello1") + + err := m.mws[first].Publish(message1, testChannel) + assert.NoError(m.T(), err) + + // assert that the spoofed message is not received by the target node + assert.Never(m.T(), func() bool { + return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 0) + }, 2*time.Second, 100*time.Millisecond) + + // invalid message sent by firstNode claims to be from lastNode + message2 := createMessage(lastNode, lastNode, "hello1") + + err = m.mws[first].Publish(message2, testChannel) + assert.NoError(m.T(), err) + + // assert that the invalid message is not received by the target node + assert.Never(m.T(), func() bool { + return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 0) + }, 2*time.Second, 100*time.Millisecond) + +} + // TestMaxMessageSize_SendDirect evaluates that invoking SendDirect method of the middleware on a message // size beyond the permissible unicast message size returns an error. func (m *MiddlewareTestSuite) TestMaxMessageSize_SendDirect() { @@ -424,7 +477,7 @@ func (m *MiddlewareTestSuite) TestUnsubscribe() { // assert that the new message is not received by the target node assert.Never(m.T(), func() bool { return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 1) - }, 2*time.Second, time.Millisecond) + }, 2*time.Second, 100*time.Millisecond) } func createMessage(originID flow.Identifier, targetID flow.Identifier, msg ...string) *message.Message {