Skip to content

Commit

Permalink
schedule migration of largest accounts first
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolent committed May 13, 2024
1 parent 15d2e63 commit 53fe9a6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 33 deletions.
53 changes: 37 additions & 16 deletions cmd/util/ledger/migrations/account_based_migration.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -220,28 +221,48 @@ func MigrateGroupConcurrently(

go func() {
defer close(jobs)
err := registersByAccount.ForEachAccount(
func(owner string, accountRegisters *registers.AccountRegisters) error {
address, err := common.BytesToAddress([]byte(owner))
if err != nil {
return err
}
job := jobMigrateAccountGroup{
Address: address,
AccountRegisters: accountRegisters,
}

select {
case <-ctx.Done():
return nil
case jobs <- job:
}
allAccountRegisters := make([]*registers.AccountRegisters, 0, accountCount)

err := registersByAccount.ForEachAccount(
func(accountRegisters *registers.AccountRegisters) error {
allAccountRegisters = append(
allAccountRegisters,
accountRegisters,
)
return nil
},
)
if err != nil {
cancel(fmt.Errorf("failed to schedule jobs for all accounts: %w", err))
cancel(fmt.Errorf("failed to get all account registers: %w", err))
}

// Schedule jobs in descending order of the number of registers
sort.Slice(allAccountRegisters, func(i, j int) bool {
a := allAccountRegisters[i]
b := allAccountRegisters[j]
return a.Count() > b.Count()
})

for _, accountRegisters := range allAccountRegisters {
owner := accountRegisters.Owner()

address, err := common.BytesToAddress([]byte(owner))
if err != nil {
cancel(fmt.Errorf("failed to convert owner to address: %w", err))
return
}

job := jobMigrateAccountGroup{
Address: address,
AccountRegisters: accountRegisters,
}

select {
case <-ctx.Done():
return
case jobs <- job:
}
}
}()

Expand Down
32 changes: 18 additions & 14 deletions cmd/util/ledger/migrations/account_size_filter_migration.go
Expand Up @@ -57,20 +57,24 @@ func NewAccountSizeFilterMigration(
}
}

return registersByAccount.ForEachAccount(func(owner string, accountRegisters *registers.AccountRegisters) error {
if _, ok := exceptions[owner]; ok {
return nil
}

info := payloadCountByAddress[owner]
if info.size <= maxAccountSize {
return nil
}

return accountRegisters.ForEach(func(owner, key string, _ []byte) error {
return accountRegisters.Set(owner, key, nil)
})
})
return registersByAccount.ForEachAccount(
func(accountRegisters *registers.AccountRegisters) error {
owner := accountRegisters.Owner()

if _, ok := exceptions[owner]; ok {
return nil
}

info := payloadCountByAddress[owner]
if info.size <= maxAccountSize {
return nil
}

return accountRegisters.ForEach(func(owner, key string, _ []byte) error {
return accountRegisters.Set(owner, key, nil)
})
},
)
}
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/util/ledger/util/registers/registers.go
Expand Up @@ -92,9 +92,9 @@ func (b *ByAccount) Payloads() []*ledger.Payload {
return payloads
}

func (b *ByAccount) ForEachAccount(f func(owner string, accountRegisters *AccountRegisters) error) error {
for owner, accountRegisters := range b.registers {
err := f(owner, accountRegisters)
func (b *ByAccount) ForEachAccount(f func(accountRegisters *AccountRegisters) error) error {
for _, accountRegisters := range b.registers {
err := f(accountRegisters)
if err != nil {
return err
}
Expand Down

0 comments on commit 53fe9a6

Please sign in to comment.