Skip to content

Commit

Permalink
Merge #1028 #1163
Browse files Browse the repository at this point in the history
1028: adding secure-rpc-addr to access node systemd file r=vishalchangrani a=vishalchangrani



1163: [network] Check the OriginID of a libp2p message corresponds to its authenticated source r=huitseeker a=huitseeker

Contributes to #1115.
Follow-up of #1129.

Edit: there was no problem with the integration tests, 🤦 

However, TestMiddlewareTestSuit/TestUnsubscribe is still flaky based on the delay in the last step (sending the message once a mesh has formed). But it is problematic on master, see [[1]](https://github.com/onflow/flow-go/runs/3357056741) [[2]](https://github.com/onflow/flow-go/runs/3356508175), so this is not relevant to this PR per se.

Co-authored-by: vishal <1117327+vishalchangrani@users.noreply.github.com>
Co-authored-by: Simon Zhu <simon.zsiyan@gmail.com>
Co-authored-by: Vishal <1117327+vishalchangrani@users.noreply.github.com>
Co-authored-by: François Garillot <francois.garillot@dapperlabs.com>
  • Loading branch information
4 people committed Aug 19, 2021
3 parents 85dc4a6 + dbd7191 + 18cf175 commit f034213
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 5 deletions.
1 change: 1 addition & 0 deletions deploy/systemd-docker/flow-access.service
Expand Up @@ -31,6 +31,7 @@ ExecStart=docker run --rm \
--bootstrapdir /bootstrap \
--datadir /data/protocol \
--rpc-addr 0.0.0.0:9000 \
--secure-rpc-addr 0.0.0.0:9001 \
--http-addr 0.0.0.0:8000 \
--collection-ingress-port 9000 \
--bind 0.0.0.0:3569 \
Expand Down
29 changes: 28 additions & 1 deletion network/p2p/middleware.go
Expand Up @@ -13,6 +13,7 @@ import (
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
Expand Down Expand Up @@ -334,7 +335,7 @@ func (m *Middleware) Subscribe(channel network.Channel) error {
}

// create a new readSubscription with the context of the middleware
rs := newReadSubscription(m.ctx, s, m.processMessage, m.log, m.metrics)
rs := newReadSubscription(m.ctx, s, m.processAuthenticatedMessage, m.log, m.metrics)
m.wg.Add(1)

// kick off the receive loop to continuously receive messages
Expand All @@ -360,6 +361,32 @@ func (m *Middleware) Unsubscribe(channel network.Channel) error {
return nil
}

// processAuthenticatedMessage processes a message and a source (indicated by its PublicKey) and eventually passes it to the overlay
func (m *Middleware) processAuthenticatedMessage(msg *message.Message, originKey crypto.PublicKey) {
identities, err := m.ov.Identity()
if err != nil {
m.log.Error().Err(err).Msg("failed to retrieve identities list while delivering a message")
return
}

// check the origin of the message corresponds to the one claimed in the OriginID
originID := flow.HashToID(msg.OriginID)

originIdentity, found := identities[originID]
if !found {
m.log.Warn().Msgf("received message with claimed originID %x, which is not known by this node, and was dropped", originID)
return
} else if originIdentity.NetworkPubKey == nil {
m.log.Warn().Msgf("received message with claimed originID %x, wich has no network identifiers at this node, and was dropped", originID)
return
} else if !originIdentity.NetworkPubKey.Equals(originKey) {
m.log.Warn().Msgf("received message claiming to be from nodeID %v with key %x was actually signed by %x and dropped", originID, originIdentity.NetworkPubKey, originKey)
return
}

m.processMessage(msg)
}

// processMessage processes a message and eventually passes it to the overlay
func (m *Middleware) processMessage(msg *message.Message) {

Expand Down
62 changes: 59 additions & 3 deletions network/p2p/readSubscription.go
Expand Up @@ -2,12 +2,17 @@ package p2p

import (
"context"
"fmt"
"strings"
"sync"

lcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network/message"
_ "github.com/onflow/flow-go/utils/binstat"
Expand All @@ -20,13 +25,13 @@ type readSubscription struct {
log zerolog.Logger
sub *pubsub.Subscription
metrics module.NetworkMetrics
callback func(msg *message.Message)
callback func(msg *message.Message, pubKey crypto.PublicKey)
}

// newReadSubscription reads the messages coming in on the subscription
func newReadSubscription(ctx context.Context,
sub *pubsub.Subscription,
callback func(msg *message.Message),
callback func(msg *message.Message, pubKey crypto.PublicKey),
log zerolog.Logger,
metrics module.NetworkMetrics) *readSubscription {

Expand Down Expand Up @@ -77,6 +82,19 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) {
return
}

// if pubsub.WithMessageSigning(true) and pubsub.WithStrictSignatureVerification(true),
// the emitter is authenticated
emitterKey, err := messagePubKey(rawMsg)
if err != nil {
r.log.Err(err).Msg("failed to extract libp2p public key of message")
return
}
flowKey, err := FlowPublicKeyFromLibP2P(emitterKey)
if err != nil {
r.log.Err(err).Msg("failed to extract flow public key of libp2p key")
return
}

var msg message.Message
// convert the incoming raw message payload to Message type
//bs := binstat.EnterTimeVal(binstat.BinNet+":wire>1protobuf2message", int64(len(rawMsg.Data)))
Expand All @@ -91,6 +109,44 @@ func (r *readSubscription) receiveLoop(wg *sync.WaitGroup) {
r.metrics.NetworkMessageReceived(msg.Size(), msg.ChannelID, msg.Type)

// call the callback
r.callback(&msg)
r.callback(&msg, flowKey)
}
}

// messagePubKey extracts the public key of the envelope signer from a libp2p message.
// The location of that key depends on the type of the key, see:
// https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md
// This reproduces the exact logic of the private function doing the same decoding in libp2p:
// https://github.com/libp2p/go-libp2p-pubsub/blob/ba28f8ecfc551d4d916beb748d3384951bce3ed0/sign.go#L77
func messagePubKey(m *pubsub.Message) (lcrypto.PubKey, error) {
var pubk lcrypto.PubKey

// m.From is the original sender of the message (versus `m.ReceivedFrom` which is the last hop which sent us this message)
pid, err := peer.IDFromBytes(m.From)
if err != nil {
return nil, err
}

if m.Key == nil {
// no attached key, it must be extractable from the source ID
pubk, err = pid.ExtractPublicKey()
if err != nil {
return nil, fmt.Errorf("cannot extract signing key: %s", err.Error())
}
if pubk == nil {
return nil, fmt.Errorf("cannot extract signing key")
}
} else {
pubk, err = lcrypto.UnmarshalPublicKey(m.Key)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal signing key: %s", err.Error())
}

// verify that the source ID matches the attached key
if !pid.MatchesPublicKey(pubk) {
return nil, fmt.Errorf("bad signing key; source ID %s doesn't match key", pid)
}
}

return pubk, nil
}
55 changes: 54 additions & 1 deletion network/test/middleware_test.go
Expand Up @@ -28,6 +28,11 @@ import (

const testChannel = "test-channel"

// libp2p emits a call to `Protect` with a topic-specific tag upon establishing each peering connection in a GossipSUb mesh, see:
// https://github.com/libp2p/go-libp2p-pubsub/blob/master/tag_tracer.go
// One way to make sure such a mesh has formed, asynchronously, in unit tests, is to wait for libp2p.GossipSubD such calls,
// and that's what we do with tagsObserver.
//
type tagsObserver struct {
tags chan string
log zerolog.Logger
Expand Down Expand Up @@ -268,6 +273,54 @@ func (m *MiddlewareTestSuite) TestEcho() {
}
}

// TestSpoofedPubSubHello evaluates checking the originID of the message w.r.t. its libp2p network ID on PubSub
// we check a pubsub message with a spoofed OriginID does not get delivered
// This would be doubled with cryptographic verification of the libp2p network ID in production (see message signing options in pubSub initialization)
func (m *MiddlewareTestSuite) TestSpoofedPubSubHello() {
first := 0
last := m.size - 1
lastNode := m.ids[last].NodeID

// initially subscribe the nodes to the channel
for _, mw := range m.mws {
err := mw.Subscribe(testChannel)
require.NoError(m.Suite.T(), err)
}

// set up waiting for m.size pubsub tags indicating a mesh has formed
for i := 0; i < m.size; i++ {
select {
case <-m.obs:
case <-time.After(2 * time.Second):
assert.FailNow(m.T(), "could not receive pubsub tag indicating mesh formed")
}
}

spoofedID := unittest.IdentifierFixture()

message1 := createMessage(spoofedID, lastNode, "hello1")

err := m.mws[first].Publish(message1, testChannel)
assert.NoError(m.T(), err)

// assert that the spoofed message is not received by the target node
assert.Never(m.T(), func() bool {
return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 0)
}, 2*time.Second, 100*time.Millisecond)

// invalid message sent by firstNode claims to be from lastNode
message2 := createMessage(lastNode, lastNode, "hello1")

err = m.mws[first].Publish(message2, testChannel)
assert.NoError(m.T(), err)

// assert that the invalid message is not received by the target node
assert.Never(m.T(), func() bool {
return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 0)
}, 2*time.Second, 100*time.Millisecond)

}

// TestMaxMessageSize_SendDirect evaluates that invoking SendDirect method of the middleware on a message
// size beyond the permissible unicast message size returns an error.
func (m *MiddlewareTestSuite) TestMaxMessageSize_SendDirect() {
Expand Down Expand Up @@ -424,7 +477,7 @@ func (m *MiddlewareTestSuite) TestUnsubscribe() {
// assert that the new message is not received by the target node
assert.Never(m.T(), func() bool {
return !m.ov[last].AssertNumberOfCalls(m.T(), "Receive", 1)
}, 2*time.Second, time.Millisecond)
}, 2*time.Second, 100*time.Millisecond)
}

func createMessage(originID flow.Identifier, targetID flow.Identifier, msg ...string) *message.Message {
Expand Down

0 comments on commit f034213

Please sign in to comment.