diff --git a/core/blockchain.go b/core/blockchain.go index e14291aa54c39..66bc395c9dc45 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -207,8 +207,7 @@ type BlockChain struct { processor Processor // Block transaction processor interface vmConfig vm.Config - shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. - terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. + shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. } // NewBlockChain returns a fully initialised block chain using information @@ -1085,38 +1084,6 @@ const ( SideStatTy ) -// truncateAncient rewinds the blockchain to the specified header and deletes all -// data in the ancient store that exceeds the specified header. -func (bc *BlockChain) truncateAncient(head uint64) error { - frozen, err := bc.db.Ancients() - if err != nil { - return err - } - // Short circuit if there is no data to truncate in ancient store. - if frozen <= head+1 { - return nil - } - // Truncate all the data in the freezer beyond the specified head - if err := bc.db.TruncateAncients(head + 1); err != nil { - return err - } - // Clear out any stale content from the caches - bc.hc.headerCache.Purge() - bc.hc.tdCache.Purge() - bc.hc.numberCache.Purge() - - // Clear out any stale content from the caches - bc.bodyCache.Purge() - bc.bodyRLPCache.Purge() - bc.receiptsCache.Purge() - bc.blockCache.Purge() - bc.txLookupCache.Purge() - bc.futureBlocks.Purge() - - log.Info("Rewind ancient data", "number", head) - return nil -} - // numberHash is just a container for a number and a hash, to represent a block type numberHash struct { number uint64 @@ -1155,12 +1122,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var ( stats = struct{ processed, ignored int32 }{} start = time.Now() - size = 0 + size = int64(0) ) + // updateHead updates the head fast sync block if the inserted blocks are better // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { bc.chainmu.Lock() + defer bc.chainmu.Unlock() // Rewind may have occurred, skip in that case. if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 { @@ -1169,68 +1138,63 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) bc.currentFastBlock.Store(head) headFastBlockGauge.Update(int64(head.NumberU64())) - bc.chainmu.Unlock() return true } } - bc.chainmu.Unlock() return false } + // writeAncient writes blockchain and corresponding receipt chain into ancient store. // // this function only accepts canonical chain data. All side chain will be reverted // eventually. writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { - var ( - previous = bc.CurrentFastBlock() - batch = bc.db.NewBatch() - ) - // If any error occurs before updating the head or we are inserting a side chain, - // all the data written this time wll be rolled back. - defer func() { - if previous != nil { - if err := bc.truncateAncient(previous.NumberU64()); err != nil { - log.Crit("Truncate ancient store failed", "err", err) - } - } - }() - var deleted []*numberHash - for i, block := range blockChain { - // Short circuit insertion if shutting down or processing failed - if bc.insertStopped() { - return 0, errInsertionInterrupted - } - // Short circuit insertion if it is required(used in testing only) - if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) { - return i, errors.New("insertion is terminated for testing purpose") - } - // Short circuit if the owner header is unknown - if !bc.HasHeader(block.Hash(), block.NumberU64()) { - return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4]) - } - if block.NumberU64() == 1 { - // Make sure to write the genesis into the freezer - if frozen, _ := bc.db.Ancients(); frozen == 0 { - h := rawdb.ReadCanonicalHash(bc.db, 0) - b := rawdb.ReadBlock(bc.db, h, 0) - size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0)) - log.Info("Wrote genesis to ancients") + first := blockChain[0] + last := blockChain[len(blockChain)-1] + + // Ensure genesis is in ancients. + if first.NumberU64() == 1 { + if frozen, _ := bc.db.Ancients(); frozen == 0 { + b := bc.genesisBlock + td := bc.genesisBlock.Difficulty() + writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td) + size += writeSize + if err != nil { + log.Error("Error writing genesis to ancients", "err", err) + return 0, err } + log.Info("Wrote genesis to ancients") } - // Flush data into ancient database. - size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) - - // Write tx indices if any condition is satisfied: - // * If user requires to reserve all tx indices(txlookuplimit=0) - // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) - // * If block number is large enough to be regarded as a recent block - // It means blocks below the ancientLimit-txlookupLimit won't be indexed. - // - // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with - // an external ancient database, during the setup, blockchain will start - // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) - // range. In this case, all tx indices of newly imported blocks should be - // generated. + } + // Before writing the blocks to the ancients, we need to ensure that + // they correspond to the what the headerchain 'expects'. + // We only check the last block/header, since it's a contiguous chain. + if !bc.HasHeader(last.Hash(), last.NumberU64()) { + return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4]) + } + + // Write all chain data to ancients. + td := bc.GetTd(first.Hash(), first.NumberU64()) + writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td) + size += writeSize + if err != nil { + log.Error("Error importing chain data to ancients", "err", err) + return 0, err + } + + // Write tx indices if any condition is satisfied: + // * If user requires to reserve all tx indices(txlookuplimit=0) + // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) + // * If block number is large enough to be regarded as a recent block + // It means blocks below the ancientLimit-txlookupLimit won't be indexed. + // + // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with + // an external ancient database, during the setup, blockchain will start + // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) + // range. In this case, all tx indices of newly imported blocks should be + // generated. + var batch = bc.db.NewBatch() + for _, block := range blockChain { if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { rawdb.WriteTxLookupEntriesByBlock(batch, block) } else if rawdb.ReadTxIndexTail(bc.db) != nil { @@ -1238,51 +1202,50 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } stats.processed++ } + // Flush all tx-lookup index data. - size += batch.ValueSize() + size += int64(batch.ValueSize()) if err := batch.Write(); err != nil { + // The tx index data could not be written. + // Roll back the ancient store update. + fastBlock := bc.CurrentFastBlock().NumberU64() + if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { + log.Error("Can't truncate ancient store after failed insert", "err", err) + } return 0, err } - batch.Reset() // Sync the ancient store explicitly to ensure all data has been flushed to disk. if err := bc.db.Sync(); err != nil { return 0, err } - if !updateHead(blockChain[len(blockChain)-1]) { - return 0, errors.New("side blocks can't be accepted as the ancient chain data") - } - previous = nil // disable rollback explicitly - // Wipe out canonical block data. - for _, nh := range deleted { - rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number) - rawdb.DeleteCanonicalHash(batch, nh.number) - } - for _, block := range blockChain { - // Always keep genesis block in active database. - if block.NumberU64() != 0 { - rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) - rawdb.DeleteCanonicalHash(batch, block.NumberU64()) + // Update the current fast block because all block data is now present in DB. + previousFastBlock := bc.CurrentFastBlock().NumberU64() + if !updateHead(blockChain[len(blockChain)-1]) { + // We end up here if the header chain has reorg'ed, and the blocks/receipts + // don't match the canonical chain. + if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil { + log.Error("Can't truncate ancient store after failed insert", "err", err) } + return 0, errSideChainReceipts } - if err := batch.Write(); err != nil { - return 0, err - } - batch.Reset() - // Wipe out side chain too. - for _, nh := range deleted { - for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) { - rawdb.DeleteBlock(batch, hash, nh.number) + // Delete block data from the main database. + batch.Reset() + canonHashes := make(map[common.Hash]struct{}) + for _, block := range blockChain { + canonHashes[block.Hash()] = struct{}{} + if block.NumberU64() == 0 { + continue } + rawdb.DeleteCanonicalHash(batch, block.NumberU64()) + rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) } - for _, block := range blockChain { - // Always keep genesis block in active database. - if block.NumberU64() != 0 { - for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) { - rawdb.DeleteBlock(batch, hash, block.NumberU64()) - } + // Delete side chain hash-to-number mappings. + for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) { + if _, canon := canonHashes[nh.Hash]; !canon { + rawdb.DeleteHeader(batch, nh.Hash, nh.Number) } } if err := batch.Write(); err != nil { @@ -1290,6 +1253,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } return 0, nil } + // writeLive writes blockchain and corresponding receipt chain into active store. writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { skipPresenceCheck := false @@ -1327,7 +1291,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if err := batch.Write(); err != nil { return 0, err } - size += batch.ValueSize() + size += int64(batch.ValueSize()) batch.Reset() } stats.processed++ @@ -1336,7 +1300,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // we can ensure all components of body is completed(body, receipts, // tx indexes) if batch.ValueSize() > 0 { - size += batch.ValueSize() + size += int64(batch.ValueSize()) if err := batch.Write(); err != nil { return 0, err } @@ -1344,6 +1308,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ updateHead(blockChain[len(blockChain)-1]) return 0, nil } + // Write downloaded chain data and corresponding receipt chain data if len(ancientBlocks) > 0 { if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4e5df633b7e32..8d94f17aabae9 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -670,6 +670,7 @@ func TestFastVsFullChains(t *testing.T) { if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } + // Iterate over all chain data components, and cross reference for i := 0; i < len(blocks); i++ { num, hash := blocks[i].NumberU64(), blocks[i].Hash() @@ -693,10 +694,27 @@ func TestFastVsFullChains(t *testing.T) { } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) { t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles()) } - if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) { + + // Check receipts. + freceipts := rawdb.ReadReceipts(fastDb, hash, num, fast.Config()) + anreceipts := rawdb.ReadReceipts(ancientDb, hash, num, fast.Config()) + areceipts := rawdb.ReadReceipts(archiveDb, hash, num, fast.Config()) + if types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) { t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts) } + + // Check that hash-to-number mappings are present in all databases. + if m := rawdb.ReadHeaderNumber(fastDb, hash); m == nil || *m != num { + t.Errorf("block #%d [%x]: wrong hash-to-number mapping in fastdb: %v", num, hash, m) + } + if m := rawdb.ReadHeaderNumber(ancientDb, hash); m == nil || *m != num { + t.Errorf("block #%d [%x]: wrong hash-to-number mapping in ancientdb: %v", num, hash, m) + } + if m := rawdb.ReadHeaderNumber(archiveDb, hash); m == nil || *m != num { + t.Errorf("block #%d [%x]: wrong hash-to-number mapping in archivedb: %v", num, hash, m) + } } + // Check that the canonical chains are the same between the databases for i := 0; i < len(blocks)+1; i++ { if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash { @@ -1639,20 +1657,34 @@ func TestBlockchainRecovery(t *testing.T) { } } -func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { - // Configure and generate a sample block chain - var ( - gendb = rawdb.NewMemoryDatabase() - key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - address = crypto.PubkeyToAddress(key.PublicKey) - funds = big.NewInt(1000000000) - gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} - genesis = gspec.MustCommit(gendb) - ) - height := uint64(1024) - blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil) +// This test checks that InsertReceiptChain will roll back correctly when attempting to insert a side chain. +func TestInsertReceiptChainRollback(t *testing.T) { + // Generate forked chain. The returned BlockChain object is used to process the side chain blocks. + tmpChain, sideblocks, canonblocks, err := getLongAndShortChains() + if err != nil { + t.Fatal(err) + } + defer tmpChain.Stop() + // Get the side chain receipts. + if _, err := tmpChain.InsertChain(sideblocks); err != nil { + t.Fatal("processing side chain failed:", err) + } + t.Log("sidechain head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash()) + sidechainReceipts := make([]types.Receipts, len(sideblocks)) + for i, block := range sideblocks { + sidechainReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash()) + } + // Get the canon chain receipts. + if _, err := tmpChain.InsertChain(canonblocks); err != nil { + t.Fatal("processing canon chain failed:", err) + } + t.Log("canon head:", tmpChain.CurrentBlock().Number(), tmpChain.CurrentBlock().Hash()) + canonReceipts := make([]types.Receipts, len(canonblocks)) + for i, block := range canonblocks { + canonReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash()) + } - // Import the chain as a ancient-first node and ensure all pointers are updated + // Set up a BlockChain that uses the ancient store. frdir, err := ioutil.TempDir("", "") if err != nil { t.Fatalf("failed to create temp freezer dir: %v", err) @@ -1662,38 +1694,43 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } + gspec := Genesis{Config: params.AllEthashProtocolChanges} gspec.MustCommit(ancientDb) - ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) - defer ancient.Stop() + ancientChain, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) + defer ancientChain.Stop() - headers := make([]*types.Header, len(blocks)) - for i, block := range blocks { - headers[i] = block.Header() - } - if n, err := ancient.InsertHeaderChain(headers, 1); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) + // Import the canonical header chain. + canonHeaders := make([]*types.Header, len(canonblocks)) + for i, block := range canonblocks { + canonHeaders[i] = block.Header() } - // Abort ancient receipt chain insertion deliberately - ancient.terminateInsert = func(hash common.Hash, number uint64) bool { - return number == blocks[len(blocks)/2].NumberU64() + if _, err = ancientChain.InsertHeaderChain(canonHeaders, 1); err != nil { + t.Fatal("can't import canon headers:", err) } - previousFastBlock := ancient.CurrentFastBlock() - if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil { - t.Fatalf("failed to insert receipt %d: %v", n, err) + + // Try to insert blocks/receipts of the side chain. + _, err = ancientChain.InsertReceiptChain(sideblocks, sidechainReceipts, uint64(len(sideblocks))) + if err == nil { + t.Fatal("expected error from InsertReceiptChain.") } - if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() { - t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64()) + if ancientChain.CurrentFastBlock().NumberU64() != 0 { + t.Fatalf("failed to rollback ancient data, want %d, have %d", 0, ancientChain.CurrentFastBlock().NumberU64()) } - if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 { - t.Fatalf("failed to truncate ancient data") + if frozen, err := ancientChain.db.Ancients(); err != nil || frozen != 1 { + t.Fatalf("failed to truncate ancient data, frozen index is %d", frozen) } - ancient.terminateInsert = nil - if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { - t.Fatalf("failed to insert receipt %d: %v", n, err) + + // Insert blocks/receipts of the canonical chain. + _, err = ancientChain.InsertReceiptChain(canonblocks, canonReceipts, uint64(len(canonblocks))) + if err != nil { + t.Fatalf("can't import canon chain receipts: %v", err) } - if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() { + if ancientChain.CurrentFastBlock().NumberU64() != canonblocks[len(canonblocks)-1].NumberU64() { t.Fatalf("failed to insert ancient recept chain after rollback") } + if frozen, _ := ancientChain.db.Ancients(); frozen != uint64(len(canonblocks))+1 { + t.Fatalf("wrong ancients count %d", frozen) + } } // Tests that importing a very large side fork, which is larger than the canon chain, @@ -1958,9 +1995,8 @@ func testInsertKnownChainData(t *testing.T, typ string) { asserter(t, blocks2[len(blocks2)-1]) } -// getLongAndShortChains returns two chains, -// A is longer, B is heavier -func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error) { +// getLongAndShortChains returns two chains: A is longer, B is heavier. +func getLongAndShortChains() (bc *BlockChain, longChain []*types.Block, heavyChain []*types.Block, err error) { // Generate a canonical chain to act as the main dataset engine := ethash.NewFaker() db := rawdb.NewMemoryDatabase() @@ -1968,7 +2004,7 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error // Generate and import the canonical chain, // Offset the time, to keep the difficulty low - longChain, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) { + longChain, _ = GenerateChain(params.TestChainConfig, genesis, engine, db, 80, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) diskdb := rawdb.NewMemoryDatabase() @@ -1982,10 +2018,13 @@ func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error // Generate fork chain, make it shorter than canon, with common ancestor pretty early parentIndex := 3 parent := longChain[parentIndex] - heavyChain, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) { + heavyChainExt, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 75, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) b.OffsetTime(-9) }) + heavyChain = append(heavyChain, longChain[:parentIndex+1]...) + heavyChain = append(heavyChain, heavyChainExt...) + // Verify that the test is sane var ( longerTd = new(big.Int) diff --git a/core/error.go b/core/error.go index 33cf874d15e46..594f3a26e5596 100644 --- a/core/error.go +++ b/core/error.go @@ -31,6 +31,8 @@ var ( // ErrNoGenesis is returned when there is no Genesis Block. ErrNoGenesis = errors.New("genesis not found in chain") + + errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data") ) // List of evm-call-message pre-checking errors. All state transition messages will diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 76132bf37e68e..58226fb046b6b 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -19,6 +19,7 @@ package rawdb import ( "bytes" "encoding/binary" + "fmt" "math/big" "sort" @@ -81,6 +82,37 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { return hashes } +type NumberHash struct { + Number uint64 + Hash common.Hash +} + +// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, +// both canonical and reorged forks included. +// This method considers both limits to be _inclusive_. +func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash { + var ( + start = encodeBlockNumber(first) + keyLength = len(headerPrefix) + 8 + 32 + hashes = make([]*NumberHash, 0, 1+last-first) + it = db.NewIterator(headerPrefix, start) + ) + defer it.Release() + for it.Next() { + key := it.Key() + if len(key) != keyLength { + continue + } + num := binary.BigEndian.Uint64(key[len(headerPrefix) : len(headerPrefix)+8]) + if num > last { + break + } + hash := common.BytesToHash(key[len(key)-32:]) + hashes = append(hashes, &NumberHash{num, hash}) + } + return hashes +} + // ReadAllCanonicalHashes retrieves all canonical number and hash mappings at the // certain chain range. If the accumulated entries reaches the given threshold, // abort the iteration and return the semi-finish result. @@ -656,34 +688,48 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { } // WriteAncientBlock writes entire block data into ancient store and returns the total written size. -func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts types.Receipts, td *big.Int) int { - // Encode all block components to RLP format. - headerBlob, err := rlp.EncodeToBytes(block.Header()) - if err != nil { - log.Crit("Failed to RLP encode block header", "err", err) - } - bodyBlob, err := rlp.EncodeToBytes(block.Body()) - if err != nil { - log.Crit("Failed to RLP encode body", "err", err) +func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) { + var ( + tdSum = new(big.Int).Set(td) + stReceipts []*types.ReceiptForStorage + ) + return db.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i, block := range blocks { + // Convert receipts to storage format and sum up total difficulty. + stReceipts = stReceipts[:0] + for _, receipt := range receipts[i] { + stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt)) + } + header := block.Header() + if i > 0 { + tdSum.Add(tdSum, header.Difficulty) + } + if err := writeAncientBlock(op, block, header, stReceipts, tdSum); err != nil { + return err + } + } + return nil + }) +} + +func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error { + num := block.NumberU64() + if err := op.AppendRaw(freezerHashTable, num, block.Hash().Bytes()); err != nil { + return fmt.Errorf("can't add block %d hash: %v", num, err) } - storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) - for i, receipt := range receipts { - storageReceipts[i] = (*types.ReceiptForStorage)(receipt) + if err := op.Append(freezerHeaderTable, num, header); err != nil { + return fmt.Errorf("can't append block header %d: %v", num, err) } - receiptBlob, err := rlp.EncodeToBytes(storageReceipts) - if err != nil { - log.Crit("Failed to RLP encode block receipts", "err", err) + if err := op.Append(freezerBodiesTable, num, block.Body()); err != nil { + return fmt.Errorf("can't append block body %d: %v", num, err) } - tdBlob, err := rlp.EncodeToBytes(td) - if err != nil { - log.Crit("Failed to RLP encode block total difficulty", "err", err) + if err := op.Append(freezerReceiptTable, num, receipts); err != nil { + return fmt.Errorf("can't append block %d receipts: %v", num, err) } - // Write all blob to flatten files. - err = db.AppendAncient(block.NumberU64(), block.Hash().Bytes(), headerBlob, bodyBlob, receiptBlob, tdBlob) - if err != nil { - log.Crit("Failed to write block data to ancient store", "err", err) + if err := op.Append(freezerDifficultyTable, num, td); err != nil { + return fmt.Errorf("can't append block %d total difficulty: %v", num, err) } - return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength + return nil } // DeleteBlock removes all block data associated with a hash. diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index f20e8b1fffb5f..58d7645e50734 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "golang.org/x/crypto/sha3" @@ -438,7 +439,7 @@ func TestAncientStorage(t *testing.T) { if err != nil { t.Fatalf("failed to create temp freezer dir: %v", err) } - defer os.Remove(frdir) + defer os.RemoveAll(frdir) db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) if err != nil { @@ -467,8 +468,10 @@ func TestAncientStorage(t *testing.T) { if blob := ReadTdRLP(db, hash, number); len(blob) > 0 { t.Fatalf("non existent td returned") } + // Write and verify the header in the database - WriteAncientBlock(db, block, nil, big.NewInt(100)) + WriteAncientBlocks(db, []*types.Block{block}, []types.Receipts{nil}, big.NewInt(100)) + if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 { t.Fatalf("no header returned") } @@ -481,6 +484,7 @@ func TestAncientStorage(t *testing.T) { if blob := ReadTdRLP(db, hash, number); len(blob) == 0 { t.Fatalf("no td returned") } + // Use a fake hash for data retrieval, nothing should be returned. fakeHash := common.BytesToHash([]byte{0x01, 0x02, 0x03}) if blob := ReadHeaderRLP(db, fakeHash, number); len(blob) != 0 { @@ -528,3 +532,141 @@ func TestCanonicalHashIteration(t *testing.T) { } } } + +func TestHashesInRange(t *testing.T) { + mkHeader := func(number, seq int) *types.Header { + h := types.Header{ + Difficulty: new(big.Int), + Number: big.NewInt(int64(number)), + GasLimit: uint64(seq), + } + return &h + } + db := NewMemoryDatabase() + // For each number, write N versions of that particular number + total := 0 + for i := 0; i < 15; i++ { + for ii := 0; ii < i; ii++ { + WriteHeader(db, mkHeader(i, ii)) + total++ + } + } + if have, want := len(ReadAllHashesInRange(db, 10, 10)), 10; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashesInRange(db, 10, 9)), 0; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashesInRange(db, 0, 100)), total; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashesInRange(db, 9, 10)), 9+10; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashes(db, 10)), 10; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashes(db, 16)), 0; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } + if have, want := len(ReadAllHashes(db, 1)), 1; have != want { + t.Fatalf("Wrong number of hashes read, want %d, got %d", want, have) + } +} + +// This measures the write speed of the WriteAncientBlocks operation. +func BenchmarkWriteAncientBlocks(b *testing.B) { + // Open freezer database. + frdir, err := ioutil.TempDir("", "") + if err != nil { + b.Fatalf("failed to create temp freezer dir: %v", err) + } + defer os.RemoveAll(frdir) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + if err != nil { + b.Fatalf("failed to create database with ancient backend") + } + + // Create the data to insert. The blocks must have consecutive numbers, so we create + // all of them ahead of time. However, there is no need to create receipts + // individually for each block, just make one batch here and reuse it for all writes. + const batchSize = 128 + const blockTxs = 20 + allBlocks := makeTestBlocks(b.N, blockTxs) + batchReceipts := makeTestReceipts(batchSize, blockTxs) + b.ResetTimer() + + // The benchmark loop writes batches of blocks, but note that the total block count is + // b.N. This means the resulting ns/op measurement is the time it takes to write a + // single block and its associated data. + var td = big.NewInt(55) + var totalSize int64 + for i := 0; i < b.N; i += batchSize { + length := batchSize + if i+batchSize > b.N { + length = b.N - i + } + + blocks := allBlocks[i : i+length] + receipts := batchReceipts[:length] + writeSize, err := WriteAncientBlocks(db, blocks, receipts, td) + if err != nil { + b.Fatal(err) + } + totalSize += writeSize + } + + // Enable MB/s reporting. + b.SetBytes(totalSize / int64(b.N)) +} + +// makeTestBlocks creates fake blocks for the ancient write benchmark. +func makeTestBlocks(nblock int, txsPerBlock int) []*types.Block { + key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + signer := types.LatestSignerForChainID(big.NewInt(8)) + + // Create transactions. + txs := make([]*types.Transaction, txsPerBlock) + for i := 0; i < len(txs); i++ { + var err error + to := common.Address{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} + txs[i], err = types.SignNewTx(key, signer, &types.LegacyTx{ + Nonce: 2, + GasPrice: big.NewInt(30000), + Gas: 0x45454545, + To: &to, + }) + if err != nil { + panic(err) + } + } + + // Create the blocks. + blocks := make([]*types.Block, nblock) + for i := 0; i < nblock; i++ { + header := &types.Header{ + Number: big.NewInt(int64(i)), + Extra: []byte("test block"), + } + blocks[i] = types.NewBlockWithHeader(header).WithBody(txs, nil) + blocks[i].Hash() // pre-cache the block hash + } + return blocks +} + +// makeTestReceipts creates fake receipts for the ancient write benchmark. +func makeTestReceipts(n int, nPerBlock int) []types.Receipts { + receipts := make([]*types.Receipt, nPerBlock) + for i := 0; i < len(receipts); i++ { + receipts[i] = &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + CumulativeGasUsed: 0x888888888, + Logs: make([]*types.Log, 5), + } + } + allReceipts := make([]types.Receipts, n) + for i := 0; i < n; i++ { + allReceipts[i] = receipts + } + return allReceipts +} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 90619169a09b9..0e116ef999b92 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -104,9 +104,9 @@ func (db *nofreezedb) AncientSize(kind string) (uint64, error) { return 0, errNotSupported } -// AppendAncient returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { - return errNotSupported +// ModifyAncients is not supported. +func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) { + return 0, errNotSupported } // TruncateAncients returns an error as we don't have a backing chain freezer. @@ -122,9 +122,7 @@ func (db *nofreezedb) Sync() error { // NewDatabase creates a high level database on top of a given key-value data // store without a freezer moving immutable chain segments into cold storage. func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { - return &nofreezedb{ - KeyValueStore: db, - } + return &nofreezedb{KeyValueStore: db} } // NewDatabaseWithFreezer creates a high level database on top of a given key- @@ -132,7 +130,7 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { // storage. func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) { // Create the idle freezer instance - frdb, err := newFreezer(freezer, namespace, readonly) + frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy) if err != nil { return nil, err } diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 4262a615a7b67..4cecbc50f480d 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -61,6 +61,9 @@ const ( // freezerBatchLimit is the maximum number of blocks to freeze in one batch // before doing an fsync and deleting it from the key-value store. freezerBatchLimit = 30000 + + // freezerTableSize defines the maximum size of freezer data files. + freezerTableSize = 2 * 1000 * 1000 * 1000 ) // freezer is an memory mapped append-only database to store immutable chain data @@ -77,6 +80,10 @@ type freezer struct { frozen uint64 // Number of blocks already frozen threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) + // This lock synchronizes writers and the truncate operation. + writeLock sync.Mutex + writeBatch *freezerBatch + readonly bool tables map[string]*freezerTable // Data tables for storing everything instanceLock fileutil.Releaser // File-system lock to prevent double opens @@ -90,7 +97,10 @@ type freezer struct { // newFreezer creates a chain freezer that moves ancient chain data into // append-only flat file containers. -func newFreezer(datadir string, namespace string, readonly bool) (*freezer, error) { +// +// The 'tables' argument defines the data tables. If the value of a map +// entry is true, snappy compression is disabled for the table. +func newFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*freezer, error) { // Create the initial freezer object var ( readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) @@ -119,8 +129,10 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro trigger: make(chan chan struct{}), quit: make(chan struct{}), } - for name, disableSnappy := range FreezerNoSnappy { - table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy) + + // Create the tables. + for name, disableSnappy := range tables { + table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy) if err != nil { for _, table := range freezer.tables { table.Close() @@ -130,6 +142,8 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro } freezer.tables[name] = table } + + // Truncate all tables to common length. if err := freezer.repair(); err != nil { for _, table := range freezer.tables { table.Close() @@ -137,12 +151,19 @@ func newFreezer(datadir string, namespace string, readonly bool) (*freezer, erro lock.Release() return nil, err } + + // Create the write batch. + freezer.writeBatch = newFreezerBatch(freezer) + log.Info("Opened ancient database", "database", datadir, "readonly", readonly) return freezer, nil } // Close terminates the chain freezer, unmapping all the data files. func (f *freezer) Close() error { + f.writeLock.Lock() + defer f.writeLock.Unlock() + var errs []error f.closeOnce.Do(func() { close(f.quit) @@ -199,60 +220,49 @@ func (f *freezer) Ancients() (uint64, error) { // AncientSize returns the ancient size of the specified category. func (f *freezer) AncientSize(kind string) (uint64, error) { + // This needs the write lock to avoid data races on table fields. + // Speed doesn't matter here, AncientSize is for debugging. + f.writeLock.Lock() + defer f.writeLock.Unlock() + if table := f.tables[kind]; table != nil { return table.size() } return 0, errUnknownTable } -// AppendAncient injects all binary blobs belong to block at the end of the -// append-only immutable table files. -// -// Notably, this function is lock free but kind of thread-safe. All out-of-order -// injection will be rejected. But if two injections with same number happen at -// the same time, we can get into the trouble. -func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) { +// ModifyAncients runs the given write operation. +func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) { if f.readonly { - return errReadOnly - } - // Ensure the binary blobs we are appending is continuous with freezer. - if atomic.LoadUint64(&f.frozen) != number { - return errOutOrderInsertion + return 0, errReadOnly } - // Rollback all inserted data if any insertion below failed to ensure - // the tables won't out of sync. + f.writeLock.Lock() + defer f.writeLock.Unlock() + + // Roll back all tables to the starting position in case of error. + prevItem := f.frozen defer func() { if err != nil { - rerr := f.repair() - if rerr != nil { - log.Crit("Failed to repair freezer", "err", rerr) + // The write operation has failed. Go back to the previous item position. + for name, table := range f.tables { + err := table.truncate(prevItem) + if err != nil { + log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err) + } } - log.Info("Append ancient failed", "number", number, "err", err) } }() - // Inject all the components into the relevant data tables - if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil { - log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err) - return err - } - if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil { - log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err) - return err - } - if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil { - log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err) - return err - } - if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil { - log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err) - return err + + f.writeBatch.reset() + if err := fn(f.writeBatch); err != nil { + return 0, err } - if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil { - log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err) - return err + item, writeSize, err := f.writeBatch.commit() + if err != nil { + return 0, err } - atomic.AddUint64(&f.frozen, 1) // Only modify atomically - return nil + atomic.StoreUint64(&f.frozen, item) + return writeSize, nil } // TruncateAncients discards any recent data above the provided threshold number. @@ -260,6 +270,9 @@ func (f *freezer) TruncateAncients(items uint64) error { if f.readonly { return errReadOnly } + f.writeLock.Lock() + defer f.writeLock.Unlock() + if atomic.LoadUint64(&f.frozen) <= items { return nil } @@ -286,6 +299,24 @@ func (f *freezer) Sync() error { return nil } +// repair truncates all data tables to the same length. +func (f *freezer) repair() error { + min := uint64(math.MaxUint64) + for _, table := range f.tables { + items := atomic.LoadUint64(&table.items) + if min > items { + min = items + } + } + for _, table := range f.tables { + if err := table.truncate(min); err != nil { + return err + } + } + atomic.StoreUint64(&f.frozen, min) + return nil +} + // freeze is a background thread that periodically checks the blockchain for any // import progress and moves ancient data from the fast database into the freezer. // @@ -352,54 +383,28 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { backoff = true continue } + // Seems we have data ready to be frozen, process in usable batches - limit := *number - threshold - if limit-f.frozen > freezerBatchLimit { - limit = f.frozen + freezerBatchLimit - } var ( start = time.Now() - first = f.frozen - ancients = make([]common.Hash, 0, limit-f.frozen) + first, _ = f.Ancients() + limit = *number - threshold ) - for f.frozen <= limit { - // Retrieves all the components of the canonical block - hash := ReadCanonicalHash(nfdb, f.frozen) - if hash == (common.Hash{}) { - log.Error("Canonical hash missing, can't freeze", "number", f.frozen) - break - } - header := ReadHeaderRLP(nfdb, hash, f.frozen) - if len(header) == 0 { - log.Error("Block header missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - body := ReadBodyRLP(nfdb, hash, f.frozen) - if len(body) == 0 { - log.Error("Block body missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - receipts := ReadReceiptsRLP(nfdb, hash, f.frozen) - if len(receipts) == 0 { - log.Error("Block receipts missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - td := ReadTdRLP(nfdb, hash, f.frozen) - if len(td) == 0 { - log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash) - break - } - log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash) - // Inject all the components into the relevant data tables - if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil { - break - } - ancients = append(ancients, hash) + if limit-first > freezerBatchLimit { + limit = first + freezerBatchLimit + } + ancients, err := f.freezeRange(nfdb, first, limit) + if err != nil { + log.Error("Error in block freeze operation", "err", err) + backoff = true + continue } + // Batch of blocks have been frozen, flush them before wiping from leveldb if err := f.Sync(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } + // Wipe out all data from the active database batch := db.NewBatch() for i := 0; i < len(ancients); i++ { @@ -464,6 +469,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { log.Crit("Failed to delete dangling side blocks", "err", err) } } + // Log something friendly for the user context := []interface{}{ "blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1, @@ -480,20 +486,54 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { } } -// repair truncates all data tables to the same length. -func (f *freezer) repair() error { - min := uint64(math.MaxUint64) - for _, table := range f.tables { - items := atomic.LoadUint64(&table.items) - if min > items { - min = items - } - } - for _, table := range f.tables { - if err := table.truncate(min); err != nil { - return err +func (f *freezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) { + hashes = make([]common.Hash, 0, limit-number) + + _, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for ; number <= limit; number++ { + // Retrieve all the components of the canonical block. + hash := ReadCanonicalHash(nfdb, number) + if hash == (common.Hash{}) { + return fmt.Errorf("canonical hash missing, can't freeze block %d", number) + } + header := ReadHeaderRLP(nfdb, hash, number) + if len(header) == 0 { + return fmt.Errorf("block header missing, can't freeze block %d", number) + } + body := ReadBodyRLP(nfdb, hash, number) + if len(body) == 0 { + return fmt.Errorf("block body missing, can't freeze block %d", number) + } + receipts := ReadReceiptsRLP(nfdb, hash, number) + if len(receipts) == 0 { + return fmt.Errorf("block receipts missing, can't freeze block %d", number) + } + td := ReadTdRLP(nfdb, hash, number) + if len(td) == 0 { + return fmt.Errorf("total difficulty missing, can't freeze block %d", number) + } + + // Write to the batch. + if err := op.AppendRaw(freezerHashTable, number, hash[:]); err != nil { + return fmt.Errorf("can't write hash to freezer: %v", err) + } + if err := op.AppendRaw(freezerHeaderTable, number, header); err != nil { + return fmt.Errorf("can't write header to freezer: %v", err) + } + if err := op.AppendRaw(freezerBodiesTable, number, body); err != nil { + return fmt.Errorf("can't write body to freezer: %v", err) + } + if err := op.AppendRaw(freezerReceiptTable, number, receipts); err != nil { + return fmt.Errorf("can't write receipts to freezer: %v", err) + } + if err := op.AppendRaw(freezerDifficultyTable, number, td); err != nil { + return fmt.Errorf("can't write td to freezer: %v", err) + } + + hashes = append(hashes, hash) } - } - atomic.StoreUint64(&f.frozen, min) - return nil + return nil + }) + + return hashes, err } diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go new file mode 100644 index 0000000000000..8297c0ac1dd4e --- /dev/null +++ b/core/rawdb/freezer_batch.go @@ -0,0 +1,248 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "fmt" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/rlp" + "github.com/golang/snappy" +) + +// This is the maximum amount of data that will be buffered in memory +// for a single freezer table batch. +const freezerBatchBufferLimit = 2 * 1024 * 1024 + +// freezerBatch is a write operation of multiple items on a freezer. +type freezerBatch struct { + tables map[string]*freezerTableBatch +} + +func newFreezerBatch(f *freezer) *freezerBatch { + batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))} + for kind, table := range f.tables { + batch.tables[kind] = table.newBatch() + } + return batch +} + +// Append adds an RLP-encoded item of the given kind. +func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error { + return batch.tables[kind].Append(num, item) +} + +// AppendRaw adds an item of the given kind. +func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error { + return batch.tables[kind].AppendRaw(num, item) +} + +// reset initializes the batch. +func (batch *freezerBatch) reset() { + for _, tb := range batch.tables { + tb.reset() + } +} + +// commit is called at the end of a write operation and +// writes all remaining data to tables. +func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) { + // Check that count agrees on all batches. + item = uint64(math.MaxUint64) + for name, tb := range batch.tables { + if item < math.MaxUint64 && tb.curItem != item { + return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item) + } + item = tb.curItem + } + + // Commit all table batches. + for _, tb := range batch.tables { + if err := tb.commit(); err != nil { + return 0, 0, err + } + writeSize += tb.totalBytes + } + return item, writeSize, nil +} + +// freezerTableBatch is a batch for a freezer table. +type freezerTableBatch struct { + t *freezerTable + + sb *snappyBuffer + encBuffer writeBuffer + dataBuffer []byte + indexBuffer []byte + curItem uint64 // expected index of next append + totalBytes int64 // counts written bytes since reset +} + +// newBatch creates a new batch for the freezer table. +func (t *freezerTable) newBatch() *freezerTableBatch { + batch := &freezerTableBatch{t: t} + if !t.noCompression { + batch.sb = new(snappyBuffer) + } + batch.reset() + return batch +} + +// reset clears the batch for reuse. +func (batch *freezerTableBatch) reset() { + batch.dataBuffer = batch.dataBuffer[:0] + batch.indexBuffer = batch.indexBuffer[:0] + batch.curItem = atomic.LoadUint64(&batch.t.items) + batch.totalBytes = 0 +} + +// Append rlp-encodes and adds data at the end of the freezer table. The item number is a +// precautionary parameter to ensure data correctness, but the table will reject already +// existing data. +func (batch *freezerTableBatch) Append(item uint64, data interface{}) error { + if item != batch.curItem { + return errOutOrderInsertion + } + + // Encode the item. + batch.encBuffer.Reset() + if err := rlp.Encode(&batch.encBuffer, data); err != nil { + return err + } + encItem := batch.encBuffer.data + if batch.sb != nil { + encItem = batch.sb.compress(encItem) + } + return batch.appendItem(encItem) +} + +// AppendRaw injects a binary blob at the end of the freezer table. The item number is a +// precautionary parameter to ensure data correctness, but the table will reject already +// existing data. +func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error { + if item != batch.curItem { + return errOutOrderInsertion + } + + encItem := blob + if batch.sb != nil { + encItem = batch.sb.compress(blob) + } + return batch.appendItem(encItem) +} + +func (batch *freezerTableBatch) appendItem(data []byte) error { + // Check if item fits into current data file. + itemSize := int64(len(data)) + itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer)) + if itemOffset+itemSize > int64(batch.t.maxFileSize) { + // It doesn't fit, go to next file first. + if err := batch.commit(); err != nil { + return err + } + if err := batch.t.advanceHead(); err != nil { + return err + } + itemOffset = 0 + } + + // Put data to buffer. + batch.dataBuffer = append(batch.dataBuffer, data...) + batch.totalBytes += itemSize + + // Put index entry to buffer. + entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)} + batch.indexBuffer = entry.append(batch.indexBuffer) + batch.curItem++ + + return batch.maybeCommit() +} + +// maybeCommit writes the buffered data if the buffer is full enough. +func (batch *freezerTableBatch) maybeCommit() error { + if len(batch.dataBuffer) > freezerBatchBufferLimit { + return batch.commit() + } + return nil +} + +// commit writes the batched items to the backing freezerTable. +func (batch *freezerTableBatch) commit() error { + // Write data. + _, err := batch.t.head.Write(batch.dataBuffer) + if err != nil { + return err + } + dataSize := int64(len(batch.dataBuffer)) + batch.dataBuffer = batch.dataBuffer[:0] + + // Write index. + _, err = batch.t.index.Write(batch.indexBuffer) + if err != nil { + return err + } + indexSize := int64(len(batch.indexBuffer)) + batch.indexBuffer = batch.indexBuffer[:0] + + // Update headBytes of table. + batch.t.headBytes += dataSize + atomic.StoreUint64(&batch.t.items, batch.curItem) + + // Update metrics. + batch.t.sizeGauge.Inc(dataSize + indexSize) + batch.t.writeMeter.Mark(dataSize + indexSize) + return nil +} + +// snappyBuffer writes snappy in block format, and can be reused. It is +// reset when WriteTo is called. +type snappyBuffer struct { + dst []byte +} + +// compress snappy-compresses the data. +func (s *snappyBuffer) compress(data []byte) []byte { + // The snappy library does not care what the capacity of the buffer is, + // but only checks the length. If the length is too small, it will + // allocate a brand new buffer. + // To avoid that, we check the required size here, and grow the size of the + // buffer to utilize the full capacity. + if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n { + if cap(s.dst) < n { + s.dst = make([]byte, n) + } + s.dst = s.dst[:n] + } + + s.dst = snappy.Encode(s.dst, data) + return s.dst +} + +// writeBuffer implements io.Writer for a byte slice. +type writeBuffer struct { + data []byte +} + +func (wb *writeBuffer) Write(data []byte) (int, error) { + wb.data = append(wb.data, data...) + return len(data), nil +} + +func (wb *writeBuffer) Reset() { + wb.data = wb.data[:0] +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 9d052f7cd8d37..22405cf9b4f89 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -17,6 +17,7 @@ package rawdb import ( + "bytes" "encoding/binary" "errors" "fmt" @@ -55,19 +56,20 @@ type indexEntry struct { const indexEntrySize = 6 -// unmarshallBinary deserializes binary b into the rawIndex entry. +// unmarshalBinary deserializes binary b into the rawIndex entry. func (i *indexEntry) unmarshalBinary(b []byte) error { i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) i.offset = binary.BigEndian.Uint32(b[2:6]) return nil } -// marshallBinary serializes the rawIndex entry into binary. -func (i *indexEntry) marshallBinary() []byte { - b := make([]byte, indexEntrySize) - binary.BigEndian.PutUint16(b[:2], uint16(i.filenum)) - binary.BigEndian.PutUint32(b[2:6], i.offset) - return b +// append adds the encoded entry to the end of b. +func (i *indexEntry) append(b []byte) []byte { + offset := len(b) + out := append(b, make([]byte, indexEntrySize)...) + binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum)) + binary.BigEndian.PutUint32(out[offset+2:], i.offset) + return out } // bounds returns the start- and end- offsets, and the file number of where to @@ -107,7 +109,7 @@ type freezerTable struct { // to count how many historic items have gone missing. itemOffset uint32 // Offset (number of discarded items) - headBytes uint32 // Number of bytes written to the head file + headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables @@ -118,12 +120,7 @@ type freezerTable struct { // NewFreezerTable opens the given path as a freezer table. func NewFreezerTable(path, name string, disableSnappy bool) (*freezerTable, error) { - return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, disableSnappy) -} - -// newTable opens a freezer table with default settings - 2G files -func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, disableSnappy bool) (*freezerTable, error) { - return newCustomTable(path, name, readMeter, writeMeter, sizeGauge, 2*1000*1000*1000, disableSnappy) + return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy) } // openFreezerFileForAppend opens a freezer table file and seeks to the end @@ -164,10 +161,10 @@ func truncateFreezerFile(file *os.File, size int64) error { return nil } -// newCustomTable opens a freezer table, creating the data and index files if they are +// newTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. -func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err @@ -313,7 +310,7 @@ func (t *freezerTable) repair() error { } // Update the item and byte counters and return t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file - t.headBytes = uint32(contentSize) + t.headBytes = contentSize t.headId = lastIndex.filenum // Close opened files and preopen all files @@ -387,14 +384,14 @@ func (t *freezerTable) truncate(items uint64) error { t.releaseFilesAfter(expected.filenum, true) // Set back the historic head t.head = newHead - atomic.StoreUint32(&t.headId, expected.filenum) + t.headId = expected.filenum } if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil { return err } // All data files truncated, set internal counters and return + t.headBytes = int64(expected.offset) atomic.StoreUint64(&t.items, items) - atomic.StoreUint32(&t.headBytes, expected.offset) // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() @@ -471,94 +468,6 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { } } -// Append injects a binary blob at the end of the freezer table. The item number -// is a precautionary parameter to ensure data correctness, but the table will -// reject already existing data. -// -// Note, this method will *not* flush any data to disk so be sure to explicitly -// fsync before irreversibly deleting data from the database. -func (t *freezerTable) Append(item uint64, blob []byte) error { - // Encode the blob before the lock portion - if !t.noCompression { - blob = snappy.Encode(nil, blob) - } - // Read lock prevents competition with truncate - retry, err := t.append(item, blob, false) - if err != nil { - return err - } - if retry { - // Read lock was insufficient, retry with a writelock - _, err = t.append(item, blob, true) - } - return err -} - -// append injects a binary blob at the end of the freezer table. -// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to -// false. -// However, if the data will grown the current file out of bounds, then this -// method will return 'true, nil', indicating that the caller should retry, this time -// with 'wlock' set to true. -func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) { - if wlock { - t.lock.Lock() - defer t.lock.Unlock() - } else { - t.lock.RLock() - defer t.lock.RUnlock() - } - // Ensure the table is still accessible - if t.index == nil || t.head == nil { - return false, errClosed - } - // Ensure only the next item can be written, nothing else - if atomic.LoadUint64(&t.items) != item { - return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) - } - bLen := uint32(len(encodedBlob)) - if t.headBytes+bLen < bLen || - t.headBytes+bLen > t.maxFileSize { - // Writing would overflow, so we need to open a new data file. - // If we don't already hold the writelock, abort and let the caller - // invoke this method a second time. - if !wlock { - return true, nil - } - nextID := atomic.LoadUint32(&t.headId) + 1 - // We open the next file in truncated mode -- if this file already - // exists, we need to start over from scratch on it - newHead, err := t.openFile(nextID, openFreezerFileTruncated) - if err != nil { - return false, err - } - // Close old file, and reopen in RDONLY mode - t.releaseFile(t.headId) - t.openFile(t.headId, openFreezerFileForReadOnly) - - // Swap out the current head - t.head = newHead - atomic.StoreUint32(&t.headBytes, 0) - atomic.StoreUint32(&t.headId, nextID) - } - if _, err := t.head.Write(encodedBlob); err != nil { - return false, err - } - newOffset := atomic.AddUint32(&t.headBytes, bLen) - idx := indexEntry{ - filenum: atomic.LoadUint32(&t.headId), - offset: newOffset, - } - // Write indexEntry - t.index.Write(idx.marshallBinary()) - - t.writeMeter.Mark(int64(bLen + indexEntrySize)) - t.sizeGauge.Inc(int64(bLen + indexEntrySize)) - - atomic.AddUint64(&t.items, 1) - return false, nil -} - // getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). @@ -651,6 +560,7 @@ func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, e func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { t.lock.RLock() defer t.lock.RUnlock() + // Ensure the table and the item is accessible if t.index == nil || t.head == nil { return nil, nil, errClosed @@ -763,6 +673,32 @@ func (t *freezerTable) sizeNolock() (uint64, error) { return total, nil } +// advanceHead should be called when the current head file would outgrow the file limits, +// and a new file must be opened. The caller of this method must hold the write-lock +// before calling this method. +func (t *freezerTable) advanceHead() error { + t.lock.Lock() + defer t.lock.Unlock() + + // We open the next file in truncated mode -- if this file already + // exists, we need to start over from scratch on it. + nextID := t.headId + 1 + newHead, err := t.openFile(nextID, openFreezerFileTruncated) + if err != nil { + return err + } + + // Close old file, and reopen in RDONLY mode. + t.releaseFile(t.headId) + t.openFile(t.headId, openFreezerFileForReadOnly) + + // Swap out the current head. + t.head = newHead + t.headBytes = 0 + t.headId = nextID + return nil +} + // Sync pushes any pending data from memory out to disk. This is an expensive // operation, so use it with care. func (t *freezerTable) Sync() error { @@ -775,10 +711,21 @@ func (t *freezerTable) Sync() error { // DumpIndex is a debug print utility function, mainly for testing. It can also // be used to analyse a live freezer table index. func (t *freezerTable) DumpIndex(start, stop int64) { + t.dumpIndex(os.Stdout, start, stop) +} + +func (t *freezerTable) dumpIndexString(start, stop int64) string { + var out bytes.Buffer + out.WriteString("\n") + t.dumpIndex(&out, start, stop) + return out.String() +} + +func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { buf := make([]byte, indexEntrySize) - fmt.Printf("| number | fileno | offset |\n") - fmt.Printf("|--------|--------|--------|\n") + fmt.Fprintf(w, "| number | fileno | offset |\n") + fmt.Fprintf(w, "|--------|--------|--------|\n") for i := uint64(start); ; i++ { if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil { @@ -786,10 +733,10 @@ func (t *freezerTable) DumpIndex(start, stop int64) { } var entry indexEntry entry.unmarshalBinary(buf) - fmt.Printf("| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset) + fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset) if stop > 0 && i >= uint64(stop) { break } } - fmt.Printf("|--------------------------|\n") + fmt.Fprintf(w, "|--------------------------|\n") } diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index e8a8b5c46309e..803809b5207f4 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -18,49 +18,36 @@ package rawdb import ( "bytes" - "encoding/binary" "fmt" - "io/ioutil" "math/rand" "os" "path/filepath" - "sync" "testing" "time" "github.com/ethereum/go-ethereum/metrics" + "github.com/stretchr/testify/require" ) func init() { rand.Seed(time.Now().Unix()) } -// Gets a chunk of data, filled with 'b' -func getChunk(size int, b int) []byte { - data := make([]byte, size) - for i := range data { - data[i] = byte(b) - } - return data -} - // TestFreezerBasics test initializing a freezertable from scratch, writing to the table, // and reading it back. func TestFreezerBasics(t *testing.T) { t.Parallel() // set cutoff at 50 bytes - f, err := newCustomTable(os.TempDir(), + f, err := newTable(os.TempDir(), fmt.Sprintf("unittest-%d", rand.Uint64()), metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 50, true) if err != nil { t.Fatal(err) } defer f.Close() + // Write 15 bytes 255 times, results in 85 files - for x := 0; x < 255; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) //print(t, f, 0) //print(t, f, 1) @@ -98,16 +85,21 @@ func TestFreezerBasicsClosing(t *testing.T) { f *freezerTable err error ) - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } - // Write 15 bytes 255 times, results in 85 files + + // Write 15 bytes 255 times, results in 85 files. + // In-between writes, the table is closed and re-opened. for x := 0; x < 255; x++ { data := getChunk(15, x) - f.Append(uint64(x), data) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(uint64(x), data)) + require.NoError(t, batch.commit()) f.Close() - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -124,7 +116,7 @@ func TestFreezerBasicsClosing(t *testing.T) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) } f.Close() - f, err = newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -137,22 +129,22 @@ func TestFreezerRepairDanglingHead(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 255 times - for x := 0; x < 255; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) + // The last item should be there if _, err = f.Retrieve(0xfe); err != nil { t.Fatal(err) } f.Close() } + // open the index idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) if err != nil { @@ -165,9 +157,10 @@ func TestFreezerRepairDanglingHead(t *testing.T) { } idxFile.Truncate(stat.Size() - 4) idxFile.Close() + // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -188,22 +181,22 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) - { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill a table and close it + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 255 times - for x := 0; x < 0xff; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { t.Fatal(err) } f.Close() } + // open the index idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644) if err != nil { @@ -213,9 +206,10 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { // 0-indexEntry, 1-indexEntry, corrupt-indexEntry idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2) idxFile.Close() + // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -228,15 +222,17 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Errorf("Expected error for missing index entry") } // We should now be able to store items again, from item = 1 + batch := f.newBatch() for x := 1; x < 0xff; x++ { - data := getChunk(15, ^x) - f.Append(uint64(x), data) + require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } + require.NoError(t, batch.commit()) f.Close() } + // And if we open it, we should now be able to read all of them (new values) { - f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, _ := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) for y := 1; y < 255; y++ { exp := getChunk(15, ^y) got, err := f.Retrieve(uint64(y)) @@ -255,22 +251,21 @@ func TestSnappyDetection(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("snappytest-%d", rand.Uint64()) + // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 255 times - for x := 0; x < 0xff; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 255, 15) f.Close() } + // Open without snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, false) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false) if err != nil { t.Fatal(err) } @@ -282,7 +277,7 @@ func TestSnappyDetection(t *testing.T) { // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -292,8 +287,8 @@ func TestSnappyDetection(t *testing.T) { t.Fatalf("expected no error, got %v", err) } } - } + func assertFileSize(f string, size int64) error { stat, err := os.Stat(f) if err != nil { @@ -303,7 +298,6 @@ func assertFileSize(f string, size int64) error { return fmt.Errorf("error, expected size %d, got %d", size, stat.Size()) } return nil - } // TestFreezerRepairDanglingIndex checks that if the index has more entries than there are data, @@ -313,16 +307,15 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64()) - { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill a table and close it + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 9 times : 150 bytes - for x := 0; x < 9; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 9, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { f.Close() @@ -331,6 +324,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { f.Close() // File sizes should be 45, 45, 45 : items[3, 3, 3) } + // Crop third file fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname)) // Truncate third file: 45 ,45, 20 @@ -345,17 +339,18 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { file.Truncate(20) file.Close() } + // Open db it again // It should restore the file(s) to // 45, 45, 15 // with 3+3+1 items { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } + defer f.Close() if f.items != 7 { - f.Close() t.Fatalf("expected %d items, got %d", 7, f.items) } if err := assertFileSize(fileToCrop, 15); err != nil { @@ -365,30 +360,29 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { } func TestFreezerTruncate(t *testing.T) { - t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("truncation-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 30 times - for x := 0; x < 30; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { t.Fatal(err) } f.Close() } + // Reopen, truncate { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -401,9 +395,7 @@ func TestFreezerTruncate(t *testing.T) { if f.headBytes != 15 { t.Fatalf("expected %d bytes, got %d", 15, f.headBytes) } - } - } // TestFreezerRepairFirstFile tests a head file with the very first item only half-written. @@ -412,20 +404,26 @@ func TestFreezerRepairFirstFile(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 80 bytes, splitting out into two files - f.Append(0, getChunk(40, 0xFF)) - f.Append(1, getChunk(40, 0xEE)) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE))) + require.NoError(t, batch.commit()) + // The last item should be there - if _, err = f.Retrieve(f.items - 1); err != nil { + if _, err = f.Retrieve(1); err != nil { t.Fatal(err) } f.Close() } + // Truncate the file in half fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname)) { @@ -439,9 +437,10 @@ func TestFreezerRepairFirstFile(t *testing.T) { file.Truncate(20) file.Close() } + // Reopen { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -449,9 +448,14 @@ func TestFreezerRepairFirstFile(t *testing.T) { f.Close() t.Fatalf("expected %d items, got %d", 0, f.items) } + // Write 40 bytes - f.Append(1, getChunk(40, 0xDD)) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD))) + require.NoError(t, batch.commit()) + f.Close() + // Should have been truncated down to zero and then 40 written if err := assertFileSize(fileToCrop, 40); err != nil { t.Fatal(err) @@ -468,25 +472,26 @@ func TestFreezerReadAndTruncate(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("read_truncate-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 30 times - for x := 0; x < 30; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 15) + // The last item should be there if _, err = f.Retrieve(f.items - 1); err != nil { t.Fatal(err) } f.Close() } + // Reopen and read all files { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -497,40 +502,48 @@ func TestFreezerReadAndTruncate(t *testing.T) { for y := byte(0); y < 30; y++ { f.Retrieve(uint64(y)) } + // Now, truncate back to zero f.truncate(0) + // Write the data again + batch := f.newBatch() for x := 0; x < 30; x++ { - data := getChunk(15, ^x) - if err := f.Append(uint64(x), data); err != nil { - t.Fatalf("error %v", err) - } + require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } + require.NoError(t, batch.commit()) f.Close() } } -func TestOffset(t *testing.T) { +func TestFreezerOffset(t *testing.T) { t.Parallel() rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("offset-%d", rand.Uint64()) - { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + + // Fill table + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } + // Write 6 x 20 bytes, splitting out into three files - f.Append(0, getChunk(20, 0xFF)) - f.Append(1, getChunk(20, 0xEE)) + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) - f.Append(2, getChunk(20, 0xdd)) - f.Append(3, getChunk(20, 0xcc)) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) - f.Append(4, getChunk(20, 0xbb)) - f.Append(5, getChunk(20, 0xaa)) - f.DumpIndex(0, 100) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.commit()) + + t.Log(f.dumpIndexString(0, 100)) f.Close() } + // Now crop it. { // delete files 0 and 1 @@ -558,7 +571,7 @@ func TestOffset(t *testing.T) { filenum: tailId, offset: itemOffset, } - buf := zeroIndex.marshallBinary() + buf := zeroIndex.append(nil) // Overwrite index zero copy(indexBuf, buf) // Remove the four next indices by overwriting @@ -567,44 +580,36 @@ func TestOffset(t *testing.T) { // Need to truncate the moved index items indexFile.Truncate(indexEntrySize * (1 + 2)) indexFile.Close() - } + // Now open again - checkPresent := func(numDeleted uint64) { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } - f.DumpIndex(0, 100) - // It should allow writing item 6 - f.Append(numDeleted+2, getChunk(20, 0x99)) - - // It should be fine to fetch 4,5,6 - if got, err := f.Retrieve(numDeleted); err != nil { - t.Fatal(err) - } else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) { - t.Fatalf("expected %x got %x", exp, got) - } - if got, err := f.Retrieve(numDeleted + 1); err != nil { - t.Fatal(err) - } else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) { - t.Fatalf("expected %x got %x", exp, got) - } - if got, err := f.Retrieve(numDeleted + 2); err != nil { - t.Fatal(err) - } else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) { - t.Fatalf("expected %x got %x", exp, got) - } - - // It should error at 0, 1,2,3 - for i := numDeleted - 1; i > numDeleted-10; i-- { - if _, err := f.Retrieve(i); err == nil { - t.Fatal("expected err") - } - } - } - checkPresent(4) - // Now, let's pretend we have deleted 1M items + defer f.Close() + t.Log(f.dumpIndexString(0, 100)) + + // It should allow writing item 6. + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) + require.NoError(t, batch.commit()) + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x99), + }) + } + + // Edit the index again, with a much larger initial offset of 1M. { // Read the index file p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) @@ -624,13 +629,71 @@ func TestOffset(t *testing.T) { offset: itemOffset, filenum: tailId, } - buf := zeroIndex.marshallBinary() + buf := zeroIndex.append(nil) // Overwrite index zero copy(indexBuf, buf) indexFile.WriteAt(indexBuf, 0) indexFile.Close() } - checkPresent(1000000) + + // Check that existing items have been moved to index 1M. + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + defer f.Close() + t.Log(f.dumpIndexString(0, 100)) + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 999999: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1000000: getChunk(20, 0xbb), + 1000001: getChunk(20, 0xaa), + }) + } +} + +func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) { + t.Helper() + + for item, wantBytes := range items { + value, err := f.Retrieve(item) + if err != nil { + t.Fatalf("can't get expected item %d: %v", item, err) + } + if !bytes.Equal(value, wantBytes) { + t.Fatalf("item %d has wrong value %x (want %x)", item, value, wantBytes) + } + } +} + +func checkRetrieveError(t *testing.T, f *freezerTable, items map[uint64]error) { + t.Helper() + + for item, wantError := range items { + value, err := f.Retrieve(item) + if err == nil { + t.Fatalf("unexpected value %x for item %d, want error %v", item, value, wantError) + } + if err != wantError { + t.Fatalf("wrong error for item %d: %v", item, err) + } + } +} + +// Gets a chunk of data, filled with 'b' +func getChunk(size int, b int) []byte { + data := make([]byte, size) + for i := range data { + data[i] = byte(b) + } + return data } // TODO (?) @@ -644,53 +707,18 @@ func TestOffset(t *testing.T) { // should be handled already, and the case described above can only (?) happen if an // external process/user deletes files from the filesystem. -// TestAppendTruncateParallel is a test to check if the Append/truncate operations are -// racy. -// -// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is dependent -// on timing rather than 'clever' input -- there's no determinism. -func TestAppendTruncateParallel(t *testing.T) { - dir, err := ioutil.TempDir("", "freezer") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(dir) +func writeChunks(t *testing.T, ft *freezerTable, n int, length int) { + t.Helper() - f, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true) - if err != nil { - t.Fatal(err) - } - - fill := func(mark uint64) []byte { - data := make([]byte, 8) - binary.LittleEndian.PutUint64(data, mark) - return data - } - - for i := 0; i < 5000; i++ { - f.truncate(0) - data0 := fill(0) - f.Append(0, data0) - data1 := fill(1) - - var wg sync.WaitGroup - wg.Add(2) - go func() { - f.truncate(0) - wg.Done() - }() - go func() { - f.Append(1, data1) - wg.Done() - }() - wg.Wait() - - if have, err := f.Retrieve(0); err == nil { - if !bytes.Equal(have, data0) { - t.Fatalf("have %x want %x", have, data0) - } + batch := ft.newBatch() + for i := 0; i < n; i++ { + if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil { + t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err) } } + if err := batch.commit(); err != nil { + t.Fatalf("Commit returned error: %v", err) + } } // TestSequentialRead does some basic tests on the RetrieveItems. @@ -698,20 +726,17 @@ func TestSequentialRead(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("batchread-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } // Write 15 bytes 30 times - for x := 0; x < 30; x++ { - data := getChunk(15, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 15) f.DumpIndex(0, 30) f.Close() } { // Open it, iterate, verify iteration - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true) if err != nil { t.Fatal(err) } @@ -732,7 +757,7 @@ func TestSequentialRead(t *testing.T) { } { // Open it, iterate, verify byte limit. The byte limit is less than item // size, so each lookup should only return one item - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) if err != nil { t.Fatal(err) } @@ -761,16 +786,13 @@ func TestSequentialReadByteLimit(t *testing.T) { rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() fname := fmt.Sprintf("batchread-2-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true) if err != nil { t.Fatal(err) } // Write 10 bytes 30 times, // Splitting it at every 100 bytes (10 items) - for x := 0; x < 30; x++ { - data := getChunk(10, x) - f.Append(uint64(x), data) - } + writeChunks(t, f, 30, 10) f.Close() } for i, tc := range []struct { @@ -786,7 +808,7 @@ func TestSequentialReadByteLimit(t *testing.T) { {100, 109, 10}, } { { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 100, true) if err != nil { t.Fatal(err) } diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go new file mode 100644 index 0000000000000..7359131c8944d --- /dev/null +++ b/core/rawdb/freezer_test.go @@ -0,0 +1,301 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "math/big" + "math/rand" + "os" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/stretchr/testify/require" +) + +var freezerTestTableDef = map[string]bool{"test": true} + +func TestFreezerModify(t *testing.T) { + t.Parallel() + + // Create test data. + var valuesRaw [][]byte + var valuesRLP []*big.Int + for x := 0; x < 100; x++ { + v := getChunk(256, x) + valuesRaw = append(valuesRaw, v) + iv := big.NewInt(int64(x)) + iv = iv.Exp(iv, iv, nil) + valuesRLP = append(valuesRLP, iv) + } + + tables := map[string]bool{"raw": true, "rlp": false} + f, dir := newFreezerForTesting(t, tables) + defer os.RemoveAll(dir) + defer f.Close() + + // Commit test data. + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := range valuesRaw { + if err := op.AppendRaw("raw", uint64(i), valuesRaw[i]); err != nil { + return err + } + if err := op.Append("rlp", uint64(i), valuesRLP[i]); err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatal("ModifyAncients failed:", err) + } + + // Dump indexes. + for _, table := range f.tables { + t.Log(table.name, "index:", table.dumpIndexString(0, int64(len(valuesRaw)))) + } + + // Read back test data. + checkAncientCount(t, f, "raw", uint64(len(valuesRaw))) + checkAncientCount(t, f, "rlp", uint64(len(valuesRLP))) + for i := range valuesRaw { + v, _ := f.Ancient("raw", uint64(i)) + if !bytes.Equal(v, valuesRaw[i]) { + t.Fatalf("wrong raw value at %d: %x", i, v) + } + ivEnc, _ := f.Ancient("rlp", uint64(i)) + want, _ := rlp.EncodeToBytes(valuesRLP[i]) + if !bytes.Equal(ivEnc, want) { + t.Fatalf("wrong RLP value at %d: %x", i, ivEnc) + } + } +} + +// This checks that ModifyAncients rolls back freezer updates +// when the function passed to it returns an error. +func TestFreezerModifyRollback(t *testing.T) { + t.Parallel() + + f, dir := newFreezerForTesting(t, freezerTestTableDef) + defer os.RemoveAll(dir) + + theError := errors.New("oops") + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + // Append three items. This creates two files immediately, + // because the table size limit of the test freezer is 2048. + require.NoError(t, op.AppendRaw("test", 0, make([]byte, 2048))) + require.NoError(t, op.AppendRaw("test", 1, make([]byte, 2048))) + require.NoError(t, op.AppendRaw("test", 2, make([]byte, 2048))) + return theError + }) + if err != theError { + t.Errorf("ModifyAncients returned wrong error %q", err) + } + checkAncientCount(t, f, "test", 0) + f.Close() + + // Reopen and check that the rolled-back data doesn't reappear. + tables := map[string]bool{"test": true} + f2, err := newFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err) + } + defer f2.Close() + checkAncientCount(t, f2, "test", 0) +} + +// This test runs ModifyAncients and Ancient concurrently with each other. +func TestFreezerConcurrentModifyRetrieve(t *testing.T) { + t.Parallel() + + f, dir := newFreezerForTesting(t, freezerTestTableDef) + defer os.RemoveAll(dir) + defer f.Close() + + var ( + numReaders = 5 + writeBatchSize = uint64(50) + written = make(chan uint64, numReaders*6) + wg sync.WaitGroup + ) + wg.Add(numReaders + 1) + + // Launch the writer. It appends 10000 items in batches. + go func() { + defer wg.Done() + defer close(written) + for item := uint64(0); item < 10000; item += writeBatchSize { + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(0); i < writeBatchSize; i++ { + item := item + i + value := getChunk(32, int(item)) + if err := op.AppendRaw("test", item, value); err != nil { + return err + } + } + return nil + }) + if err != nil { + panic(err) + } + for i := 0; i < numReaders; i++ { + written <- item + writeBatchSize + } + } + }() + + // Launch the readers. They read random items from the freezer up to the + // current frozen item count. + for i := 0; i < numReaders; i++ { + go func() { + defer wg.Done() + for frozen := range written { + for rc := 0; rc < 80; rc++ { + num := uint64(rand.Intn(int(frozen))) + value, err := f.Ancient("test", num) + if err != nil { + panic(fmt.Errorf("error reading %d (frozen %d): %v", num, frozen, err)) + } + if !bytes.Equal(value, getChunk(32, int(num))) { + panic(fmt.Errorf("wrong value at %d", num)) + } + } + } + }() + } + + wg.Wait() +} + +// This test runs ModifyAncients and TruncateAncients concurrently with each other. +func TestFreezerConcurrentModifyTruncate(t *testing.T) { + f, dir := newFreezerForTesting(t, freezerTestTableDef) + defer os.RemoveAll(dir) + defer f.Close() + + var item = make([]byte, 256) + + for i := 0; i < 1000; i++ { + // First reset and write 100 items. + if err := f.TruncateAncients(0); err != nil { + t.Fatal("truncate failed:", err) + } + _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(0); i < 100; i++ { + if err := op.AppendRaw("test", i, item); err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatal("modify failed:", err) + } + checkAncientCount(t, f, "test", 100) + + // Now append 100 more items and truncate concurrently. + var ( + wg sync.WaitGroup + truncateErr error + modifyErr error + ) + wg.Add(3) + go func() { + _, modifyErr = f.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for i := uint64(100); i < 200; i++ { + if err := op.AppendRaw("test", i, item); err != nil { + return err + } + } + return nil + }) + wg.Done() + }() + go func() { + truncateErr = f.TruncateAncients(10) + wg.Done() + }() + go func() { + f.AncientSize("test") + wg.Done() + }() + wg.Wait() + + // Now check the outcome. If the truncate operation went through first, the append + // fails, otherwise it succeeds. In either case, the freezer should be positioned + // at 10 after both operations are done. + if truncateErr != nil { + t.Fatal("concurrent truncate failed:", err) + } + if !(modifyErr == nil || modifyErr == errOutOrderInsertion) { + t.Fatal("wrong error from concurrent modify:", modifyErr) + } + checkAncientCount(t, f, "test", 10) + } +} + +func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, string) { + t.Helper() + + dir, err := ioutil.TempDir("", "freezer") + if err != nil { + t.Fatal(err) + } + // note: using low max table size here to ensure the tests actually + // switch between multiple files. + f, err := newFreezer(dir, "", false, 2049, tables) + if err != nil { + t.Fatal("can't open freezer", err) + } + return f, dir +} + +// checkAncientCount verifies that the freezer contains n items. +func checkAncientCount(t *testing.T, f *freezer, kind string, n uint64) { + t.Helper() + + if frozen, _ := f.Ancients(); frozen != n { + t.Fatalf("Ancients() returned %d, want %d", frozen, n) + } + + // Check at index n-1. + if n > 0 { + index := n - 1 + if ok, _ := f.HasAncient(kind, index); !ok { + t.Errorf("HasAncient(%q, %d) returned false unexpectedly", kind, index) + } + if _, err := f.Ancient(kind, index); err != nil { + t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err) + } + } + + // Check at index n. + index := n + if ok, _ := f.HasAncient(kind, index); ok { + t.Errorf("HasAncient(%q, %d) returned true unexpectedly", kind, index) + } + if _, err := f.Ancient(kind, index); err == nil { + t.Errorf("Ancient(%q, %d) didn't return expected error", kind, index) + } else if err != errOutOfBounds { + t.Errorf("Ancient(%q, %d) returned unexpected error %q", kind, index, err) + } +} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 586451c0644de..02e23b51747bb 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -80,10 +80,9 @@ func (t *table) AncientSize(kind string) (uint64, error) { return t.db.AncientSize(kind) } -// AppendAncient is a noop passthrough that just forwards the request to the underlying -// database. -func (t *table) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { - return t.db.AppendAncient(number, hash, header, body, receipts, td) +// ModifyAncients runs an ancient write operation on the underlying database. +func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) { + return t.db.ModifyAncients(fn) } // TruncateAncients is a noop passthrough that just forwards the request to the underlying diff --git a/ethdb/database.go b/ethdb/database.go index bdc09d5e9877c..3c6500d1dc259 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -92,9 +92,10 @@ type AncientReader interface { // AncientWriter contains the methods required to write to immutable ancient data. type AncientWriter interface { - // AppendAncient injects all binary blobs belong to block at the end of the - // append-only immutable table files. - AppendAncient(number uint64, hash, header, body, receipt, td []byte) error + // ModifyAncients runs a write operation on the ancient store. + // If the function returns an error, any changes to the underlying store are reverted. + // The integer return value is the total size of the written data. + ModifyAncients(func(AncientWriteOp) error) (int64, error) // TruncateAncients discards all but the first n ancient data from the ancient store. TruncateAncients(n uint64) error @@ -103,6 +104,15 @@ type AncientWriter interface { Sync() error } +// AncientWriteOp is given to the function argument of ModifyAncients. +type AncientWriteOp interface { + // Append adds an RLP-encoded item. + Append(kind string, number uint64, item interface{}) error + + // AppendRaw adds an item without RLP-encoding it. + AppendRaw(kind string, number uint64, item []byte) error +} + // Reader contains the methods required to read data from both key-value as well as // immutable ancient data. type Reader interface {