Skip to content

Commit

Permalink
make message large message handling robust and noisy
Browse files Browse the repository at this point in the history
1. Reject outgoing messages that are too large.
2. Drop outgoing RPCs that are too large and _log_.
3. Increase the max RPC size to 1MiB+64KiB (to allow for 1MiB messages).
  • Loading branch information
Stebalien committed Jun 6, 2019
1 parent 4221a39 commit da3290e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
4 changes: 2 additions & 2 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (p *PubSub) getHelloPacket() *RPC {
}

func (p *PubSub) handleNewStream(s network.Stream) {
r := ggio.NewDelimitedReader(s, 1<<20)
r := ggio.NewDelimitedReader(s, maxRPCSize)
for {
rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC)
Expand Down Expand Up @@ -85,7 +85,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
r := ggio.NewDelimitedReader(s, 1<<20)
r := ggio.NewDelimitedReader(s, maxRPCSize)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
Expand Down
11 changes: 10 additions & 1 deletion floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,20 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
continue
}

if out.Size() > maxRPCSize {
log.Errorf(
"dropping RPC outbound message that's too large: %d > %d",
out.Size(),
maxRPCSize,
)
continue
}

select {
case mch <- out:
default:
log.Infof("dropping message to peer %s: queue full", pid)
// Drop it. The peer is too slow.
log.Infof("dropping message to peer %s: queue full", pid)
}
}
}
Expand Down
26 changes: 18 additions & 8 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,25 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
return
}

select {
case mch <- out:
default:
log.Infof("dropping message to peer %s: queue full", p)
// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
gs.pushControl(p, ctl)
if out.Size() > maxRPCSize {
log.Errorf(
"dropping RPC outbound message that's too large: %d > %d",
out.Size(),
maxRPCSize,
)
} else {
select {
case mch <- out:
return
default:
}
log.Infof("dropping message to peer %s: queue full", p)
}

// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
gs.pushControl(p, ctl)
}
}

Expand Down
12 changes: 12 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ var (
TimeCacheDuration = 120 * time.Second
)

const (
// maxRPCSize is the maximum size of any RPC protobuf.
maxRPCSize = maxMessageSize + (64 * 1024) // a message + 64 KiB.
/// maxMessageSize is the maximum size of outbound message.
maxMessageSize = 1 << 20 // 1MiB
)

var log = logging.Logger("pubsub")

// PubSub is the implementation of the pubsub system.
Expand Down Expand Up @@ -696,7 +703,12 @@ func (p *PubSub) GetTopics() []string {
}

// Publish publishes data to the given topic.
//
// The message data must be less than the maximum message size, 1MiB.
func (p *PubSub) Publish(topic string, data []byte) error {
if len(data) > 0 {
return fmt.Errorf("message too large: %d > %d", len(data), maxMessageSize)
}
seqno := p.nextSeqno()
m := &pb.Message{
Data: data,
Expand Down

0 comments on commit da3290e

Please sign in to comment.