Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make message large message handling robust and noisy #189

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (p *PubSub) getHelloPacket() *RPC {
}

func (p *PubSub) handleNewStream(s network.Stream) {
r := ggio.NewDelimitedReader(s, 1<<20)
r := ggio.NewDelimitedReader(s, maxRPCSize)
for {
rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC)
Expand Down Expand Up @@ -85,7 +85,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
r := ggio.NewDelimitedReader(s, 1<<20)
r := ggio.NewDelimitedReader(s, maxRPCSize)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
Expand Down
11 changes: 10 additions & 1 deletion floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,20 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
continue
}

if out.Size() > maxRPCSize {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check really necessary here?
We already drop large messages on publish and can't read messages larger than maxRCPSize anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it should be fine in floodsub. My worry is gossipsub.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, let's remove it from here as it's redundant, and rethink the gossipsub case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can't hurt and future proofs us against, e.g., batching too many messages in a single RPC and then silently failing when the remote side drops it.

log.Errorf(
"dropping RPC outbound message that's too large: %d > %d",
out.Size(),
maxRPCSize,
)
continue
}

select {
case mch <- out:
default:
log.Infof("dropping message to peer %s: queue full", pid)
// Drop it. The peer is too slow.
log.Infof("dropping message to peer %s: queue full", pid)
}
}
}
Expand Down
26 changes: 18 additions & 8 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,25 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
return
}

select {
case mch <- out:
default:
log.Infof("dropping message to peer %s: queue full", p)
// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
gs.pushControl(p, ctl)
if out.Size() > maxRPCSize {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too, this check seems unnecessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried we could add too much gossip. That's why I have that check. But I can remove it if you think it's overkill.

Copy link
Collaborator

@vyzo vyzo Jun 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the check is done before we even add the gossip!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, this check is reasonable on second thought, if we are adding too much gossip (over 64KB) we probably want to know.

log.Errorf(
"dropping RPC outbound message that's too large: %d > %d",
out.Size(),
maxRPCSize,
)
} else {
select {
case mch <- out:
return
default:
}
log.Infof("dropping message to peer %s: queue full", p)
}

// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
gs.pushControl(p, ctl)
}
}

Expand Down
12 changes: 12 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ var (
TimeCacheDuration = 120 * time.Second
)

const (
// maxRPCSize is the maximum size of any RPC protobuf.
maxRPCSize = maxMessageSize + (64 * 1024) // a message + 64 KiB.
/// maxMessageSize is the maximum size of outbound message.
maxMessageSize = 1 << 20 // 1MiB
vyzo marked this conversation as resolved.
Show resolved Hide resolved
)

var log = logging.Logger("pubsub")

// PubSub is the implementation of the pubsub system.
Expand Down Expand Up @@ -696,7 +703,12 @@ func (p *PubSub) GetTopics() []string {
}

// Publish publishes data to the given topic.
//
// The message data must be less than the maximum message size, 1MiB.
func (p *PubSub) Publish(topic string, data []byte) error {
if len(data) > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BUG: that should be

if len(data) > maxMessageSize

return fmt.Errorf("message too large: %d > %d", len(data), maxMessageSize)
}
seqno := p.nextSeqno()
m := &pb.Message{
Data: data,
Expand Down