Skip to content

Commit

Permalink
Merge pull request ipfs#301 from libp2p/fix/dq-defer-start
Browse files Browse the repository at this point in the history
defer dialqueue action until initial peers have been added
  • Loading branch information
Stebalien committed Mar 13, 2019
2 parents ac67725 + bd60c95 commit 076b93d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
28 changes: 19 additions & 9 deletions dial_queue.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

peer "github.com/libp2p/go-libp2p-peer"
Expand Down Expand Up @@ -31,8 +32,9 @@ const (
type dialQueue struct {
*dqParams

nWorkers uint
out *queue.ChanQueue
nWorkers uint
out *queue.ChanQueue
startOnce sync.Once

waitingCh chan waitingCh
dieCh chan struct{}
Expand Down Expand Up @@ -90,9 +92,10 @@ type waitingCh struct {
ts time.Time
}

// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
Expand All @@ -112,21 +115,28 @@ type waitingCh struct {
func newDialQueue(params *dqParams) (*dialQueue, error) {
dq := &dialQueue{
dqParams: params,
nWorkers: params.config.minParallelism,
out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
growCh: make(chan struct{}, 1),
shrinkCh: make(chan struct{}, 1),
waitingCh: make(chan waitingCh),
dieCh: make(chan struct{}, params.config.maxParallelism),
}

for i := 0; i < int(params.config.minParallelism); i++ {
go dq.worker()
}
go dq.control()
return dq, nil
}

// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func (dq *dialQueue) Start() {
dq.startOnce.Do(func() {
tgt := int(dq.dqParams.config.minParallelism)
for i := 0; i < tgt; i++ {
go dq.worker()
}
dq.nWorkers = uint(tgt)
})
}

func (dq *dialQueue) control() {
var (
dialled <-chan peer.ID
Expand Down
8 changes: 8 additions & 0 deletions dial_queue_test.go
Expand Up @@ -42,6 +42,8 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
t.Error("unexpected error when constructing the dial queue", err)
}

dq.Start()

for i := 0; i < 4; i++ {
_ = dq.Consume()
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -86,6 +88,8 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
t.Error("unexpected error when constructing the dial queue", err)
}

dq.Start()

// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
// and immediately returnable.
for i := 0; i < 3; i++ {
Expand Down Expand Up @@ -158,6 +162,8 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
t.Error("unexpected error when constructing the dial queue", err)
}

dq.Start()

// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
for i := 0; i < 13; i++ {
ch := dq.Consume()
Expand Down Expand Up @@ -210,6 +216,8 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
t.Error("unexpected error when constructing the dial queue", err)
}

dq.Start()

// pick up three consumers.
for i := 0; i < 3; i++ {
_ = dq.Consume()
Expand Down
5 changes: 5 additions & 0 deletions query.go
Expand Up @@ -136,6 +136,11 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
r.addPeerToQuery(p)
}

// start the dial queue only after we've added the initial set of peers.
// this is to avoid race conditions that could cause the peersRemaining todoctr
// to be done too early if the initial dial fails before others make it into the queue.
r.peersDialed.Start()

// go do this thing.
// do it as a child proc to make sure Run exits
// ONLY AFTER spawn workers has exited.
Expand Down

0 comments on commit 076b93d

Please sign in to comment.