Skip to content

Commit

Permalink
[network] Check the originator for a libp2p message corresponds to it…
Browse files Browse the repository at this point in the history
…s authenticated source
  • Loading branch information
huitseeker committed Aug 18, 2021
1 parent 82c2c48 commit 4cd3b85
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 6 deletions.
27 changes: 24 additions & 3 deletions network/p2p/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
Expand Down Expand Up @@ -316,7 +317,7 @@ func (m *Middleware) handleIncomingStream(s libp2pnetwork.Stream) {
log.Info().Msg("incoming stream received")

//create a new readConnection with the context of the middleware
conn := newReadConnection(m.ctx, s, m.processMessage, log, m.metrics, LargeMsgMaxUnicastMsgSize)
conn := newReadConnection(m.ctx, s, m.processUnauthenticatedMessage, log, m.metrics, LargeMsgMaxUnicastMsgSize)

// kick off the receive loop to continuously receive messages
m.wg.Add(1)
Expand Down Expand Up @@ -360,8 +361,28 @@ func (m *Middleware) Unsubscribe(channel network.Channel) error {
return nil
}

// processMessage processes a message and eventually passes it to the overlay
func (m *Middleware) processMessage(msg *message.Message) {
// processMessage processes a message and a source (indicated by its PublicKey) and eventually passes it to the overlay
func (m *Middleware) processMessage(msg *message.Message, originKey crypto.PublicKey) {
identities, err := m.ov.Identity()
if err != nil {
m.log.Error().Err(err).Msg("failed to retrieve identities list while delivering a message")
return
}

// check the origin of the message corresponds to the one claimed in the OriginID
originID := flow.HashToID(msg.OriginID)

originIdentity, found := identities[originID]
if !found || !originIdentity.NetworkPubKey.Equals(originKey) {
m.log.Warn().Msgf("message claiming to be from nodeID %v with key %x was actually signed by %x and dropped", originID, originIdentity.NetworkPubKey, originKey)
return
}

m.processUnauthenticatedMessage(msg)
}

// processUnAuthenticatedMessage processes a message and eventually passes it to the overlay
func (m *Middleware) processUnauthenticatedMessage(msg *message.Message) {

// run through all the message validators
for _, v := range m.validators {
Expand Down
57 changes: 54 additions & 3 deletions network/p2p/readSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package p2p

import (
"context"
"fmt"
"strings"
"sync"

lcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/message"
_ "github.com/onflow/flow-go/utils/binstat"
Expand All @@ -20,13 +25,13 @@ type readSubscription struct {
log zerolog.Logger
sub *pubsub.Subscription
metrics module.NetworkMetrics
callback func(msg *message.Message)
callback func(msg *message.Message, pubKey crypto.PublicKey)
}

// newReadSubscription reads the messages coming in on the subscription
func newReadSubscription(ctx context.Context,
sub *pubsub.Subscription,
callback func(msg *message.Message),
callback func(msg *message.Message, pubKey crypto.PublicKey),
log zerolog.Logger,
metrics module.NetworkMetrics) *readSubscription {

Expand Down Expand Up @@ -87,10 +92,56 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) {
return
}

// if pubsub.WithMessageSigning(true) and pubsub.WithStrictSignatureVerification(true),
// the emitter is authenticated
emitterKey, err := messagePubKey(rawMsg)
if err != nil {
r.log.Err(err).Msg("failed to extract libp2p public key of message")
}

flowKey, err := FlowPublicKeyFromLibP2P(emitterKey)
if err != nil {
r.log.Err(err).Msg("failed to extract flow public key of libp2p key")
return
}

// log metrics
r.metrics.NetworkMessageReceived(msg.Size(), msg.ChannelID, msg.Type)

// call the callback
r.callback(&msg)
r.callback(&msg, flowKey)
}
}

// This reproduces the exact logic of the private https://github.com/libp2p/go-libp2p-pubsub/blob/ba28f8ecfc551d4d916beb748d3384951bce3ed0/sign.go#L77
func messagePubKey(m *pubsub.Message) (lcrypto.PubKey, error) {
var pubk lcrypto.PubKey

pid, err := peer.IDFromBytes(m.From)
if err != nil {
return nil, err
}

if m.Key == nil {
// no attached key, it must be extractable from the source ID
pubk, err = pid.ExtractPublicKey()
if err != nil {
return nil, fmt.Errorf("cannot extract signing key: %s", err.Error())
}
if pubk == nil {
return nil, fmt.Errorf("cannot extract signing key")
}
} else {
pubk, err = lcrypto.UnmarshalPublicKey(m.Key)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal signing key: %s", err.Error())
}

// verify that the source ID matches the attached key
if !pid.MatchesPublicKey(pubk) {
return nil, fmt.Errorf("bad signing key; source ID %s doesn't match key", pid)
}
}

return pubk, nil
}
39 changes: 39 additions & 0 deletions network/test/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,45 @@ func (m *MiddlewareTestSuite) TestEcho() {
}
}

// TestSpoofedPubSubHello evaluates checking the originID of the message w.r.t. its libp2p network ID on PubSub
// we check a pubsub message with a spoofed OriginID does not get delivered
// This would be doubled with cryptographic verification of the libp2p network ID in production (see message signing options in pubSub initialization)
func (m *MiddlewareTestSuite) TestSpoofedPubSubHello() {
first := 0
last := m.size - 1
firstNode := m.ids[first].NodeID
lastNode := m.ids[last].NodeID

// initially subscribe the nodes to the channel
for _, mw := range m.mws {
err := mw.Subscribe(testChannel)
require.NoError(m.Suite.T(), err)
}

// set up waiting for m.size pubsub tags indicating a mesh has formed
for i := 0; i < m.size; i++ {
select {
case <-m.obs:
case <-time.After(2 * time.Second):
assert.FailNow(m.T(), "could not receive pubsub tag indicating mesh formed")
}
}

var spoofedID flow.Identifier
copy(spoofedID[:16], firstNode[:16])
copy(spoofedID[16:], lastNode[16:])

message1 := createMessage(spoofedID, lastNode, "hello1")

err := m.mws[first].Publish(message1, testChannel)
assert.NoError(m.T(), err)

// assert that the spoofed message is not received by the target node
assert.Never(m.T(), func() bool {
return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 0)
}, 2*time.Second, time.Millisecond)
}

// TestMaxMessageSize_SendDirect evaluates that invoking SendDirect method of the middleware on a message
// size beyond the permissible unicast message size returns an error.
func (m *MiddlewareTestSuite) TestMaxMessageSize_SendDirect() {
Expand Down

0 comments on commit 4cd3b85

Please sign in to comment.