Skip to content

Commit

Permalink
[R4R]implement diff sync (ethereum#376)
Browse files Browse the repository at this point in the history
* implement block process part of light sync

* add difflayer protocol

* handle difflayer and refine light processor

* add testcase for diff protocol

* make it faster

* allow validator to light sync

* change into diff sync

* ligth sync: download difflayer (#2)

* ligth sync: download difflayer

Signed-off-by: kyrie-yl <lei.y@binance.com>

* download diff layer: fix according to the comments

Signed-off-by: kyrie-yl <lei.y@binance.com>

* download diff layer: update

Signed-off-by: kyrie-yl <lei.y@binance.com>

* download diff layer: fix accroding comments

Signed-off-by: kyrie-yl <lei.y@binance.com>

Co-authored-by: kyrie-yl <lei.y@binance.com>

* update light sync to diff sync

* raise the max diff limit

* add switcher of snap protocol

* fix test case

* make commit concurrently

* remove peer for diff cache when peer closed

* consensus tuning

* add test code

* remove extra message

* fix testcase and lint

make diff block configable

wait code write

fix testcase

resolve comments

resolve comment

* resolve comments

* resolve comments

* resolve comment

* fix mistake

Co-authored-by: kyrie-yl <83150977+kyrie-yl@users.noreply.github.com>
Co-authored-by: kyrie-yl <lei.y@binance.com>
  • Loading branch information
3 people committed Sep 28, 2021
1 parent bca9678 commit 1ded097
Show file tree
Hide file tree
Showing 71 changed files with 3,352 additions and 394 deletions.
4 changes: 2 additions & 2 deletions cmd/evm/internal/t8ntool/execution.go
Expand Up @@ -223,7 +223,7 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig,
statedb.AddBalance(pre.Env.Coinbase, minerReward)
}
// Commit block
root, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber))
root, _, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber))
if err != nil {
fmt.Fprintf(os.Stderr, "Could not commit state: %v", err)
return nil, nil, NewError(ErrorEVM, fmt.Errorf("could not commit state: %v", err))
Expand Down Expand Up @@ -252,7 +252,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB
}
}
// Commit and re-open to start with a clean state.
root, _ := statedb.Commit(false)
root, _, _ := statedb.Commit(false)
statedb, _ = state.New(root, sdb, nil)
return statedb
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/faucet/faucet.go
Expand Up @@ -139,7 +139,7 @@ func main() {
log.Crit("Length of bep2eContracts, bep2eSymbols, bep2eAmounts mismatch")
}

bep2eInfos := make(map[string]bep2eInfo, 0)
bep2eInfos := make(map[string]bep2eInfo, len(symbols))
for idx, s := range symbols {
n, ok := big.NewInt(0).SetString(bep2eNumAmounts[idx], 10)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions cmd/geth/main.go
Expand Up @@ -65,6 +65,8 @@ var (
utils.ExternalSignerFlag,
utils.NoUSBFlag,
utils.DirectBroadcastFlag,
utils.DisableSnapProtocolFlag,
utils.DiffSyncFlag,
utils.RangeLimitFlag,
utils.USBFlag,
utils.SmartCardDaemonPathFlag,
Expand Down Expand Up @@ -114,6 +116,8 @@ var (
utils.CacheGCFlag,
utils.CacheSnapshotFlag,
utils.CachePreimagesFlag,
utils.PersistDiffFlag,
utils.DiffBlockFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Expand Up @@ -40,6 +40,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.KeyStoreDirFlag,
utils.NoUSBFlag,
utils.DirectBroadcastFlag,
utils.DisableSnapProtocolFlag,
utils.RangeLimitFlag,
utils.SmartCardDaemonPathFlag,
utils.NetworkIdFlag,
Expand Down
41 changes: 40 additions & 1 deletion cmd/utils/flags.go
Expand Up @@ -117,6 +117,15 @@ var (
Name: "directbroadcast",
Usage: "Enable directly broadcast mined block to all peers",
}
DisableSnapProtocolFlag = cli.BoolFlag{
Name: "disablesnapprotocol",
Usage: "Disable snap protocol",
}
DiffSyncFlag = cli.BoolFlag{
Name: "diffsync",
Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " +
"but will degrade the security to light client level",
}
RangeLimitFlag = cli.BoolFlag{
Name: "rangelimit",
Usage: "Enable 5000 blocks limit for range query",
Expand All @@ -125,6 +134,10 @@ var (
Name: "datadir.ancient",
Usage: "Data directory for ancient chain segments (default = inside chaindata)",
}
DiffFlag = DirectoryFlag{
Name: "datadir.diff",
Usage: "Data directory for difflayer segments (default = inside chaindata)",
}
MinFreeDiskSpaceFlag = DirectoryFlag{
Name: "datadir.minfreedisk",
Usage: "Minimum free disk space in MB, once reached triggers auto shut down (default = --cache.gc converted to MB, 0 = disabled)",
Expand Down Expand Up @@ -425,6 +438,15 @@ var (
Name: "cache.preimages",
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
}
PersistDiffFlag = cli.BoolFlag{
Name: "persistdiff",
Usage: "Enable persistence of the diff layer",
}
DiffBlockFlag = cli.Uint64Flag{
Name: "diffblock",
Usage: "The number of blocks should be persisted in db (default = 864000 )",
Value: uint64(864000),
}
// Miner settings
MiningEnabledFlag = cli.BoolFlag{
Name: "mine",
Expand Down Expand Up @@ -1271,6 +1293,9 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
}
if ctx.GlobalIsSet(DisableSnapProtocolFlag.Name) {
cfg.DisableSnapProtocol = ctx.GlobalBool(DisableSnapProtocolFlag.Name)
}
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
Expand Down Expand Up @@ -1564,7 +1589,15 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(AncientFlag.Name) {
cfg.DatabaseFreezer = ctx.GlobalString(AncientFlag.Name)
}

if ctx.GlobalIsSet(DiffFlag.Name) {
cfg.DatabaseDiff = ctx.GlobalString(DiffFlag.Name)
}
if ctx.GlobalIsSet(PersistDiffFlag.Name) {
cfg.PersistDiff = ctx.GlobalBool(PersistDiffFlag.Name)
}
if ctx.GlobalIsSet(DiffBlockFlag.Name) {
cfg.DiffBlock = ctx.GlobalUint64(DiffBlockFlag.Name)
}
if gcmode := ctx.GlobalString(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" {
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
Expand All @@ -1574,6 +1607,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(DirectBroadcastFlag.Name) {
cfg.DirectBroadcast = ctx.GlobalBool(DirectBroadcastFlag.Name)
}
if ctx.GlobalIsSet(DisableSnapProtocolFlag.Name) {
cfg.DisableSnapProtocol = ctx.GlobalBool(DisableSnapProtocolFlag.Name)
}
if ctx.GlobalIsSet(DiffSyncFlag.Name) {
cfg.DiffSync = ctx.GlobalBool(DiffSyncFlag.Name)
}
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
Expand Down
14 changes: 13 additions & 1 deletion common/gopool/pool.go
@@ -1,14 +1,16 @@
package gopool

import (
"runtime"
"time"

"github.com/panjf2000/ants/v2"
)

var (
// Init a instance pool when importing ants.
defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithExpiryDuration(10*time.Second))
defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithExpiryDuration(10*time.Second))
minNumberPerTask = 5
)

// Logger is used for logging formatted messages.
Expand Down Expand Up @@ -46,3 +48,13 @@ func Release() {
func Reboot() {
defaultPool.Reboot()
}

func Threads(tasks int) int {
threads := tasks / minNumberPerTask
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
} else if threads == 0 {
threads = 1
}
return threads
}
1 change: 1 addition & 0 deletions consensus/consensus.go
Expand Up @@ -141,4 +141,5 @@ type PoSA interface {
IsSystemContract(to *common.Address) bool
EnoughDistance(chain ChainReader, header *types.Header) bool
IsLocalBlock(header *types.Header) bool
AllowLightProcess(chain ChainReader, currentHeader *types.Header) bool
}
16 changes: 16 additions & 0 deletions consensus/parlia/parlia.go
Expand Up @@ -799,6 +799,11 @@ func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time.
return nil
}
delay := p.delayForRamanujanFork(snap, header)
// The blocking time should be no more than half of period
half := time.Duration(p.config.Period) * time.Second / 2
if delay > half {
delay = half
}
return &delay
}

Expand Down Expand Up @@ -882,6 +887,17 @@ func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Heade
return snap.enoughDistance(p.val, header)
}

func (p *Parlia) AllowLightProcess(chain consensus.ChainReader, currentHeader *types.Header) bool {
snap, err := p.snapshot(chain, currentHeader.Number.Uint64()-1, currentHeader.ParentHash, nil)
if err != nil {
return true
}

idx := snap.indexOfVal(p.val)
// validator is not allowed to diff sync
return idx < 0
}

func (p *Parlia) IsLocalBlock(header *types.Header) bool {
return p.val == header.Coinbase
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/parlia/ramanujanfork.go
Expand Up @@ -21,7 +21,7 @@ func (p *Parlia) delayForRamanujanFork(snap *Snapshot, header *types.Header) tim
if header.Difficulty.Cmp(diffNoTurn) == 0 {
// It's not our turn explicitly to sign, delay it a bit
wiggle := time.Duration(len(snap.Validators)/2+1) * wiggleTimeBeforeFork
delay += time.Duration(fixedBackOffTimeBeforeFork) + time.Duration(rand.Int63n(int64(wiggle)))
delay += fixedBackOffTimeBeforeFork + time.Duration(rand.Int63n(int64(wiggle)))
}
return delay
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/parlia/snapshot.go
Expand Up @@ -256,7 +256,7 @@ func (s *Snapshot) enoughDistance(validator common.Address, header *types.Header
if validator == header.Coinbase {
return false
}
offset := (int64(s.Number) + 1) % int64(validatorNum)
offset := (int64(s.Number) + 1) % validatorNum
if int64(idx) >= offset {
return int64(idx)-offset >= validatorNum-2
} else {
Expand Down
3 changes: 0 additions & 3 deletions core/block_validator.go
Expand Up @@ -17,9 +17,7 @@
package core

import (
"encoding/json"
"fmt"
"os"

"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
Expand Down Expand Up @@ -133,7 +131,6 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
},
func() error {
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
statedb.IterativeDump(true, true, true, json.NewEncoder(os.Stdout))
return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root)
} else {
return nil
Expand Down

0 comments on commit 1ded097

Please sign in to comment.