Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb, cmd, ethdb, eth: implement freezer tail deletion #23954

Merged
merged 10 commits into from Mar 10, 2022
6 changes: 3 additions & 3 deletions core/blockchain.go
Expand Up @@ -592,7 +592,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num); err != nil {
if err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
Expand Down Expand Up @@ -991,7 +991,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += int64(batch.ValueSize())
if err = batch.Write(); err != nil {
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
if err := bc.db.TruncateHead(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
Expand All @@ -1009,7 +1009,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/accessors_chain.go
Expand Up @@ -83,8 +83,8 @@ type NumberHash struct {
Hash common.Hash
}

// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights,
// both canonical and reorged forks included.
// ReadAllHashesInRange retrieves all the hashes assigned to blocks at certain
// heights, both canonical and reorged forks included.
// This method considers both limits to be _inclusive_.
func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash {
var (
Expand Down Expand Up @@ -776,7 +776,7 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
WriteHeader(db, block.Header())
}

// WriteAncientBlock writes entire block data into ancient store and returns the total written size.
// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
Expand Down
16 changes: 13 additions & 3 deletions core/rawdb/database.go
Expand Up @@ -99,6 +99,11 @@ func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
}

// Tail returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Tail() (uint64, error) {
return 0, errNotSupported
}

// AncientSize returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
Expand All @@ -109,8 +114,13 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e
return 0, errNotSupported
}

// TruncateAncients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateAncients(items uint64) error {
// TruncateHead returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateHead(items uint64) error {
return errNotSupported
}

// TruncateTail returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateTail(items uint64) error {
return errNotSupported
}

Expand Down Expand Up @@ -211,7 +221,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
// Block #1 is still in the database, we're allowed to init a new feezer
}
// Otherwise, the head header is still the genesis, we're allowed to init a new
// feezer.
// freezer.
}
}
// Freezer is consistent with the key-value database, permit combining the two
Expand Down
57 changes: 47 additions & 10 deletions core/rawdb/freezer.go
Expand Up @@ -66,7 +66,7 @@ const (
freezerTableSize = 2 * 1000 * 1000 * 1000
)

// freezer is an memory mapped append-only database to store immutable chain data
// freezer is a memory mapped append-only database to store immutable chain data
// into flat files:
//
// - The append only nature ensures that disk writes are minimized.
Expand All @@ -78,6 +78,7 @@ type freezer struct {
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen
tail uint64 // Number of the first stored item in the freezer
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

// This lock synchronizes writers and the truncate operation, as well as
Expand Down Expand Up @@ -226,6 +227,11 @@ func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
}

// Tail returns the number of first stored item in the freezer.
func (f *freezer) Tail() (uint64, error) {
return atomic.LoadUint64(&f.tail), nil
}

// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
Expand Down Expand Up @@ -261,7 +267,7 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
if err != nil {
// The write operation has failed. Go back to the previous item position.
for name, table := range f.tables {
err := table.truncate(prevItem)
err := table.truncateHead(prevItem)
if err != nil {
log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
}
Expand All @@ -281,8 +287,8 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
return writeSize, nil
}

// TruncateAncients discards any recent data above the provided threshold number.
func (f *freezer) TruncateAncients(items uint64) error {
// TruncateHead discards any recent data above the provided threshold number.
func (f *freezer) TruncateHead(items uint64) error {
if f.readonly {
return errReadOnly
}
Expand All @@ -293,14 +299,34 @@ func (f *freezer) TruncateAncients(items uint64) error {
return nil
}
for _, table := range f.tables {
if err := table.truncate(items); err != nil {
if err := table.truncateHead(items); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, items)
return nil
}

// TruncateTail discards any recent data below the provided threshold number.
func (f *freezer) TruncateTail(tail uint64) error {
if f.readonly {
return errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()

if atomic.LoadUint64(&f.tail) >= tail {
return nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
return err
}
}
atomic.StoreUint64(&f.tail, tail)
return nil
}

// Sync flushes all data tables to disk.
func (f *freezer) Sync() error {
var errs []error
Expand Down Expand Up @@ -344,19 +370,30 @@ func (f *freezer) validate() error {

// repair truncates all data tables to the same length.
func (f *freezer) repair() error {
min := uint64(math.MaxUint64)
var (
head = uint64(math.MaxUint64)
tail = uint64(0)
)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
if min > items {
min = items
if head > items {
head = items
}
hidden := atomic.LoadUint64(&table.itemHidden)
if hidden > tail {
tail = hidden
}
}
for _, table := range f.tables {
if err := table.truncate(min); err != nil {
if err := table.truncateHead(head); err != nil {
return err
}
if err := table.truncateTail(tail); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, min)
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/freezer_batch.go
Expand Up @@ -191,7 +191,7 @@ func (batch *freezerTableBatch) commit() error {
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]

// Write index.
// Write indices.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
Expand Down
130 changes: 130 additions & 0 deletions core/rawdb/freezer_meta.go
@@ -0,0 +1,130 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>

package rawdb

import (
"encoding/binary"
"errors"
"io"
"os"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

const freezerVersion = 1 // The initial version tag of freezer table metadata

// freezerTableMeta wraps all the metadata of the freezer table.
type freezerTableMeta struct {
// version is the versioning descriptor of the freezer table.
version uint16

// VirtualTail indicates how many items have been marked as deleted.
// Its value is equal to the number of items removed from the table
// plus the number of items hidden in the table, so it should never
// be lower than the "actual tail".
VirtualTail uint64
}

// newMetadata initializes the metadata object with the given virtual tail.
func newMetadata(tail uint64) *freezerTableMeta {
return &freezerTableMeta{
version: freezerVersion,
VirtualTail: tail,
}
}

// readMetadata reads the metadata of the freezer table from the
// given metadata file.
func readMetadata(file *os.File) (*freezerTableMeta, error) {
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
// load the first 2 bytes, resolve the version tag
var buf [2]byte
_, err = file.Read(buf[:2])
if err != nil {
return nil, err
}
version := binary.BigEndian.Uint16(buf[:])
switch version {
case freezerVersion:
var meta freezerTableMeta
if err := rlp.Decode(file, &meta); err != nil {
return nil, err
}
meta.version = freezerVersion
return &meta, nil
default:
return nil, errors.New("undefined version")
}
}

// writeMetadata writes the metadata of the freezer table into the
// given metadata file.
func writeMetadata(file *os.File, meta *freezerTableMeta) error {
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return err
}
var buf [2]byte
binary.BigEndian.PutUint16(buf[:], meta.version)
_, err = file.Write(buf[:])
if err != nil {
return err
}
return rlp.Encode(file, meta)
}

// loadMetadata loads the metadata from the given metadata file.
// Initializes the metadata file with the given "actual tail" if
// it's empty.
func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) {
stat, err := file.Stat()
if err != nil {
return nil, err
}
// Write the metadata with the given actual tail into metadata file
// if it's non-existent. There are two possible scenarios here:
// - the freezer table is empty
// - the freezer table is legacy
// In both cases, write the meta into the file with the actual tail
// as the virtual tail.
if stat.Size() == 0 {
m := newMetadata(tail)
if err := writeMetadata(file, m); err != nil {
return nil, err
}
return m, nil
}
m, err := readMetadata(file)
if err != nil {
return nil, err
}
// Update the virtual tail with the given actual tail if it's even
// lower than it. Theoretically it shouldn't happen at all, print
// a warning here.
if m.VirtualTail < tail {
log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail)
m.VirtualTail = tail
if err := writeMetadata(file, m); err != nil {
return nil, err
}
}
return m, nil
}