[network] Check the OriginID of a libp2p message corresponds to its authenticated source #1163

Merged
merged 5 commits into from
Aug 20, 2021
Changes from 2 commits
29 changes: 28 additions & 1 deletion network/p2p/middleware.go
@@ -13,6 +13,7 @@ import (
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
@@ -334,7 +335,7 @@ func (m *Middleware) Subscribe(channel network.Channel) error {
}

// create a new readSubscription with the context of the middleware
- rs := newReadSubscription(m.ctx, s, m.processMessage, m.log, m.metrics)
+ rs := newReadSubscription(m.ctx, s, m.processAuthenticatedMessage, m.log, m.metrics)
m.wg.Add(1)

// kick off the receive loop to continuously receive messages
@@ -360,6 +361,32 @@ func (m *Middleware) Unsubscribe(channel network.Channel) error {
return nil
}

// processAuthenticatedMessage processes a message and a source (indicated by its PublicKey) and eventually passes it to the overlay
func (m *Middleware) processAuthenticatedMessage(msg *message.Message, originKey crypto.PublicKey) {
Member

I still find this a bit confusing.

It sounds like the message has been authenticated, but actually we are going to check the authentication, right?

If so, would checkOriginAndProcess be better?

Contributor Author

> It sounds like the message has been authenticated,

It has: the message has a clear network origin, namely the signer of its envelope, whose signature has been checked.

> we are going to check the authentication

No, we are going to check the protocol-level authorship claim against the prior network-level authentication.

Member

Thanks, I see. It would be great to add this explanation to the code comments.

identities, err := m.ov.Identity()
if err != nil {
m.log.Error().Err(err).Msg("failed to retrieve identities list while delivering a message")
return
}

// check the origin of the message corresponds to the one claimed in the OriginID
originID := flow.HashToID(msg.OriginID)

originIdentity, found := identities[originID]
if !found {
m.log.Warn().Msgf("received message with claimed originID %x, which is not known by this node, and was dropped", originID)
return
} else if originIdentity.NetworkPubKey == nil {
m.log.Warn().Msgf("received message with claimed originID %x, which has no network identifiers at this node, and was dropped", originID)
return
} else if !originIdentity.NetworkPubKey.Equals(originKey) {
m.log.Warn().Msgf("received message claiming to be from nodeID %v with key %x was actually signed by %x and dropped", originID, originIdentity.NetworkPubKey, originKey)
return
}

m.processMessage(msg)
}
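
The distinction drawn in the review thread above (a protocol-level authorship claim checked against a prior network-level authentication) can be sketched in isolation. This is a minimal, standard-library-only illustration, not the code merged in this PR: `NodeID`, `Identity`, `checkOrigin`, and the three error values are hypothetical stand-ins for `flow.Identifier`, `flow.Identity`, and the logging branches of `processAuthenticatedMessage`, and keys are modelled as plain byte slices.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// NodeID and Identity are simplified, hypothetical stand-ins for
// flow.Identifier and flow.Identity.
type NodeID [32]byte

type Identity struct {
	NetworkPubKey []byte // nil when no networking key is known for the node
}

// Hypothetical error values, one per rejection branch.
var (
	ErrUnknownOrigin = errors.New("claimed origin is not a known node")
	ErrNoNetworkKey  = errors.New("claimed origin has no known networking key")
	ErrKeyMismatch   = errors.New("claimed origin does not match envelope signer")
)

// checkOrigin compares the protocol-level authorship claim (claimed) against
// the key that signed the network-level envelope (signerKey). The envelope
// signature itself is assumed to have been verified already.
func checkOrigin(identities map[NodeID]*Identity, claimed NodeID, signerKey []byte) error {
	id, found := identities[claimed]
	switch {
	case !found:
		return ErrUnknownOrigin
	case id.NetworkPubKey == nil:
		return ErrNoNetworkKey
	case !bytes.Equal(id.NetworkPubKey, signerKey):
		return ErrKeyMismatch
	}
	return nil
}

func main() {
	alice, bob := NodeID{1}, NodeID{2}
	identities := map[NodeID]*Identity{
		alice: {NetworkPubKey: []byte("alice-key")},
		bob:   {NetworkPubKey: []byte("bob-key")},
	}

	// Alice signs the envelope and claims to be Alice: accepted.
	fmt.Println(checkOrigin(identities, alice, []byte("alice-key")))
	// Alice signs the envelope but claims to be Bob: rejected.
	fmt.Println(checkOrigin(identities, bob, []byte("alice-key")))
	// Alice signs the envelope but claims an unknown ID: rejected.
	fmt.Println(checkOrigin(identities, NodeID{9}, []byte("alice-key")))
}
```

Returning distinct errors, rather than logging and returning as the middleware does, is only a choice made here to keep the sketch easy to test.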

// processMessage processes a message and eventually passes it to the overlay
func (m *Middleware) processMessage(msg *message.Message) {

62 changes: 59 additions & 3 deletions network/p2p/readSubscription.go
@@ -2,12 +2,17 @@ package p2p

import (
"context"
"fmt"
"strings"
"sync"

lcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/message"
_ "github.com/onflow/flow-go/utils/binstat"
@@ -20,13 +25,13 @@ type readSubscription struct {
log zerolog.Logger
sub *pubsub.Subscription
metrics module.NetworkMetrics
- callback func(msg *message.Message)
+ callback func(msg *message.Message, pubKey crypto.PublicKey)
}

// newReadSubscription reads the messages coming in on the subscription
func newReadSubscription(ctx context.Context,
sub *pubsub.Subscription,
- callback func(msg *message.Message),
+ callback func(msg *message.Message, pubKey crypto.PublicKey),
log zerolog.Logger,
metrics module.NetworkMetrics) *readSubscription {

@@ -77,6 +82,19 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) {
return
}

// if pubsub.WithMessageSigning(true) and pubsub.WithStrictSignatureVerification(true),
// the emitter is authenticated
emitterKey, err := messagePubKey(rawMsg)
if err != nil {
r.log.Err(err).Msg("failed to extract libp2p public key of message")
return
}
flowKey, err := FlowPublicKeyFromLibP2P(emitterKey)
if err != nil {
r.log.Err(err).Msg("failed to extract flow public key of libp2p key")
return
}

var msg message.Message
// convert the incoming raw message payload to Message type
//bs := binstat.EnterTimeVal(binstat.BinNet+":wire>1protobuf2message", int64(len(rawMsg.Data)))
@@ -91,6 +109,44 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) {
r.metrics.NetworkMessageReceived(msg.Size(), msg.ChannelID, msg.Type)

// call the callback
- r.callback(&msg)
+ r.callback(&msg, flowKey)
}
}

// messagePubKey extracts the public key of the envelope signer from a libp2p message.
// The location of that key depends on the type of the key, see:
// https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md
// This reproduces the exact logic of the private function doing the same decoding in libp2p:
// https://github.com/libp2p/go-libp2p-pubsub/blob/ba28f8ecfc551d4d916beb748d3384951bce3ed0/sign.go#L77
func messagePubKey(m *pubsub.Message) (lcrypto.PubKey, error) {
var pubk lcrypto.PubKey

// m.From is the original sender of the message (versus `m.ReceivedFrom` which is the last hop which sent us this message)
pid, err := peer.IDFromBytes(m.From)
if err != nil {
return nil, err
}

if m.Key == nil {
// no attached key, it must be extractable from the source ID
pubk, err = pid.ExtractPublicKey()
if err != nil {
return nil, fmt.Errorf("cannot extract signing key: %s", err.Error())
}
if pubk == nil {
return nil, fmt.Errorf("cannot extract signing key")
}
} else {
pubk, err = lcrypto.UnmarshalPublicKey(m.Key)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal signing key: %s", err.Error())
}

// verify that the source ID matches the attached key
if !pid.MatchesPublicKey(pubk) {
return nil, fmt.Errorf("bad signing key; source ID %s doesn't match key", pid)
}
}

return pubk, nil
}
55 changes: 54 additions & 1 deletion network/test/middleware_test.go
@@ -28,6 +28,11 @@ import (

const testChannel = "test-channel"

// libp2p emits a call to `Protect` with a topic-specific tag upon establishing each peering connection in a GossipSub mesh, see:
// https://github.com/libp2p/go-libp2p-pubsub/blob/master/tag_tracer.go
// One way to make sure such a mesh has formed, asynchronously, in unit tests, is to wait for libp2p.GossipSubD such calls,
// and that's what we do with tagsObserver.
//
type tagsObserver struct {
tags chan string
log zerolog.Logger
@@ -268,6 +273,54 @@ func (m *MiddlewareTestSuite) TestEcho() {
}
}

// TestSpoofedPubSubHello checks that the OriginID of a message is verified against its libp2p network ID on PubSub:
// a pubsub message with a spoofed OriginID must not be delivered.
// In production, this is complemented by cryptographic verification of the libp2p network ID (see message signing options in pubSub initialization)
func (m *MiddlewareTestSuite) TestSpoofedPubSubHello() {
first := 0
last := m.size - 1
lastNode := m.ids[last].NodeID

// initially subscribe the nodes to the channel
for _, mw := range m.mws {
err := mw.Subscribe(testChannel)
require.NoError(m.Suite.T(), err)
}

// set up waiting for m.size pubsub tags indicating a mesh has formed
for i := 0; i < m.size; i++ {
select {
case <-m.obs:
case <-time.After(2 * time.Second):
assert.FailNow(m.T(), "could not receive pubsub tag indicating mesh formed")
}
}
Comment on lines +291 to +297
Contributor

Could you explain when a pubsub tag is emitted? Thanks

Contributor

And why does getting m.size of them indicate that a mesh has formed?

Contributor Author

Libp2p emits Protect calls with tag arguments marked by their pubsub topic when establishing each peering in the mesh:
https://github.com/libp2p/go-libp2p-pubsub/blob/master/tag_tracer.go

You would usually wait for libp2p.GossipSubD such links, but here there are only two nodes.

Contributor

I'd like to understand this a bit more.

From what I understand, for each connection both peers would get a Protect call for each peering, right?

So if there are 5 peers in total, then we should expect to see 8 tags before we can be confident that a mesh has formed, correct? (5 peers, 5 - 1 = 4 edges required for a connected graph, 4 * 2 = 8 tags).

Here, you are only checking for 5 tags.

Maybe I'm not understanding how this works?

Contributor

Also, are we sure that Protect is only called once for each peer when establishing a peering? In other words, is counting the number of tags enough, or would it make sense to actually keep track of the peer IDs in case we get multiple tags for the same peer?

Contributor Author

So here's where the calls to Protect are intercepted.

This is where they originate at the libp2p level, in reaction to a join.

I don't think you're incorrect in your analysis. However:

  • when I said "You would usually wait for libp2p.GossipSubD such links" I misremembered, we actually wait for pubsub.GossipSubD*count in tests with count = 10
  • the way I got to that number is by running the test many times at a log level that allowed me to witness the number of tags emitted,
  • indeed, the value there is meant as a fast & loose heuristic to make sure we wait until after the flow/networking level that triggers the UpdatePeers and then the libp2p mesh update downstream of that, NOT as a countdown to full mesh,
  • it was nowhere near the theoretical number we would obtain: if, when a node joins a topic, we get 2m graft messages when there are k nodes already on it (m = k if k < D, m = D otherwise, since we don't count pruning), then for 10 nodes we'd expect 90 messages, and we only witness around 65-ish,
  • we don't need to finish mesh formation to ensure message delivery, as you noticed we only need a connected graph.

I'm happy to update to a value you'd find more suitable, but for count = 2 (in the test we're commenting on), I note we have, practically and theoretically, 2 graft messages, every time.

are we sure that Protect is only called once for each peer when establishing a peering?

Yep, the Protect calls otherwise made by our engines are all tagless.

Contributor

Thanks for the explanation :)
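
The counting argument in this thread can be checked with a short calculation. The sketch below is only a worked version of the formula quoted above (2m graft messages per join, with m = min(k, D) and k the number of nodes already on the topic); `expectedGrafts` and `minConnectedTags` are hypothetical helper names, and D = 6 is assumed here to be the value of `pubsub.GossipSubD`.

```go
package main

import "fmt"

// expectedGrafts sums 2*min(k, d) over successive joins, where k is the number
// of nodes already subscribed when the next node joins the topic.
func expectedGrafts(nodes, d int) int {
	total := 0
	for k := 0; k < nodes; k++ {
		m := k
		if m > d {
			m = d
		}
		total += 2 * m
	}
	return total
}

// minConnectedTags is the reviewer's lower bound: a connected graph on n nodes
// needs n-1 edges, and each edge yields one tag on each of its two endpoints.
func minConnectedTags(nodes int) int {
	if nodes < 1 {
		return 0
	}
	return 2 * (nodes - 1)
}

func main() {
	fmt.Println(expectedGrafts(2, 6))  // 2: the two-node test discussed here
	fmt.Println(expectedGrafts(10, 9)) // 90: ten nodes when the cap never applies
	fmt.Println(expectedGrafts(10, 6)) // 78: ten nodes with the cap at an assumed D = 6
	fmt.Println(minConnectedTags(5))   // 8: five peers, four edges, two tags each
}
```

Under these assumptions, the figure of 90 quoted for 10 nodes corresponds to the uncapped case, while applying the cap at D = 6 gives 78. Both remain above the roughly 65 tags reported as observed, which is consistent with the point that the wait count is a heuristic rather than a countdown to a full mesh.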


spoofedID := unittest.IdentifierFixture()

message1 := createMessage(spoofedID, lastNode, "hello1")

err := m.mws[first].Publish(message1, testChannel)
assert.NoError(m.T(), err)

// assert that the spoofed message is not received by the target node
assert.Never(m.T(), func() bool {
return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 0)
}, 2*time.Second, 100*time.Millisecond)

// invalid message sent by firstNode claims to be from lastNode
message2 := createMessage(lastNode, lastNode, "hello1")

err = m.mws[first].Publish(message2, testChannel)
assert.NoError(m.T(), err)

// assert that the invalid message is not received by the target node
assert.Never(m.T(), func() bool {
return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 0)
}, 2*time.Second, 100*time.Millisecond)

}

// TestMaxMessageSize_SendDirect evaluates that invoking SendDirect method of the middleware on a message
// size beyond the permissible unicast message size returns an error.
func (m *MiddlewareTestSuite) TestMaxMessageSize_SendDirect() {
@@ -424,7 +477,7 @@ func (m *MiddlewareTestSuite) TestUnsubscribe() {
// assert that the new message is not received by the target node
assert.Never(m.T(), func() bool {
return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 1)
- }, 2*time.Second, time.Millisecond)
+ }, 2*time.Second, 100*time.Millisecond)
}

func createMessage(originID flow.Identifier, targetID flow.Identifier, msg ...string) *message.Message {