Go IDONTWANT #553

Open · wants to merge 9 commits into base: master
26 changes: 11 additions & 15 deletions comm.go
@@ -111,7 +111,7 @@ func (p *PubSub) notifyPeerDead(pid peer.ID) {
 	}
 }
 
-func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
+func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing *rpcQueue) {
 	s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
 	if err != nil {
 		log.Debug("opening new stream to peer: ", err, pid)
@@ -132,7 +132,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
 	}
 }
 
-func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
+func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing *rpcQueue) {
 	select {
 	case <-time.After(backoff):
 		p.handleNewPeer(ctx, pid, outgoing)
@@ -153,7 +153,7 @@ func (p *PubSub) handlePeerDead(s network.Stream) {
 	p.notifyPeerDead(pid)
 }
 
-func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
+func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing *rpcQueue) {
 	writeRpc := func(rpc *RPC) error {
 		size := uint64(rpc.Size())
 
@@ -172,19 +172,15 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
 
 	defer s.Close()
 	for {
-		select {
-		case rpc, ok := <-outgoing:
-			if !ok {
-				return
-			}
+		rpc, err := outgoing.Pop(ctx)
+		if err != nil {
+			return
+		}
 
-			err := writeRpc(rpc)
-			if err != nil {
-				s.Reset()
-				log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
-				return
-			}
-		case <-ctx.Done():
+		err = writeRpc(rpc)
+		if err != nil {
+			s.Reset()
+			log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
 			return
 		}
 	}

Collaborator (reviewing the `return` after `outgoing.Pop`): We might want to log the error (at very low priority) if it is not a closed indicator.
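Note for readers of this excerpt: `rpc_queue.go` itself is not included above, so the following is a minimal, hypothetical sketch of the `rpcQueue` surface the new code relies on, inferred purely from the call sites (`Push(rpc, bool)`, `Pop(ctx)`, `Close()`). The second `Push` argument is read here as a "block until there is space" flag, based on `Push(hello, true)` with its error ignored versus `Push(out, false)` followed by "queue full" drops; the PR's actual implementation may well differ (for example by adding priority handling for IDONTWANT control messages).

```go
// Sketch only: a plausible channel-backed rpcQueue matching the call
// sites in this diff. Not the PR's real rpc_queue.go.
package pubsub

import (
	"context"
	"errors"
	"sync"
)

var (
	errQueueClosed = errors.New("rpc queue: closed")
	errQueueFull   = errors.New("rpc queue: full")
)

type rpcQueue struct {
	ch        chan *RPC
	closeOnce sync.Once
}

func newRpcQueue(maxSize int) *rpcQueue {
	return &rpcQueue{ch: make(chan *RPC, maxSize)}
}

// Push enqueues rpc. With block=false it fails fast with errQueueFull
// instead of waiting, which is what lets routers drop to slow peers.
// (Like the old raw channel, this sketch panics on Push after Close.)
func (q *rpcQueue) Push(rpc *RPC, block bool) error {
	if block {
		q.ch <- rpc
		return nil
	}
	select {
	case q.ch <- rpc:
		return nil
	default:
		return errQueueFull
	}
}

// Pop blocks until an RPC is available, the queue is closed, or ctx is
// cancelled — collapsing the old select-over-channel-and-ctx pattern
// into a single call, as the new handleSendingMessages loop expects.
func (q *rpcQueue) Pop(ctx context.Context) (*RPC, error) {
	select {
	case rpc, ok := <-q.ch:
		if !ok {
			return nil, errQueueClosed
		}
		return rpc, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (q *rpcQueue) Close() {
	q.closeOnce.Do(func() { close(q.ch) })
}
```

Under this sketch, the collaborator's suggestion above has a natural shape: `handleSendingMessages` could log at debug level when the `Pop` error is neither `errQueueClosed` nor the context's cancellation error.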
10 changes: 5 additions & 5 deletions floodsub.go
@@ -83,19 +83,19 @@ func (fs *FloodSubRouter) Publish(msg *Message) {
 			continue
 		}
 
-		mch, ok := fs.p.peers[pid]
+		q, ok := fs.p.peers[pid]
 		if !ok {
 			continue
 		}
 
-		select {
-		case mch <- out:
-			fs.tracer.SendRPC(out, pid)
-		default:
+		err := q.Push(out, false)
+		if err != nil {
 			log.Infof("dropping message to peer %s: queue full", pid)
 			fs.tracer.DropRPC(out, pid)
 			// Drop it. The peer is too slow.
+			continue
 		}
+		fs.tracer.SendRPC(out, pid)
 	}
 }
 
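The same push-or-drop shape replaces the old `select`/`default` in floodsub, gossipsub, and randomsub below. One consequence worth calling out: `tracer.SendRPC` now fires only for RPCs that were actually enqueued. A small usage sketch, again against the hypothetical queue above (`newRpcQueue`/`Push` are assumed names, not the PR's API):

```go
// Hypothetical helper, not part of the PR: the non-blocking router
// path. A full queue surfaces as an error instead of a blocked channel
// send, so the router's event loop never stalls on a slow peer.
func pushOrDrop(q *rpcQueue, out *RPC) bool {
	if err := q.Push(out, false); err != nil {
		// caller logs and traces DropRPC, exactly as Publish does above
		return false
	}
	// caller traces SendRPC only on this path
	return true
}
```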
16 changes: 8 additions & 8 deletions gossipsub.go
@@ -1159,14 +1159,14 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
 		delete(gs.gossip, p)
 	}
 
-	mch, ok := gs.p.peers[p]
+	q, ok := gs.p.peers[p]
 	if !ok {
 		return
 	}
 
 	// If we're below the max message size, go ahead and send
 	if out.Size() < gs.p.maxMessageSize {
-		gs.doSendRPC(out, p, mch)
+		gs.doSendRPC(out, p, q)
 		return
 	}
 
@@ -1178,7 +1178,7 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
 	}
 
 	for _, rpc := range outRPCs {
-		gs.doSendRPC(rpc, p, mch)
+		gs.doSendRPC(rpc, p, q)
 	}
 }
 
@@ -1192,13 +1192,13 @@ func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
 	}
 }
 
-func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
-	select {
-	case mch <- rpc:
-		gs.tracer.SendRPC(rpc, p)
-	default:
+func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue) {
+	err := q.Push(rpc, false)
+	if err != nil {
 		gs.doDropRPC(rpc, p, "queue full")
+		return
 	}
+	gs.tracer.SendRPC(rpc, p)
 }
 
 func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
52 changes: 26 additions & 26 deletions pubsub.go
@@ -147,7 +147,7 @@ type PubSub struct {
 	blacklist     Blacklist
 	blacklistPeer chan peer.ID
 
-	peers map[peer.ID]chan *RPC
+	peers map[peer.ID]*rpcQueue
 
 	inboundStreamsMx sync.Mutex
 	inboundStreams   map[peer.ID]network.Stream
@@ -285,7 +285,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		mySubs:         make(map[string]map[*Subscription]struct{}),
 		myRelays:       make(map[string]int),
 		topics:         make(map[string]map[peer.ID]struct{}),
-		peers:          make(map[peer.ID]chan *RPC),
+		peers:          make(map[peer.ID]*rpcQueue),
 		inboundStreams: make(map[peer.ID]network.Stream),
 		blacklist:      NewMapBlacklist(),
 		blacklistPeer:  make(chan peer.ID),
@@ -561,8 +561,8 @@ func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option {
 func (p *PubSub) processLoop(ctx context.Context) {
 	defer func() {
 		// Clean up go routines.
-		for _, ch := range p.peers {
-			close(ch)
+		for _, queue := range p.peers {
+			queue.Close()
 		}
 		p.peers = nil
 		p.topics = nil
@@ -577,7 +577,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
 		case s := <-p.newPeerStream:
 			pid := s.Conn().RemotePeer()
 
-			ch, ok := p.peers[pid]
+			q, ok := p.peers[pid]
 			if !ok {
 				log.Warn("new stream for unknown peer: ", pid)
 				s.Reset()
@@ -586,7 +586,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
 
 			if p.blacklist.Contains(pid) {
 				log.Warn("closing stream for blacklisted peer: ", pid)
-				close(ch)
+				q.Close()
 				delete(p.peers, pid)
 				s.Reset()
 				continue
@@ -654,9 +654,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
 			log.Infof("Blacklisting peer %s", pid)
 			p.blacklist.Add(pid)
 
-			ch, ok := p.peers[pid]
+			q, ok := p.peers[pid]
 			if ok {
-				close(ch)
+				q.Close()
 				delete(p.peers, pid)
 				for t, tmap := range p.topics {
 					if _, ok := tmap[pid]; ok {
@@ -701,10 +701,10 @@ func (p *PubSub) handlePendingPeers() {
 			continue
 		}
 
-		messages := make(chan *RPC, p.peerOutboundQueueSize)
-		messages <- p.getHelloPacket()
-		go p.handleNewPeer(p.ctx, pid, messages)
-		p.peers[pid] = messages
+		rpcQueue := newRpcQueue(p.peerOutboundQueueSize)
+		rpcQueue.Push(p.getHelloPacket(), true)
+		go p.handleNewPeer(p.ctx, pid, rpcQueue)
+		p.peers[pid] = rpcQueue
 	}
 }
 
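One asymmetry in this file: the hello packet is pushed with `true` and its error ignored, while the routers pass `false` and handle the error. Under the block-until-space reading assumed in the sketch after comm.go, that is safe because the queue is brand new and cannot be full. A hypothetical restatement with that reasoning as comments:

```go
// Hypothetical helper mirroring the sequence above (not in the PR).
func (p *PubSub) addPeer(pid peer.ID) {
	q := newRpcQueue(p.peerOutboundQueueSize)
	// The queue was just created, so a blocking push returns
	// immediately; seeding it before the writer goroutine starts
	// guarantees the hello packet (our subscriptions) is the first
	// RPC on the wire for this peer.
	_ = q.Push(p.getHelloPacket(), true)
	go p.handleNewPeer(p.ctx, pid, q)
	p.peers[pid] = q
}
```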
@@ -721,12 +721,12 @@ func (p *PubSub) handleDeadPeers() {
 	p.peerDeadPrioLk.Unlock()
 
 	for pid := range deadPeers {
-		ch, ok := p.peers[pid]
+		q, ok := p.peers[pid]
 		if !ok {
 			continue
 		}
 
-		close(ch)
+		q.Close()
 		delete(p.peers, pid)
 
 		for t, tmap := range p.topics {
@@ -748,10 +748,10 @@ func (p *PubSub) handleDeadPeers() {
 			// still connected, must be a duplicate connection being closed.
 			// we respawn the writer as we need to ensure there is a stream active
 			log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
-			messages := make(chan *RPC, p.peerOutboundQueueSize)
-			messages <- p.getHelloPacket()
-			p.peers[pid] = messages
-			go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, messages)
+			rpcQueue := newRpcQueue(p.peerOutboundQueueSize)
+			rpcQueue.Push(p.getHelloPacket(), true)
+			p.peers[pid] = rpcQueue
+			go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, rpcQueue)
 		}
 	}
 }
@@ -915,14 +915,14 @@ func (p *PubSub) announce(topic string, sub bool) {
 
 	out := rpcWithSubs(subopt)
 	for pid, peer := range p.peers {
-		select {
-		case peer <- out:
-			p.tracer.SendRPC(out, pid)
-		default:
+		err := peer.Push(out, false)
+		if err != nil {
 			log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
 			p.tracer.DropRPC(out, pid)
 			go p.announceRetry(pid, topic, sub)
+			continue
 		}
+		p.tracer.SendRPC(out, pid)
 	}
 }
 
@@ -958,14 +958,14 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
 	}
 
 	out := rpcWithSubs(subopt)
-	select {
-	case peer <- out:
-		p.tracer.SendRPC(out, pid)
-	default:
+	err := peer.Push(out, false)
+	if err != nil {
 		log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
 		p.tracer.DropRPC(out, pid)
 		go p.announceRetry(pid, topic, sub)
+		return
 	}
+	p.tracer.SendRPC(out, pid)
 }
 
 // notifySubs sends a given message to all corresponding subscribers.
10 changes: 5 additions & 5 deletions randomsub.go
@@ -144,18 +144,18 @@ func (rs *RandomSubRouter) Publish(msg *Message) {
 
 	out := rpcWithMessages(msg.Message)
 	for p := range tosend {
-		mch, ok := rs.p.peers[p]
+		q, ok := rs.p.peers[p]
 		if !ok {
 			continue
 		}
 
-		select {
-		case mch <- out:
-			rs.tracer.SendRPC(out, p)
-		default:
+		err := q.Push(out, false)
+		if err != nil {
 			log.Infof("dropping message to peer %s: queue full", p)
 			rs.tracer.DropRPC(out, p)
+			continue
 		}
+		rs.tracer.SendRPC(out, p)
 	}
 }
 