From 8cce6568eabd31409b195d863c0656e1f56b6780 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 25 May 2021 19:28:46 -0400 Subject: [PATCH 1/6] fullrt: only log percentage completion of bulk sends if there are at least 100 messages to send --- fullrt/dht.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index f01662737..b017e1a8c 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -978,7 +978,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( defer wg.Done() for _, key := range chunk { sendsSoFar := atomic.AddUint64(&numSends, 1) - if sendsSoFar%onePctKeys == 0 { + if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 { logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys)) } if err := fn(ctx, key); err != nil { From c5223963964942eabe99b73cf3b8859bac8a3658 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 26 May 2021 03:06:26 -0400 Subject: [PATCH 2/6] fullrt: abort bulk sending earlier if the context is cancelled --- fullrt/dht.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fullrt/dht.go b/fullrt/dht.go index b017e1a8c..d4554d9bf 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -977,6 +977,10 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( go func() { defer wg.Done() for _, key := range chunk { + if ctx.Err() != nil { + break + } + sendsSoFar := atomic.AddUint64(&numSends, 1) if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 { logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys)) From eaca669fcb5e54598f0f7f76ef82b311fb8a7f72 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 26 May 2021 10:03:23 -0400 Subject: [PATCH 3/6] fullrt: fix dividing up bulk sending of keys into groups --- fullrt/dht.go | 89 ++++++++++++++++++++++++++++++---------------- fullrt/dht_test.go | 85 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+), 30 deletions(-) create mode 100644 fullrt/dht_test.go diff --git a/fullrt/dht.go b/fullrt/dht.go index d4554d9bf..5b2e0de63 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -962,43 +962,39 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( var numSendsSuccessful uint64 = 0 wg := sync.WaitGroup{} - wg.Add(dht.bulkSendParallelism) - chunkSize := len(sortedKeys) / dht.bulkSendParallelism onePctKeys := uint64(len(sortedKeys)) / 100 - for i := 0; i < dht.bulkSendParallelism; i++ { - var chunk []peer.ID - end := (i + 1) * chunkSize - if end > len(sortedKeys) { - chunk = sortedKeys[i*chunkSize:] - } else { - chunk = sortedKeys[i*chunkSize : end] - } - go func() { - defer wg.Done() - for _, key := range chunk { - if ctx.Err() != nil { - break - } + bulkSendFn := func(chunk []peer.ID) { + defer wg.Done() + for _, key := range chunk { + if ctx.Err() != nil { + break + } - sendsSoFar := atomic.AddUint64(&numSends, 1) - if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 { - logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys)) - } - if err := fn(ctx, key); err != nil { - var l interface{} - if isProvRec { - l = internal.LoggableProviderRecordBytes(key) - } else { - l = internal.LoggableRecordKeyString(key) - } - logger.Infof("failed to complete bulk sending of key :%v. 
%v", l, err) + sendsSoFar := atomic.AddUint64(&numSends, 1) + if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 { + logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys)) + } + if err := fn(ctx, key); err != nil { + var l interface{} + if isProvRec { + l = internal.LoggableProviderRecordBytes(key) } else { - atomic.AddUint64(&numSendsSuccessful, 1) + l = internal.LoggableRecordKeyString(key) } + logger.Infof("failed to complete bulk sending of key :%v. %v", l, err) + } else { + atomic.AddUint64(&numSendsSuccessful, 1) } - }() + } } + + keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism) + wg.Add(len(keyGroups)) + for _, chunk := range keyGroups { + go bulkSendFn(chunk) + } + wg.Wait() if numSendsSuccessful == 0 { @@ -1010,6 +1006,39 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func( return nil } +// divideIntoGroups divides the set of keys into (at most) the number of groups +func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID { + var keyGroups [][]peer.ID + if len(keys) < groups { + for i := 0; i < len(keys); i++ { + keyGroups = append(keyGroups, keys[i:i+1]) + } + return keyGroups + } + + chunkSize := len(keys) / groups + remainder := len(keys) % groups + + start := 0 + end := chunkSize + for i := 0; i < groups; i++ { + var chunk []peer.ID + // distribute the remainder as one extra entry per parallel thread + if remainder > 0 { + chunk = keys[start : end+1] + remainder-- + start = end + 1 + end = end + 1 + chunkSize + } else { + chunk = keys[start:end] + start = end + end = end + chunkSize + } + keyGroups = append(keyGroups, chunk) + } + return keyGroups +} + // FindProviders searches until the context expires. 
 func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
 	if !dht.enableProviders {
diff --git a/fullrt/dht_test.go b/fullrt/dht_test.go
new file mode 100644
index 000000000..945548f43
--- /dev/null
+++ b/fullrt/dht_test.go
@@ -0,0 +1,85 @@
+package fullrt
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+func TestDivideIntoGroups(t *testing.T) {
+	var keys []peer.ID
+	for i := 0; i < 10; i++ {
+		keys = append(keys, peer.ID(strconv.Itoa(i)))
+	}
+
+	convertToStrings := func(peers []peer.ID) []string {
+		var out []string
+		for _, p := range peers {
+			out = append(out, string(p))
+		}
+		return out
+	}
+
+	pidsEquals := func(a, b []string) bool {
+		if len(a) != len(b) {
+			return false
+		}
+		for i, v := range a {
+			if v != b[i] {
+				return false
+			}
+		}
+		return true
+	}
+
+	t.Run("Divides", func(t *testing.T) {
+		gr := divideIntoGroups(keys, 2)
+		if len(gr) != 2 {
+			t.Fatal("incorrect number of groups")
+		}
+		if g1, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3", "4"}; !pidsEquals(g1, expected) {
+			t.Fatalf("expected %v, got %v", expected, g1)
+		}
+		if g2, expected := convertToStrings(gr[1]), []string{"5", "6", "7", "8", "9"}; !pidsEquals(g2, expected) {
+			t.Fatalf("expected %v, got %v", expected, g2)
+		}
+	})
+	t.Run("Remainder", func(t *testing.T) {
+		gr := divideIntoGroups(keys, 3)
+		if len(gr) != 3 {
+			t.Fatal("incorrect number of groups")
+		}
+		if g, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3"}; !pidsEquals(g, expected) {
+			t.Fatalf("expected %v, got %v", expected, g)
+		}
+		if g, expected := convertToStrings(gr[1]), []string{"4", "5", "6"}; !pidsEquals(g, expected) {
+			t.Fatalf("expected %v, got %v", expected, g)
+		}
+		if g, expected := convertToStrings(gr[2]), []string{"7", "8", "9"}; !pidsEquals(g, expected) {
+			t.Fatalf("expected %v, got %v", expected, g)
+		}
+	})
+	t.Run("OneEach", func(t *testing.T) {
+		gr := divideIntoGroups(keys, 10)
+		if len(gr) != 10 {
+			t.Fatal("incorrect number of groups")
+		}
+		for i := 0; i < 10; i++ {
+			if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
+				t.Fatalf("expected %v, got %v", expected, g)
+			}
+		}
+	})
+	t.Run("TooManyGroups", func(t *testing.T) {
+		gr := divideIntoGroups(keys, 11)
+		if len(gr) != 10 {
+			t.Fatal("incorrect number of groups")
+		}
+		for i := 0; i < 10; i++ {
+			if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
+				t.Fatalf("expected %v, got %v", expected, g)
+			}
+		}
+	})
+}

From 0da80650a35714d7e164d6fdab798b5467121d0e Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Thu, 27 May 2021 10:12:16 -0400
Subject: [PATCH 4/6] fullrt: comment to explain why bulk sending splits into groups

---
 fullrt/dht.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/fullrt/dht.go b/fullrt/dht.go
index 5b2e0de63..c4e9ae660 100644
--- a/fullrt/dht.go
+++ b/fullrt/dht.go
@@ -989,6 +989,10 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
 		}
 	}
 
+	// divide the keys into groups so that we can talk to more peers at a time. Because the keys are sorted in
+	// XOR/Kademlia space, consecutive puts go to the same, or nearly the same, set of peers. Working in parallel
+	// means less waiting on individual dials to complete and also continuing to make progress even if one segment of
+	// the network is being slow, or we are maxing out the connection, stream, etc. to those peers.
 	keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism)
 	wg.Add(len(keyGroups))
 	for _, chunk := range keyGroups {
 		go bulkSendFn(chunk)

From 5eb9aaa3acac08ba6d9d60c0e3662db5edf4a748 Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Wed, 26 May 2021 03:08:07 -0400
Subject: [PATCH 5/6] fullrt: execOnMany abort early if executing on no peers

---
 fullrt/dht.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/fullrt/dht.go b/fullrt/dht.go
index c4e9ae660..4b43ed867 100644
--- a/fullrt/dht.go
+++ b/fullrt/dht.go
@@ -831,6 +831,10 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
 }
 
 func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID) int {
+	if len(peers) == 0 {
+		return 0
+	}
+
 	putctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 

From daab800da0dc14b2af112b2dc1626aaffd2dc888 Mon Sep 17 00:00:00 2001
From: gammazero
Date: Wed, 19 May 2021 15:40:59 -0700
Subject: [PATCH 6/6] Cleanup execOnMany and allow it to be used in both sloppy and non-sloppy mode

The execOnMany function was able to exit prematurely, leaving its child
goroutines running. These would write to a channel that closed after
execOnMany returned in findProvidersAsyncRoutine.

The function is now able to run both in a sloppy mode, where goroutines
are allowed to clean themselves up after the function has completed, as
well as in a safer non-sloppy mode, where goroutines complete before the
function returns. The sloppy mode is used for "put" operations like
PutValue and Provide (along with their bulk operation equivalents),
whereas the non-sloppy mode is used for DHT "get" operations like
FindProviders and SearchValue, whose query functions write results to
channels that are closed once execOnMany returns.

This fixes https://github.com/ipfs/go-ipfs/issues/8146
---
 fullrt/dht.go | 90 +++++++++++++++++++++++++++++++++------------------
 1 file changed, 58 insertions(+), 32 deletions(-)

diff --git a/fullrt/dht.go b/fullrt/dht.go
index 4b43ed867..9c7edef1f 100644
--- a/fullrt/dht.go
+++ b/fullrt/dht.go
@@ -464,7 +464,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts
 		})
 		err := dht.protoMessenger.PutValue(ctx, p, rec)
 		return err
-	}, peers)
+	}, peers, true)
 
 	if successes == 0 {
 		return fmt.Errorf("failed to complete put")
@@ -751,7 +751,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str
 			return nil
 		}
 
-		dht.execOnMany(ctx, queryFn, peers)
+		dht.execOnMany(ctx, queryFn, peers, false)
 		lookupResCh <- &lookupWithFollowupResult{peers: peers}
 	}()
 	return valCh, lookupResCh
@@ -817,7 +817,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
 	successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
 		err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.h)
 		return err
-	}, peers)
+	}, peers, true)
 
 	if exceededDeadline {
 		return context.DeadlineExceeded
@@ -830,45 +830,71 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
 	return ctx.Err()
 }
 
-func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID) int {
+// execOnMany executes the given function on each of the peers, although it may only wait for a certain chunk of peers
+// to respond before considering the results "good enough" and returning.
+//
+// If sloppyExit is true then this function will return without waiting for all of its internal goroutines to close.
+// If sloppyExit is true then the passed in function MUST be able to safely complete an arbitrary amount of time after
+// execOnMany has returned (e.g.
do not write to resources that might get closed or set to nil and therefore result in +// a panic instead of just returning an error). +func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int { if len(peers) == 0 { return 0 } - putctx, cancel := context.WithCancel(ctx) + // having a buffer that can take all of the elements is basically a hack to allow for sloppy exits that clean up + // the goroutines after the function is done rather than before + errCh := make(chan error, len(peers)) + numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac) + + putctx, cancel := context.WithTimeout(ctx, dht.timeoutPerOp) defer cancel() - waitAllCh := make(chan struct{}, len(peers)) - numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac) - waitSuccessCh := make(chan struct{}, numSuccessfulToWaitFor) for _, p := range peers { go func(p peer.ID) { - fnCtx, fnCancel := context.WithTimeout(putctx, dht.timeoutPerOp) - defer fnCancel() - err := fn(fnCtx, p) - if err != nil { - logger.Debug(err) - } else { - waitSuccessCh <- struct{}{} - } - waitAllCh <- struct{}{} + errCh <- fn(putctx, p) }(p) } - numSuccess, numDone := 0, 0 - t := time.NewTimer(time.Hour) - for numDone != len(peers) { + var numDone, numSuccess, successSinceLastTick int + var ticker *time.Ticker + var tickChan <-chan time.Time + + for numDone < len(peers) { select { - case <-waitAllCh: + case err := <-errCh: numDone++ - case <-waitSuccessCh: - if numSuccess >= numSuccessfulToWaitFor { - t.Reset(time.Millisecond * 500) + if err == nil { + numSuccess++ + if numSuccess >= numSuccessfulToWaitFor && ticker == nil { + // Once there are enough successes, wait a little longer + ticker = time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + tickChan = ticker.C + successSinceLastTick = numSuccess + } + // This is equivalent to numSuccess * 2 + numFailures >= len(peers) and is a heuristic that seems to be + // performing reasonably. + // TODO: Make this metric more configurable + // TODO: Have better heuristics in this function whether determined from observing static network + // properties or dynamically calculating them + if numSuccess+numDone >= len(peers) { + cancel() + if sloppyExit { + return numSuccess + } + } + } + case <-tickChan: + if numSuccess > successSinceLastTick { + // If there were additional successes, then wait another tick + successSinceLastTick = numSuccess + } else { + cancel() + if sloppyExit { + return numSuccess + } } - numSuccess++ - numDone++ - case <-t.C: - cancel() } } return numSuccess @@ -902,7 +928,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) pmes.ProviderPeers = pbPeers return dht.messageSender.SendMessage(ctx, p, pmes) - }, peers) + }, peers, true) if successes == 0 { return fmt.Errorf("no successful provides") } @@ -945,7 +971,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error { keyStr := string(k) return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr])) - }, peers) + }, peers, true) if successes == 0 { return fmt.Errorf("no successful puts") } @@ -1170,7 +1196,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. return nil } - dht.execOnMany(queryctx, fn, peers) + dht.execOnMany(queryctx, fn, peers, false) } // FindPeer searches for a peer with given ID. 
@@ -1255,7 +1281,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, e return nil } - dht.execOnMany(queryctx, fn, peers) + dht.execOnMany(queryctx, fn, peers, false) close(addrsCh) wg.Wait()
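
Note on the pattern behind PATCH 6/6 (an illustrative sketch only; nothing below is part of the patches): the change hinges on giving every worker goroutine a result channel buffered to len(peers), so a sloppy early return can abandon the remaining workers without any of them ever blocking on their send, while the non-sloppy mode simply keeps draining that channel until every worker has reported. The standalone Go program below demonstrates that shape. The name runOnMany, the waitFrac parameter, and the fixed one-second timeout are invented for the example (they stand in for execOnMany, dht.waitFrac, and dht.timeoutPerOp), and it collapses the patch's ticker-based "wait a little longer" heuristic into a single combined check, so it is not the go-libp2p-kad-dht implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// runOnMany mirrors the shape of execOnMany after PATCH 6/6: every worker sends its
// result into a channel buffered to len(peers), so a "sloppy" early return can leave
// workers running without any of them ever blocking on the send.
func runOnMany(ctx context.Context, fn func(context.Context, int) error, peers []int, waitFrac float64, sloppyExit bool) int {
	if len(peers) == 0 {
		return 0
	}
	errCh := make(chan error, len(peers)) // buffered: late workers never block
	opCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	for _, p := range peers {
		go func(p int) { errCh <- fn(opCtx, p) }(p)
	}

	waitFor := int(float64(len(peers)) * waitFrac)
	numDone, numSuccess := 0, 0
	for numDone < len(peers) {
		if <-errCh == nil {
			numSuccess++
		}
		numDone++
		// Roughly the patch's heuristic (numSuccess*2 + numFailures >= len(peers)),
		// applied once the minimum number of successes has been reached.
		if numSuccess >= waitFor && numSuccess+numDone >= len(peers) {
			cancel() // tell the remaining workers to stop
			if sloppyExit {
				return numSuccess // leftover workers drain into the buffered channel
			}
		}
	}
	return numSuccess
}

func main() {
	fn := func(ctx context.Context, p int) error {
		select {
		case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
			if p%3 == 0 {
				return errors.New("send failed")
			}
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	peers := []int{0, 1, 2, 3, 4, 5, 6, 7}
	fmt.Println("successes:", runOnMany(context.Background(), fn, peers, 0.3, true))
}

In the patches themselves the split is: the put-style callers (PutValue, Provide, ProvideMany, PutMany) pass sloppyExit=true, while the callers that stream results into channels (getValues, findProvidersAsyncRoutine, FindPeer) pass false, so every worker goroutine has finished before those channels are closed.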