Skip to content

Commit

Permalink
p2p: broadcast spawns goroutine to Send on each peer and times out af…
Browse files Browse the repository at this point in the history
…ter 10 seconds. Closes #7
  • Loading branch information
ebuchman committed Mar 18, 2015
1 parent 67f7cbd commit 98a81fb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
19 changes: 14 additions & 5 deletions p2p/connection.go
Expand Up @@ -28,6 +28,7 @@ const (
defaultRecvRate = 51200 // 5Kb/s
defaultSendQueueCapacity = 1
defaultRecvBufferCapacity = 4096
defaultSendTimeoutSeconds = 10
)

type receiveCbFunc func(chId byte, msgBytes []byte)
Expand Down Expand Up @@ -191,15 +192,15 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool {
return false
}

channel.sendBytes(binary.BinaryBytes(msg))
success := channel.sendBytes(binary.BinaryBytes(msg))

// Wake up sendRoutine if necessary
select {
case c.send <- struct{}{}:
default:
}

return true
return success
}

// Queues a message to be sent to channel.
Expand Down Expand Up @@ -470,9 +471,17 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {

// Queues message to send to this channel.
// Goroutine-safe
func (ch *Channel) sendBytes(bytes []byte) {
ch.sendQueue <- bytes
atomic.AddUint32(&ch.sendQueueSize, 1)
// Times out (and returns false) after defaultSendTimeoutSeconds
func (ch *Channel) sendBytes(bytes []byte) bool {
sendTicker := time.NewTicker(defaultSendTimeoutSeconds * time.Second)
select {
case <-sendTicker.C:
// timeout
return false
case ch.sendQueue <- bytes:
atomic.AddUint32(&ch.sendQueueSize, 1)
return true
}
}

// Queues message to send to this channel.
Expand Down
22 changes: 10 additions & 12 deletions p2p/switch.go
Expand Up @@ -161,24 +161,22 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
return sw.dialing.Has(addr.String())
}

// XXX: This is wrong, we can't just ignore failures on TrySend.
func (sw *Switch) Broadcast(chId byte, msg interface{}) (numSuccess, numFailure int) {
// Broadcast runs a go routine for each attemptted send, which will block
// trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives success values for each attempted send (false if times out)
func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
if atomic.LoadUint32(&sw.stopped) == 1 {
return
}

successChan := make(chan bool, len(sw.peers.List()))
log.Debug("Broadcast", "channel", chId, "msg", msg)
for _, peer := range sw.peers.List() {
// XXX XXX Change.
// success := peer.TrySend(chId, msg)
success := peer.Send(chId, msg)
if success {
numSuccess += 1
} else {
numFailure += 1
}
go func() {
success := peer.Send(chId, msg)
successChan <- success
}()
}
return
return successChan

}

Expand Down

0 comments on commit 98a81fb

Please sign in to comment.