Skip to content

Commit

Permalink
core/rawdb: benchmark for freezer
Browse files Browse the repository at this point in the history
core/rawdb: improve append benchmark

core/rawdb: add freezer batch

core/rawdb: minor benchmark nitpicks

core/rawdb: add freezer batch

core/rawdb: method docs

core/rawdb: resolve freezer batch filenum at write-time

core/rawdb: use bytes.Buffer in freezer batch

core/rawdb: work in progress -- rlp encoding inside freezer batch

core/rawdb: implement write batches in freezer

core/rawdb: improve memory allocations for snappy compression
core/rawdb: polish
core/rawdb: simplify + fix copy/paste error
core/rawdb: error handling for freezertable batches
core/rawdb: lint nitpicks
core/rawdb: more tests for freezer batching

core/rawdb: freezer batch interface

core/rawdb: WIP freezer batch interface
core/rawdb: check insert count of table batches
core/rawdb: use ancient batch in background freeze
core/rawdb: fix some issues in background freezer
ethdb: document AncientBatch
core/rawdb: remove single-item append
core/rawdb: use dumpIndexString in batch tests
core/rawdb: fix batch test
core/rawdb: implement commit during append
core/rawdb: add some missing close calls in freezer tests
core/rawdb: simplify snappy buffer
core/rawdb: change ancient writer interface
core/rawdb: fix offset test
core/rawdb: track headBytes only in freezerTable
core/rawdb: add writeLock
core/rawdb: reuse write batch
core/rawdb: re-add metrics reporting
core/rawdb: recreate the concurrent truncate test of freezer

This changes the test slightly to use the 'freezer' object instead of
'freezerTable'. This is necessary because the concurrency handling has
moved to the freezer and concurrent append and truncate is no longer
allowed on the table object.

core/rawdb: fix item count after ModifyAncients and improve the test
core/rawdb: add concurrency test for retrieve
core/rawdb: allow overriding max table size in newFreezer

This makes the concurrency tests fail with -race.

core/rawdb: fix race in advanceFile
core/rawdb: remove atomic access on freezer.headId

It doesn't need to be atomic because all accesses are protected by the lock.

core/rawdb: use int64 for writeSize

core/rawdb: implement and test all-or-nothing behavior of ModifyAncients
core/rawdb: move buffer code to batch file
core/rawdb: reimplement batch test on freezer
core/rawdb: fix race in AncientSize
core, core/rawdb: WIP batch ancient write in sync
core/rawdb: delete old batch tests
core: fix error handling when inserting side chain receipts

This fixes (well, rewrites) the test for the error case of
InsertReceiptChain where side chain data is imported and setting the
fast block fails because it doesn't match the header chain.

This previously relied on the terminateInsert hook. With the new ancient
write interface, there is no good place for this hook anymore, so the
test now injects an actual error instead of simulating one.

core: improve comments

core/blockchain, core/rawdb: less iterator usage when deleting leveldb-data

core/rawdb: return write error from WriteAncientBlocks

It's easier to test this function when it doesn't just exit
the process on failure.

core/rawdb: add benchmark for WriteAncientBlocks

core/rawdb: use os.RemoveAll in test

core/rawdb: avoid allocating in benchmark loop

core/rawdb: avoid copying difficulty twice

The difficulty is already copied by block.Header(), so avoid
copying it again.

core/rawdb: improve WriteAncientBlocks benchmark

core: handle block->header validation
  • Loading branch information
holiman committed Aug 25, 2021
1 parent fe2f153 commit 70ac648
Show file tree
Hide file tree
Showing 13 changed files with 1,343 additions and 578 deletions.
199 changes: 82 additions & 117 deletions core/blockchain.go
Expand Up @@ -207,8 +207,7 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -1085,38 +1084,6 @@ const (
SideStatTy
)

// truncateAncient rolls the ancient store back to the given head: every item
// the store holds above block number head is removed, and all in-memory caches
// are purged afterwards so they cannot serve the removed data.
//
// It returns the error reported by the database when either querying the
// ancient item count or truncating fails; in that case no cache is purged.
func (bc *BlockChain) truncateAncient(head uint64) error {
	ancients, err := bc.db.Ancients()
	switch {
	case err != nil:
		return err
	case ancients <= head+1:
		// The ancient store holds nothing past head, so there is nothing to cut.
		return nil
	}
	// Cut off everything the ancient store holds past head.
	if err = bc.db.TruncateAncients(head + 1); err != nil {
		return err
	}
	// Purge the header chain caches; they may still reference truncated items.
	bc.hc.headerCache.Purge()
	bc.hc.tdCache.Purge()
	bc.hc.numberCache.Purge()

	// Purge the block chain caches for the same reason.
	bc.bodyCache.Purge()
	bc.bodyRLPCache.Purge()
	bc.receiptsCache.Purge()
	bc.blockCache.Purge()
	bc.txLookupCache.Purge()
	bc.futureBlocks.Purge()

	log.Info("Rewind ancient data", "number", head)
	return nil
}

// numberHash is just a container for a number and a hash, to represent a block
type numberHash struct {
number uint64
Expand Down Expand Up @@ -1155,12 +1122,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
size = 0
size = int64(0)
)

// updateHead updates the head fast sync block if the inserted blocks are better
// and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Rewind may have occurred, skip in that case.
if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
Expand All @@ -1169,127 +1138,122 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
headFastBlockGauge.Update(int64(head.NumberU64()))
bc.chainmu.Unlock()
return true
}
}
bc.chainmu.Unlock()
return false
}

// writeAncient writes blockchain and corresponding receipt chain into ancient store.
//
// This function only accepts canonical chain data. All side chain data will
// be reverted eventually.
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
var (
previous = bc.CurrentFastBlock()
batch = bc.db.NewBatch()
)
// If any error occurs before updating the head or we are inserting a side chain,
// all the data written this time will be rolled back.
defer func() {
if previous != nil {
if err := bc.truncateAncient(previous.NumberU64()); err != nil {
log.Crit("Truncate ancient store failed", "err", err)
}
}
}()
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
return i, errors.New("insertion is terminated for testing purpose")
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4])
}
if block.NumberU64() == 1 {
// Make sure to write the genesis into the freezer
if frozen, _ := bc.db.Ancients(); frozen == 0 {
h := rawdb.ReadCanonicalHash(bc.db, 0)
b := rawdb.ReadBlock(bc.db, h, 0)
size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0))
log.Info("Wrote genesis to ancients")
first := blockChain[0]
last := blockChain[len(blockChain)-1]

// Ensure genesis is in ancients.
if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
b := bc.genesisBlock
td := bc.genesisBlock.Difficulty()
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td)
size += writeSize
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
}
log.Info("Wrote genesis to ancients")
}
// Flush data into ancient database.
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-index all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
}
// Before writing the blocks to the ancients, we need to ensure that
// they correspond to what the header chain 'expects'.
// We only check the last block/header, since it's a contiguous chain.
if !bc.HasHeader(last.Hash(), last.NumberU64()) {
return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4])
}

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
size += writeSize
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
}

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-index all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
var batch = bc.db.NewBatch()
for _, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
}
stats.processed++
}

// Flush all tx-lookup index data.
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
// The tx index data could not be written.
// Roll back the ancient store update.
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
}
batch.Reset()

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
}
if !updateHead(blockChain[len(blockChain)-1]) {
return 0, errors.New("side blocks can't be accepted as the ancient chain data")
}
previous = nil // disable rollback explicitly

// Wipe out canonical block data.
for _, nh := range deleted {
rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
rawdb.DeleteCanonicalHash(batch, nh.number)
}
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
// Update the current fast block because all block data is now present in DB.
previousFastBlock := bc.CurrentFastBlock().NumberU64()
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
}
if err := batch.Write(); err != nil {
return 0, err
}
batch.Reset()

// Wipe out side chain too.
for _, nh := range deleted {
for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
rawdb.DeleteBlock(batch, hash, nh.number)
// Delete block data from the main database.
batch.Reset()
canonHashes := make(map[common.Hash]struct{})
for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
continue
}
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
}
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
rawdb.DeleteBlock(batch, hash, block.NumberU64())
}
// Delete side chain hash-to-number mappings.
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
if _, canon := canonHashes[nh.Hash]; !canon {
rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
}
}
if err := batch.Write(); err != nil {
return 0, err
}
return 0, nil
}

// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false
Expand Down Expand Up @@ -1327,7 +1291,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil {
return 0, err
}
size += batch.ValueSize()
size += int64(batch.ValueSize())
batch.Reset()
}
stats.processed++
Expand All @@ -1336,14 +1300,15 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() > 0 {
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
return 0, err
}
}
updateHead(blockChain[len(blockChain)-1])
return 0, nil
}

// Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
Expand Down

0 comments on commit 70ac648

Please sign in to comment.