
Commit

update implementation
Brindrajsinh-Chauhan committed May 6, 2024
1 parent 9ff0214 commit 1dcf9e5
Showing 16 changed files with 222 additions and 162 deletions.
40 changes: 21 additions & 19 deletions core/blockchain.go
@@ -140,8 +140,8 @@ type CacheConfig struct {
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top

- EnableDiskRootInterval bool          // Enable update disk root when time threshold is reached. Disabled by default
- DiskRootThreshold      time.Duration // Time threshold after which to flush the layers to disk
+ AllowForceUpdate bool // Enable updating the disk root when the commit threshold is reached. Disabled by default
+ CommitThreshold  int  // Number of commits after which to flush the layers to disk

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
@@ -160,9 +160,11 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
- StateHistory: c.StateHistory,
- CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
- DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
+ StateHistory:     c.StateHistory,
+ CleanCacheSize:   c.TrieCleanLimit * 1024 * 1024,
+ DirtyCacheSize:   c.TrieDirtyLimit * 1024 * 1024,
+ AllowForceUpdate: c.AllowForceUpdate,
+ CommitThreshold:  c.CommitThreshold,
}
}
return config
@@ -171,14 +173,14 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
- TrieCleanLimit: 256,
- TrieDirtyLimit: 256,
- TrieTimeLimit: 5 * time.Minute,
- SnapshotLimit: 256,
- SnapshotWait: true,
- StateScheme: rawdb.HashScheme,
- EnableDiskRootInterval: false,
- DiskRootThreshold: 60 * time.Minute,
+ TrieCleanLimit:   256,
+ TrieDirtyLimit:   256,
+ TrieTimeLimit:    5 * time.Minute,
+ SnapshotLimit:    256,
+ SnapshotWait:     true,
+ StateScheme:      rawdb.HashScheme,
+ AllowForceUpdate: false,
+ CommitThreshold:  128,
}

// DefaultCacheConfigWithScheme returns a deep copied default cache config with
@@ -446,12 +448,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
recover = true
}
snapconfig := snapshot.Config{
- CacheSize: bc.cacheConfig.SnapshotLimit,
- Recovery: recover,
- NoBuild: bc.cacheConfig.SnapshotNoBuild,
- AsyncBuild: !bc.cacheConfig.SnapshotWait,
- EnableDiskRootInterval: bc.cacheConfig.EnableDiskRootInterval,
- DiskRootThreshold: bc.cacheConfig.DiskRootThreshold,
+ CacheSize:        bc.cacheConfig.SnapshotLimit,
+ Recovery:         recover,
+ NoBuild:          bc.cacheConfig.SnapshotNoBuild,
+ AsyncBuild:       !bc.cacheConfig.SnapshotWait,
+ AllowForceUpdate: bc.cacheConfig.AllowForceUpdate,
+ CommitThreshold:  bc.cacheConfig.CommitThreshold,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
}
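A short sketch (not part of this commit) of how a caller might turn on the new CacheConfig knobs; the AllowForceUpdate and CommitThreshold fields come from the diff above, while the package and helper name are illustrative assumptions.

```go
package example

import (
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
)

// forceFlushCacheConfig builds a path-scheme cache config that opts in to the
// commit-count based disk-root flush added by this commit (illustrative helper).
func forceFlushCacheConfig() *core.CacheConfig {
	cfg := core.DefaultCacheConfigWithScheme(rawdb.PathScheme)
	cfg.AllowForceUpdate = true // enable forced disk-root updates
	cfg.CommitThreshold = 128   // flush accumulated layers roughly every 128 commits
	return cfg
}
```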
4 changes: 2 additions & 2 deletions core/blockchain_repair_test.go
@@ -1820,7 +1820,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
- if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
+ if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
@@ -1950,7 +1950,7 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
- if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
+ if err := chain.snaps.Cap(blocks[1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}

2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
@@ -2023,7 +2023,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
- if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
+ if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
2 changes: 1 addition & 1 deletion core/blockchain_snapshot_test.go
@@ -108,7 +108,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
- chain.snaps.Cap(blocks[point-1].Root(), 0)
+ chain.snaps.Cap(blocks[point-1].Root(), 0, false)
diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root()
if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) {
t.Fatalf("Failed to flush disk layer change, want %x, got %x", blockRoot, diskRoot)
2 changes: 1 addition & 1 deletion core/state/pruner/pruner.go
@@ -192,7 +192,7 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta
// Pruning is done, now drop the "useless" layers from the snapshot.
// Firstly, flushing the target layer into the disk. After that all
// diff layers below the target will all be merged into the disk.
- if err := snaptree.Cap(root, 0); err != nil {
+ if err := snaptree.Cap(root, 0, false); err != nil {
return err
}
// Secondly, flushing the snapshot journal into the disk. All diff
6 changes: 4 additions & 2 deletions core/state/snapshot/difflayer.go
@@ -77,8 +77,10 @@ var (
bloomAccountHasherOffset = 0
bloomStorageHasherOffset = 0

- // Setting a minimum to prevent very low input from user
- minTimeThreshold = 1 * time.Minute
+ // Number of commits before a forced disk update: after the first
+ // 128 layers, the 129th layer is committed to disk.
+ defaultCommitThreshold = 128
)

func init() {
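To make the intended interplay between defaultCommitThreshold above and the sanitize helper added in core/state/snapshot/snapshot.go below concrete, here is an illustrative test sketch (not part of the commit); it assumes it lives in the snapshot package so it can reach the unexported identifiers.

```go
package snapshot

import "testing"

// TestSanitizeDefaultsCommitThreshold sketches the expected behaviour: a zero
// CommitThreshold in Config is backfilled with defaultCommitThreshold (128).
func TestSanitizeDefaultsCommitThreshold(t *testing.T) {
	cfg := Config{AllowForceUpdate: true} // CommitThreshold deliberately left at zero
	if got := cfg.sanitize().CommitThreshold; got != defaultCommitThreshold {
		t.Fatalf("CommitThreshold not defaulted: got %d, want %d", got, defaultCommitThreshold)
	}
}
```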
8 changes: 4 additions & 4 deletions core/state/snapshot/disklayer_test.go
@@ -133,7 +133,7 @@ func TestDiskMerge(t *testing.T) {
}); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
- if err := snaps.Cap(diffRoot, 0); err != nil {
+ if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
// Retrieve all the data through the disk layer and validate it
@@ -356,7 +356,7 @@ func TestDiskPartialMerge(t *testing.T) {
}); err != nil {
t.Fatalf("test %d: failed to update snapshot tree: %v", i, err)
}
- if err := snaps.Cap(diffRoot, 0); err != nil {
+ if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("test %d: failed to flatten snapshot tree: %v", i, err)
}
// Retrieve all the data through the disk layer and validate it
@@ -467,7 +467,7 @@ func TestDiskGeneratorPersistence(t *testing.T) {
}, nil); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
- if err := snaps.Cap(diffRoot, 0); err != nil {
+ if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob := rawdb.ReadSnapshotGenerator(db)
@@ -489,7 +489,7 @@ func TestDiskGeneratorPersistence(t *testing.T) {
}
diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer)
diskLayer.genMarker = nil // Construction finished
- if err := snaps.Cap(diffTwoRoot, 0); err != nil {
+ if err := snaps.Cap(diffTwoRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob = rawdb.ReadSnapshotGenerator(db)
12 changes: 6 additions & 6 deletions core/state/snapshot/iterator_test.go
@@ -248,7 +248,7 @@ func TestAccountIteratorTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x04"), 2)
snaps.Cap(common.HexToHash("0x04"), 2, false)
verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)

it, _ = snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
@@ -296,7 +296,7 @@ func TestStorageIteratorTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x04"), 2)
snaps.Cap(common.HexToHash("0x04"), 2, false)
verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa")), verifyStorage)

it, _ = snaps.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
@@ -384,7 +384,7 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x09"), 2)
snaps.Cap(common.HexToHash("0x09"), 2, false)

it, _ = snaps.AccountIterator(common.HexToHash("0x09"), common.Hash{})
for it.Next() {
@@ -483,7 +483,7 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x09"), 2)
snaps.Cap(common.HexToHash("0x09"), 2, false)

it, _ = snaps.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
for it.Next() {
@@ -541,7 +541,7 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x80"), 2)
snaps.Cap(common.HexToHash("0x80"), 2, false)

verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)

@@ -580,7 +580,7 @@ func TestAccountIteratorFlattening(t *testing.T) {
it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
defer it.Release()

- if err := snaps.Cap(common.HexToHash("0x04"), 1); err != nil {
+ if err := snaps.Cap(common.HexToHash("0x04"), 1, false); err != nil {
t.Fatalf("failed to flatten snapshot stack: %v", err)
}
//verifyIterator(t, 7, it)
80 changes: 47 additions & 33 deletions core/state/snapshot/snapshot.go
@@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -151,12 +150,23 @@ type snapshot interface {

// Config includes the configurations for snapshots.
type Config struct {
- CacheSize              int           // Megabytes permitted to use for read caches
- Recovery               bool          // Indicator that the snapshots is in the recovery mode
- NoBuild                bool          // Indicator that the snapshots generation is disallowed
- AsyncBuild             bool          // The snapshot generation is allowed to be constructed asynchronously
- EnableDiskRootInterval bool          // The disk root is allowed to update after a time threshold
- DiskRootThreshold      time.Duration // The threshold to update disk root
+ CacheSize        int  // Megabytes permitted to use for read caches
+ Recovery         bool // Indicator that the snapshots is in the recovery mode
+ NoBuild          bool // Indicator that the snapshots generation is disallowed
+ AsyncBuild       bool // The snapshot generation is allowed to be constructed asynchronously
+ AllowForceUpdate bool // Allow forcing a snapshot disk-root update after a number of commits
+ CommitThreshold  int  // Number of commits after which to attempt a disk-root update
}

+ // sanitize checks the provided user configurations and changes anything that's
+ // unreasonable or unworkable.
+ func (c *Config) sanitize() Config {
+ 	conf := *c
+
+ 	if conf.CommitThreshold == 0 {
+ 		conf.CommitThreshold = defaultCommitThreshold
+ 	}
+ 	return conf
+ }

// Tree is an Ethereum state snapshot tree. It consists of one persistent base
@@ -169,12 +179,12 @@ type Config struct {
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
// cheap iteration of the account/storage tries for sync aid.
type Tree struct {
- config   Config                   // Snapshots configurations
- diskdb   ethdb.KeyValueStore      // Persistent database to store the snapshot
- triedb   *triedb.Database         // In-memory cache to access the trie through
- layers   map[common.Hash]snapshot // Collection of all known layers
- lock     sync.RWMutex
- baseTime time.Time                // Reference to calculate the time threshold
+ config        Config                   // Snapshots configurations
+ diskdb        ethdb.KeyValueStore      // Persistent database to store the snapshot
+ triedb        *triedb.Database         // In-memory cache to access the trie through
+ layers        map[common.Hash]snapshot // Collection of all known layers
+ lock          sync.RWMutex
+ commitCounter int                      // Counter for number of commits

// Test hooks
onFlatten func() // Hook invoked when the bottom most diff layers are flattened
@@ -199,17 +209,13 @@ type Tree struct {
func New(config Config, diskdb ethdb.KeyValueStore, triedb *triedb.Database, root common.Hash) (*Tree, error) {
// Create a new, empty snapshot tree
snap := &Tree{
- config:   config,
- diskdb:   diskdb,
- triedb:   triedb,
- layers:   make(map[common.Hash]snapshot),
- baseTime: time.Now(),
+ config: config,
+ diskdb: diskdb,
+ triedb: triedb,
+ layers: make(map[common.Hash]snapshot),
}

- // If user provided threshold smaller than minimum, set to minimum
- if config.DiskRootThreshold < minTimeThreshold {
- 	config.DiskRootThreshold = minTimeThreshold
- }
+ config = config.sanitize()

// Attempt to load a previously persisted snapshot and rebuild one if failed
head, disabled, err := loadSnapshot(diskdb, triedb, root, config.CacheSize, config.Recovery, config.NoBuild)
@@ -393,7 +399,7 @@ func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs m
// which may or may not overflow and cascade to disk. Since this last layer's
// survival is only known *after* capping, we need to omit it from the count if
// we want to ensure that *at least* the requested number of diff layers remain.
- func (t *Tree) Cap(root common.Hash, layers int) error {
+ func (t *Tree) Cap(root common.Hash, layers int, force bool) error {
// Retrieve the head snapshot to cap from
snap := t.Snapshot(root)
if snap == nil {
@@ -427,7 +433,7 @@ func (t *Tree) Cap(root common.Hash, layers int) error {
t.layers = map[common.Hash]snapshot{base.root: base}
return nil
}
- persisted := t.cap(diff, layers)
+ persisted := t.cap(diff, layers, force)

// Remove any layer that is stale or links into a stale layer
children := make(map[common.Hash][]common.Hash)
@@ -477,7 +483,7 @@ func (t *Tree) Cap(root common.Hash, layers int) error {
// which may or may not overflow and cascade to disk. Since this last layer's
// survival is only known *after* capping, we need to omit it from the count if
// we want to ensure that *at least* the requested number of diff layers remain.
- func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
+ func (t *Tree) cap(diff *diffLayer, layers int, force bool) *diskLayer {
// Dive until we run out of layers or reach the persistent database
for i := 0; i < layers-1; i++ {
// If we still have diff layers below, continue down
@@ -511,7 +517,7 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
t.onFlatten()
}
diff.parent = flattened
if (flattened.memory < aggregatorMemoryLimit) && !t.isPastThreshold() {
if (flattened.memory < aggregatorMemoryLimit) && !force {
// Accumulator layer is smaller than the limit, so we can abort, unless
// there's a snapshot being generated currently. In that case, the trie
// will move from underneath the generator so we **must** merge all the
@@ -530,9 +536,6 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
base := diffToDisk(bottom)
bottom.lock.RUnlock()

- // Reset the time reference for next update
- t.baseTime = time.Now()

t.layers[base.root] = base
diff.parent = base
return base
@@ -900,8 +903,19 @@ func (t *Tree) Size() (diffs common.StorageSize, buf common.StorageSize) {
return size, 0
}

- // Check if time threshold is enabled and if the time elapsed is more than the threshold.
- // if false, we can abort else we proceed to update the disk root
- func (t *Tree) isPastThreshold() bool {
- 	return (t.config.EnableDiskRootInterval && (time.Since(t.baseTime) > t.config.DiskRootThreshold))
- }
+ // CompareThreshold reports whether the number of commits since the last forced
+ // disk-root update has exceeded the configured threshold. Calls below the
+ // threshold increment the commit counter; crossing it resets the counter and
+ // returns true. It always returns false when forced updates are disabled.
+ func (t *Tree) CompareThreshold() bool {
+ 	if !t.config.AllowForceUpdate {
+ 		return false
+ 	}
+
+ 	log.Debug("Snapshot Commit counters", "counter", t.commitCounter, "threshold", t.config.CommitThreshold)
+ 	if t.commitCounter > t.config.CommitThreshold {
+ 		t.commitCounter = 0
+ 		return true
+ 	}
+
+ 	t.commitCounter++
+
+ 	return false
+ }
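Taken together, a caller can ask CompareThreshold whether enough commits have accumulated and pass the answer as the new force argument to Cap, so the bottom diff layer is flushed to disk even below aggregatorMemoryLimit. The wrapper below is a sketch under that assumption; only Tree.Cap, Tree.CompareThreshold and the Config fields come from this commit, the rest is illustrative.

```go
package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state/snapshot"
)

// capWithThreshold lets the tree's commit counter decide whether this Cap call
// should force the flattened layers onto disk (illustrative wrapper).
func capWithThreshold(snaps *snapshot.Tree, root common.Hash, layers int) error {
	force := snaps.CompareThreshold() // true once the configured commit threshold is exceeded
	return snaps.Cap(root, layers, force)
}
```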
