Skip to content

Commit

Permalink
core/rawdb, cmd, ethdb, eth: implement freezer tail deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Jan 10, 2022
1 parent 127ce93 commit 63ca13f
Show file tree
Hide file tree
Showing 14 changed files with 1,109 additions and 150 deletions.
6 changes: 3 additions & 3 deletions core/blockchain.go
Expand Up @@ -592,7 +592,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num); err != nil {
if err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
Expand Down Expand Up @@ -994,7 +994,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// The tx index data could not be written.
// Roll back the ancient store update.
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
if err := bc.db.TruncateHead(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
Expand All @@ -1010,7 +1010,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/accessors_chain.go
Expand Up @@ -83,8 +83,8 @@ type NumberHash struct {
Hash common.Hash
}

// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights,
// both canonical and reorged forks included.
// ReadAllHashesInRange retrieves all the hashes assigned to blocks at a certain
// heights, both canonical and reorged forks included.
// This method considers both limits to be _inclusive_.
func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash {
var (
Expand Down Expand Up @@ -776,7 +776,7 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
WriteHeader(db, block.Header())
}

// WriteAncientBlock writes entire block data into ancient store and returns the total written size.
// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
Expand Down
16 changes: 13 additions & 3 deletions core/rawdb/database.go
Expand Up @@ -99,6 +99,11 @@ func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
}

// Tail returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Tail() (uint64, error) {
return 0, errNotSupported
}

// AncientSize returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
Expand All @@ -109,8 +114,13 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e
return 0, errNotSupported
}

// TruncateAncients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateAncients(items uint64) error {
// TruncateHead returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateHead(items uint64) error {
return errNotSupported
}

// TruncateTail returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateTail(items uint64) error {
return errNotSupported
}

Expand Down Expand Up @@ -211,7 +221,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
// Block #1 is still in the database, we're allowed to init a new feezer
}
// Otherwise, the head header is still the genesis, we're allowed to init a new
// feezer.
// freezer.
}
}
// Freezer is consistent with the key-value database, permit combining the two
Expand Down
72 changes: 59 additions & 13 deletions core/rawdb/freezer.go
Expand Up @@ -59,14 +59,14 @@ const (
freezerRecheckInterval = time.Minute

// freezerBatchLimit is the maximum number of blocks to freeze in one batch
// before doing an fsync and deleting it from the key-value store.
// before doing a fsync and deleting it from the key-value store.
freezerBatchLimit = 30000

// freezerTableSize defines the maximum size of freezer data files.
freezerTableSize = 2 * 1000 * 1000 * 1000
)

// freezer is an memory mapped append-only database to store immutable chain data
// freezer is a memory mapped append-only database to store immutable chain data
// into flat files:
//
// - The append only nature ensures that disk writes are minimized.
Expand All @@ -78,6 +78,7 @@ type freezer struct {
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen
tail uint64 // Number of the first stored item in the freezer
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

// This lock synchronizes writers and the truncate operation, as well as
Expand Down Expand Up @@ -219,6 +220,11 @@ func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
}

// Tail returns the number of first stored item in the freezer.
func (f *freezer) Tail() (uint64, error) {
return atomic.LoadUint64(&f.tail), nil
}

// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
Expand Down Expand Up @@ -254,7 +260,7 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
if err != nil {
// The write operation has failed. Go back to the previous item position.
for name, table := range f.tables {
err := table.truncate(prevItem)
err := table.truncateHead(prevItem)
if err != nil {
log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
}
Expand All @@ -274,8 +280,8 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
return writeSize, nil
}

// TruncateAncients discards any recent data above the provided threshold number.
func (f *freezer) TruncateAncients(items uint64) error {
// TruncateHead discards any recent data above the provided threshold number.
func (f *freezer) TruncateHead(items uint64) error {
if f.readonly {
return errReadOnly
}
Expand All @@ -285,12 +291,42 @@ func (f *freezer) TruncateAncients(items uint64) error {
if atomic.LoadUint64(&f.frozen) <= items {
return nil
}
var frozen uint64
for _, table := range f.tables {
if err := table.truncateHead(items); err != nil {
return err
}
// Tables should be aligned, only check the first table.
if frozen == 0 {
frozen = atomic.LoadUint64(&table.items)
}
}
atomic.StoreUint64(&f.frozen, frozen)
return nil
}

// TruncateTail discards any recent data below the provided threshold number.
func (f *freezer) TruncateTail(tail uint64) error {
if f.readonly {
return errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()

if atomic.LoadUint64(&f.tail) >= tail {
return nil
}
var truncated uint64
for _, table := range f.tables {
if err := table.truncate(items); err != nil {
if err := table.truncateTail(tail); err != nil {
return err
}
if truncated == 0 {
// Tables should be aligned, only check the first table.
truncated = table.tail()
}
}
atomic.StoreUint64(&f.frozen, items)
atomic.StoreUint64(&f.tail, truncated)
return nil
}

Expand All @@ -308,21 +344,31 @@ func (f *freezer) Sync() error {
return nil
}

// repair truncates all data tables to the same length.
// repair truncates all data tables to the same length, both tail and head.
func (f *freezer) repair() error {
min := uint64(math.MaxUint64)
var (
head = uint64(math.MaxUint64)
tail = uint64(0)
)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
if min > items {
min = items
if head > items {
head = items
}
if table.tail() > tail {
tail = table.tail()
}
}
for _, table := range f.tables {
if err := table.truncate(min); err != nil {
if err := table.truncateHead(head); err != nil {
return err
}
if err := table.truncateTail(tail); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, min)
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/freezer_batch.go
Expand Up @@ -191,7 +191,7 @@ func (batch *freezerTableBatch) commit() error {
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]

// Write index.
// Write indices.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
Expand Down

0 comments on commit 63ca13f

Please sign in to comment.