fullrt dht bug fixes #719

Merged
merged 6 commits on May 27, 2021
185 changes: 126 additions & 59 deletions fullrt/dht.go
@@ -464,7 +464,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts
})
err := dht.protoMessenger.PutValue(ctx, p, rec)
return err
}, peers)
}, peers, true)

if successes == 0 {
return fmt.Errorf("failed to complete put")
@@ -751,7 +751,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str
return nil
}

dht.execOnMany(ctx, queryFn, peers)
dht.execOnMany(ctx, queryFn, peers, false)
lookupResCh <- &lookupWithFollowupResult{peers: peers}
}()
return valCh, lookupResCh
@@ -817,7 +817,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.h)
return err
}, peers)
}, peers, true)

if exceededDeadline {
return context.DeadlineExceeded
@@ -830,41 +830,71 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
return ctx.Err()
}

func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID) int {
putctx, cancel := context.WithCancel(ctx)
defer cancel()
// execOnMany executes the given function on each of the peers, although it may only wait for a certain chunk of peers
// to respond before considering the results "good enough" and returning.
//
// If sloppyExit is true then this function may return without waiting for all of its internal goroutines to close.
// In that case the passed-in function MUST be able to safely complete an arbitrary amount of time after
// execOnMany has returned (e.g. do not write to resources that might get closed or set to nil and therefore result in
// a panic instead of just returning an error).
func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int {
if len(peers) == 0 {
return 0
}

waitAllCh := make(chan struct{}, len(peers))
// having a buffer that can take all of the elements is basically a hack to allow for sloppy exits that clean up
// the goroutines after the function is done rather than before
errCh := make(chan error, len(peers))
numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac)
waitSuccessCh := make(chan struct{}, numSuccessfulToWaitFor)

putctx, cancel := context.WithTimeout(ctx, dht.timeoutPerOp)
defer cancel()

for _, p := range peers {
go func(p peer.ID) {
fnCtx, fnCancel := context.WithTimeout(putctx, dht.timeoutPerOp)
defer fnCancel()
err := fn(fnCtx, p)
if err != nil {
logger.Debug(err)
} else {
waitSuccessCh <- struct{}{}
}
waitAllCh <- struct{}{}
errCh <- fn(putctx, p)
}(p)
}

numSuccess, numDone := 0, 0
t := time.NewTimer(time.Hour)
for numDone != len(peers) {
var numDone, numSuccess, successSinceLastTick int
var ticker *time.Ticker
var tickChan <-chan time.Time

for numDone < len(peers) {
select {
case <-waitAllCh:
case err := <-errCh:
numDone++
case <-waitSuccessCh:
if numSuccess >= numSuccessfulToWaitFor {
t.Reset(time.Millisecond * 500)
if err == nil {
numSuccess++
if numSuccess >= numSuccessfulToWaitFor && ticker == nil {
// Once there are enough successes, wait a little longer
ticker = time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
tickChan = ticker.C
successSinceLastTick = numSuccess
}
// This is equivalent to numSuccess * 2 + numFailures >= len(peers) and is a heuristic that seems to be
// performing reasonably.
// TODO: Make this metric more configurable
// TODO: Have better heuristics in this function, whether determined by observing static network
// properties or calculated dynamically
if numSuccess+numDone >= len(peers) {
cancel()
if sloppyExit {
return numSuccess
}
}
}
case <-tickChan:
if numSuccess > successSinceLastTick {
// If there were additional successes, then wait another tick
successSinceLastTick = numSuccess
} else {
cancel()
if sloppyExit {
return numSuccess
}
}
numSuccess++
numDone++
case <-t.C:
cancel()
}
}
return numSuccess
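
The reworked select loop above drops the old hour-long timer in favor of a ticker that only starts once waitFrac of the peers have succeeded, and the single buffered errCh is what makes the sloppy exit safe: stragglers just write their result and exit. A minimal standalone sketch of the same pattern in Go, using made-up names (runOnMany, opTimeout, plain ints in place of peer IDs) rather than the dht's own fields:

package main

import (
	"context"
	"fmt"
	"time"
)

// runOnMany fans fn out over items, waits until waitFrac of the calls succeed,
// then polls on a ticker and returns early once a tick passes with no new successes.
func runOnMany(ctx context.Context, fn func(context.Context, int) error, items []int,
	waitFrac float64, opTimeout time.Duration, sloppyExit bool) int {
	if len(items) == 0 {
		return 0
	}

	// Buffer every result so goroutines finishing after a sloppy exit never block.
	errCh := make(chan error, len(items))
	needed := int(float64(len(items)) * waitFrac)

	opCtx, cancel := context.WithTimeout(ctx, opTimeout)
	defer cancel()

	for _, it := range items {
		go func(it int) { errCh <- fn(opCtx, it) }(it)
	}

	var numDone, numSuccess, successSinceLastTick int
	var ticker *time.Ticker
	var tickChan <-chan time.Time

	for numDone < len(items) {
		select {
		case err := <-errCh:
			numDone++
			if err == nil {
				numSuccess++
				if numSuccess >= needed && ticker == nil {
					// Enough successes: start polling instead of waiting for everyone.
					ticker = time.NewTicker(500 * time.Millisecond)
					defer ticker.Stop()
					tickChan = ticker.C
					successSinceLastTick = numSuccess
				}
				// Same heuristic as above: numSuccess*2 + numFailures >= len(items).
				if numSuccess+numDone >= len(items) {
					cancel()
					if sloppyExit {
						return numSuccess
					}
				}
			}
		case <-tickChan:
			if numSuccess > successSinceLastTick {
				// Still making progress; wait one more tick.
				successSinceLastTick = numSuccess
			} else {
				cancel()
				if sloppyExit {
					return numSuccess
				}
			}
		}
	}
	return numSuccess
}

func main() {
	fn := func(ctx context.Context, i int) error {
		select {
		case <-time.After(time.Duration(i*100) * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	fmt.Println("successes:", runOnMany(context.Background(), fn, []int{1, 2, 3, 4, 5, 6}, 0.3, 2*time.Second, true))
}
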
@@ -898,7 +928,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
pmes.ProviderPeers = pbPeers

return dht.messageSender.SendMessage(ctx, p, pmes)
}, peers)
}, peers, true)
if successes == 0 {
return fmt.Errorf("no successful provides")
}
@@ -941,7 +971,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
keyStr := string(k)
return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
}, peers)
}, peers, true)
if successes == 0 {
return fmt.Errorf("no successful puts")
}
@@ -962,39 +992,43 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
var numSendsSuccessful uint64 = 0

wg := sync.WaitGroup{}
wg.Add(dht.bulkSendParallelism)
chunkSize := len(sortedKeys) / dht.bulkSendParallelism
onePctKeys := uint64(len(sortedKeys)) / 100
for i := 0; i < dht.bulkSendParallelism; i++ {
var chunk []peer.ID
end := (i + 1) * chunkSize
if end > len(sortedKeys) {
chunk = sortedKeys[i*chunkSize:]
} else {
chunk = sortedKeys[i*chunkSize : end]
}

go func() {
defer wg.Done()
for _, key := range chunk {
sendsSoFar := atomic.AddUint64(&numSends, 1)
if sendsSoFar%onePctKeys == 0 {
logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
}
if err := fn(ctx, key); err != nil {
var l interface{}
if isProvRec {
l = internal.LoggableProviderRecordBytes(key)
} else {
l = internal.LoggableRecordKeyString(key)
}
logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
bulkSendFn := func(chunk []peer.ID) {
defer wg.Done()
for _, key := range chunk {
if ctx.Err() != nil {
break
}

sendsSoFar := atomic.AddUint64(&numSends, 1)
if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 {
logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
}
if err := fn(ctx, key); err != nil {
var l interface{}
if isProvRec {
l = internal.LoggableProviderRecordBytes(key)
} else {
atomic.AddUint64(&numSendsSuccessful, 1)
l = internal.LoggableRecordKeyString(key)
}
logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
} else {
atomic.AddUint64(&numSendsSuccessful, 1)
}
}()
}
}

// Divide the keys into groups so that we can talk to more peers at a time. Because the keys are sorted in
// XOR/Kademlia space, consecutive puts would go to the same, or nearly the same, set of peers. Working in parallel
// means less waiting on individual dials to complete and also continuing to make progress even if one segment of
// the network is being slow, or we are maxing out the connection, stream, etc. to those peers.
keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism)
wg.Add(len(keyGroups))
for _, chunk := range keyGroups {
go bulkSendFn(chunk)
}

wg.Wait()

if numSendsSuccessful == 0 {
@@ -1006,6 +1040,39 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
return nil
}

// divideIntoGroups divides the set of keys into (at most) the given number of groups
func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID {
var keyGroups [][]peer.ID
if len(keys) < groups {
for i := 0; i < len(keys); i++ {
keyGroups = append(keyGroups, keys[i:i+1])
}
return keyGroups
}

chunkSize := len(keys) / groups
remainder := len(keys) % groups

start := 0
end := chunkSize
for i := 0; i < groups; i++ {
var chunk []peer.ID
// distribute the remainder as one extra entry per parallel thread
if remainder > 0 {
chunk = keys[start : end+1]
remainder--
start = end + 1
end = end + 1 + chunkSize
} else {
chunk = keys[start:end]
start = end
end = end + chunkSize
}
keyGroups = append(keyGroups, chunk)
}
return keyGroups
}
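
For a quick sense of the remainder handling: with 10 keys and 3 groups, chunkSize is 10/3 = 3 with remainder 1, so the first group gets one extra key and the sizes come out as 4, 3, 3 (matching the Remainder test below). A small hypothetical helper, assumed to live inside package fullrt since divideIntoGroups is unexported:

package fullrt

import (
	"strconv"

	"github.com/libp2p/go-libp2p-core/peer"
)

// exampleGroupSizes shows how the remainder is spread: the first `remainder`
// groups each receive one extra key.
func exampleGroupSizes() []int {
	keys := make([]peer.ID, 10)
	for i := range keys {
		keys[i] = peer.ID(strconv.Itoa(i))
	}
	var sizes []int
	for _, g := range divideIntoGroups(keys, 3) {
		sizes = append(sizes, len(g))
	}
	return sizes // [4 3 3]
}
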

// FindProviders searches until the context expires.
func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
if !dht.enableProviders {
@@ -1129,7 +1196,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
return nil
}

dht.execOnMany(queryctx, fn, peers)
dht.execOnMany(queryctx, fn, peers, false)
}

// FindPeer searches for a peer with given ID.
@@ -1214,7 +1281,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, e
return nil
}

dht.execOnMany(queryctx, fn, peers)
dht.execOnMany(queryctx, fn, peers, false)

close(addrsCh)
wg.Wait()
85 changes: 85 additions & 0 deletions fullrt/dht_test.go
@@ -0,0 +1,85 @@
package fullrt

import (
"strconv"
"testing"

"github.com/libp2p/go-libp2p-core/peer"
)

func TestDivideIntoGroups(t *testing.T) {
var keys []peer.ID
for i := 0; i < 10; i++ {
keys = append(keys, peer.ID(strconv.Itoa(i)))
}

convertToStrings := func(peers []peer.ID) []string {
var out []string
for _, p := range peers {
out = append(out, string(p))
}
return out
}

pidsEquals := func(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}

t.Run("Divides", func(t *testing.T) {
gr := divideIntoGroups(keys, 2)
if len(gr) != 2 {
t.Fatal("incorrect number of groups")
}
if g1, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3", "4"}; !pidsEquals(g1, expected) {
t.Fatalf("expected %v, got %v", expected, g1)
}
if g2, expected := convertToStrings(gr[1]), []string{"5", "6", "7", "8", "9"}; !pidsEquals(g2, expected) {
t.Fatalf("expected %v, got %v", expected, g2)
}
})
t.Run("Remainder", func(t *testing.T) {
gr := divideIntoGroups(keys, 3)
if len(gr) != 3 {
t.Fatal("incorrect number of groups")
}
if g, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3"}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
if g, expected := convertToStrings(gr[1]), []string{"4", "5", "6"}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
if g, expected := convertToStrings(gr[2]), []string{"7", "8", "9"}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
})
t.Run("OneEach", func(t *testing.T) {
gr := divideIntoGroups(keys, 10)
if len(gr) != 10 {
t.Fatal("incorrect number of groups")
}
for i := 0; i < 10; i++ {
if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
}
})
t.Run("TooManyGroups", func(t *testing.T) {
gr := divideIntoGroups(keys, 11)
if len(gr) != 10 {
t.Fatal("incorrect number of groups")
}
for i := 0; i < 10; i++ {
if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
}
})
}