diff --git a/cmd/geth/dbcmd.go b/cmd/geth/dbcmd.go index c2c42276b5355..3e92ee63329f0 100644 --- a/cmd/geth/dbcmd.go +++ b/cmd/geth/dbcmd.go @@ -517,9 +517,9 @@ func freezerInspect(ctx *cli.Context) error { return fmt.Errorf("required arguments: %v", ctx.Command.ArgsUsage) } kind := ctx.Args().Get(0) - if noSnap, ok := rawdb.FreezerNoSnappy[kind]; !ok { + if noSnap, ok := rawdb.ChainFreezerNoSnappy[kind]; !ok { var options []string - for opt := range rawdb.FreezerNoSnappy { + for opt := range rawdb.ChainFreezerNoSnappy { options = append(options, opt) } sort.Strings(options) diff --git a/core/blockchain.go b/core/blockchain.go index 6ee58ef4f3da3..75c871cd0c57a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -274,7 +274,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par rawdb.InitDatabaseFromFreezer(bc.db) // If ancient database is not empty, reconstruct all missing // indices in the background. - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.Ancients(rawdb.ChainFreezer) if frozen > 0 { txIndexBlock = frozen } @@ -313,7 +313,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } // Ensure that a previous crash in SetHead doesn't leave extra ancients - if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { + if frozen, err := bc.db.Ancients(rawdb.ChainFreezer); err == nil && frozen > 0 { var ( needRewind bool low uint64 @@ -506,7 +506,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo // Retrieve the last pivot block to short circuit rollbacks beyond it and the // current freezer limit to start nuking id underflown pivot := rawdb.ReadLastPivotNumber(bc.db) - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.Ancients(rawdb.ChainFreezer) updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) { // Rewind the blockchain, ensuring we don't end up with a stateless head @@ -590,11 +590,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo // Rewind the header chain, deleting all block bodies until then delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { // Ignore the error here since light client won't hit this path - frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.Ancients(rawdb.ChainFreezer) if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if err := bc.db.TruncateAncients(num); err != nil { + if err := bc.db.TruncateHead(rawdb.ChainFreezer, num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } // Remove the hash <-> number mapping from the active store. @@ -949,7 +949,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Ensure genesis is in ancients. if first.NumberU64() == 1 { - if frozen, _ := bc.db.Ancients(); frozen == 0 { + if frozen, _ := bc.db.Ancients(rawdb.ChainFreezer); frozen == 0 { b := bc.genesisBlock td := bc.genesisBlock.Difficulty() writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td) @@ -1004,14 +1004,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // The tx index data could not be written. // Roll back the ancient store update. 
fastBlock := bc.CurrentFastBlock().NumberU64() - if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { + if err := bc.db.TruncateHead(rawdb.ChainFreezer, fastBlock+1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, err } // Sync the ancient store explicitly to ensure all data has been flushed to disk. - if err := bc.db.Sync(); err != nil { + if err := bc.db.Sync(rawdb.ChainFreezer); err != nil { return 0, err } @@ -1020,7 +1020,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !updateHead(blockChain[len(blockChain)-1]) { // We end up here if the header chain has reorg'ed, and the blocks/receipts // don't match the canonical chain. - if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil { + if err := bc.db.TruncateHead(rawdb.ChainFreezer, previousFastBlock+1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, errSideChainReceipts diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index f4f762078732e..ea37b8d5c845b 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1818,7 +1818,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { // Force run a freeze cycle type freezer interface { Freeze(threshold uint64) error - Ancients() (uint64, error) + Ancients(typ string) (uint64, error) } db.(freezer).Freeze(tt.freezeThreshold) @@ -1857,7 +1857,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { if head := chain.CurrentBlock(); head.NumberU64() != tt.expHeadBlock { t.Errorf("Head block mismatch: have %d, want %d", head.NumberU64(), tt.expHeadBlock) } - if frozen, err := db.(freezer).Ancients(); err != nil { + if frozen, err := db.(freezer).Ancients(rawdb.ChainFreezer); err != nil { t.Errorf("Failed to retrieve ancient count: %v\n", err) } else if int(frozen) != tt.expFrozen { t.Errorf("Frozen block count mismatch: have %d, want %d", frozen, tt.expFrozen) diff --git a/core/blockchain_sethead_test.go b/core/blockchain_sethead_test.go index 27b6be6e13638..a3e9457469100 100644 --- a/core/blockchain_sethead_test.go +++ b/core/blockchain_sethead_test.go @@ -2024,7 +2024,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) { // Force run a freeze cycle type freezer interface { Freeze(threshold uint64) error - Ancients() (uint64, error) + Ancients(typ string) (uint64, error) } db.(freezer).Freeze(tt.freezeThreshold) @@ -2050,7 +2050,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) { if head := chain.CurrentBlock(); head.NumberU64() != tt.expHeadBlock { t.Errorf("Head block mismatch: have %d, want %d", head.NumberU64(), tt.expHeadBlock) } - if frozen, err := db.(freezer).Ancients(); err != nil { + if frozen, err := db.(freezer).Ancients(rawdb.ChainFreezer); err != nil { t.Errorf("Failed to retrieve ancient count: %v\n", err) } else if int(frozen) != tt.expFrozen { t.Errorf("Frozen block count mismatch: have %d, want %d", frozen, tt.expFrozen) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 80d07eb30ab0c..9b494e930eaa4 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -836,7 +836,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { ancient.SetHead(remove - 1) assert(t, "ancient", ancient, 0, 0, 0) - if frozen, err := ancientDb.Ancients(); err != nil || frozen != 1 { + if frozen, err := ancientDb.Ancients(rawdb.ChainFreezer); err != nil || frozen != 1 { t.Fatalf("failed to truncate 
ancient store, want %v, have %v", 1, frozen) } // Import the chain as a light node and ensure all pointers are updated @@ -1723,7 +1723,7 @@ func TestInsertReceiptChainRollback(t *testing.T) { if ancientChain.CurrentFastBlock().NumberU64() != 0 { t.Fatalf("failed to rollback ancient data, want %d, have %d", 0, ancientChain.CurrentFastBlock().NumberU64()) } - if frozen, err := ancientChain.db.Ancients(); err != nil || frozen != 1 { + if frozen, err := ancientChain.db.Ancients(rawdb.ChainFreezer); err != nil || frozen != 1 { t.Fatalf("failed to truncate ancient data, frozen index is %d", frozen) } @@ -1735,7 +1735,7 @@ func TestInsertReceiptChainRollback(t *testing.T) { if ancientChain.CurrentFastBlock().NumberU64() != canonblocks[len(canonblocks)-1].NumberU64() { t.Fatalf("failed to insert ancient recept chain after rollback") } - if frozen, _ := ancientChain.db.Ancients(); frozen != uint64(len(canonblocks))+1 { + if frozen, _ := ancientChain.db.Ancients(rawdb.ChainFreezer); frozen != uint64(len(canonblocks))+1 { t.Fatalf("wrong ancients count %d", frozen) } } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 4028191b760c5..e44393f7ce125 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -36,7 +36,7 @@ import ( // ReadCanonicalHash retrieves the hash assigned to a canonical block number. func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { var data []byte - db.ReadAncients(func(reader ethdb.AncientReader) error { + db.ReadAncients(ChainFreezer, func(reader ethdb.AncientReadOp) error { data, _ = reader.Ancient(freezerHashTable, number) if len(data) == 0 { // Get it by hash from leveldb @@ -83,8 +83,8 @@ type NumberHash struct { Hash common.Hash } -// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, -// both canonical and reorged forks included. +// ReadAllHashesInRange retrieves all the hashes assigned to blocks at a certain +// heights, both canonical and reorged forks included. // This method considers both limits to be _inclusive_. func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash { var ( @@ -300,7 +300,7 @@ func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) { // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReader) error { + db.ReadAncients(ChainFreezer, func(reader ethdb.AncientReadOp) error { // First try to look up the data in ancient database. Extra hash // comparison is necessary since ancient database only maintains // the canonical data. @@ -377,9 +377,9 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number } } -// isCanon is an internal utility method, to check whether the given number/hash +// isCanonOp is an internal utility method, to check whether the given number/hash // is part of the ancient (canon) set. -func isCanon(reader ethdb.AncientReader, number uint64, hash common.Hash) bool { +func isCanonOp(reader ethdb.AncientReadOp, number uint64, hash common.Hash) bool { h, err := reader.Ancient(freezerHashTable, number) if err != nil { return false @@ -387,15 +387,25 @@ func isCanon(reader ethdb.AncientReader, number uint64, hash common.Hash) bool { return bytes.Equal(h, hash[:]) } +// isCanonOp is an internal utility method, to check whether the given number/hash +// is part of the ancient (canon) set. 
+func isCanon(reader ethdb.AncientReader, number uint64, hash common.Hash) bool { + h, err := reader.Ancient(ChainFreezer, freezerHashTable, number) + if err != nil { + return false + } + return bytes.Equal(h, hash[:]) +} + // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash // comparison is necessary since ancient database only maintains // the canonical data. var data []byte - db.ReadAncients(func(reader ethdb.AncientReader) error { + db.ReadAncients(ChainFreezer, func(reader ethdb.AncientReadOp) error { // Check if the data is in ancients - if isCanon(reader, number, hash) { + if isCanonOp(reader, number, hash) { data, _ = reader.Ancient(freezerBodiesTable, number) return nil } @@ -410,7 +420,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue // block at number, in RLP encoding. func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReader) error { + db.ReadAncients(ChainFreezer, func(reader ethdb.AncientReadOp) error { data, _ = reader.Ancient(freezerBodiesTable, number) if len(data) > 0 { return nil @@ -473,9 +483,9 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReader) error { + db.ReadAncients(ChainFreezer, func(reader ethdb.AncientReadOp) error { // Check if the data is in ancients - if isCanon(reader, number, hash) { + if isCanonOp(reader, number, hash) { data, _ = reader.Ancient(freezerDifficultyTable, number) return nil } @@ -533,9 +543,9 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { var data []byte - db.ReadAncients(func(reader ethdb.AncientReader) error { + db.ReadAncients(ChainFreezer, func(reader ethdb.AncientReadOp) error { // Check if the data is in ancients - if isCanon(reader, number, hash) { + if isCanonOp(reader, number, hash) { data, _ = reader.Ancient(freezerReceiptTable, number) return nil } @@ -741,13 +751,13 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { WriteHeader(db, block.Header()) } -// WriteAncientBlock writes entire block data into ancient store and returns the total written size. +// WriteAncientBlocks writes entire block data into ancient store and returns the total written size. func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) { var ( tdSum = new(big.Int).Set(td) stReceipts []*types.ReceiptForStorage ) - return db.ModifyAncients(func(op ethdb.AncientWriteOp) error { + return db.ModifyAncients(ChainFreezer, func(op ethdb.AncientWriteOp) error { for i, block := range blocks { // Convert receipts to storage format and sum up total difficulty. 
stReceipts = stReceipts[:0] diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index daee721594e62..e49a6bd2a5f47 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -34,7 +34,7 @@ import ( // injects into the database the block hash->number mappings. func InitDatabaseFromFreezer(db ethdb.Database) { // If we can't access the freezer or it's empty, abort - frozen, err := db.Ancients() + frozen, err := db.Ancients(ChainFreezer) if err != nil || frozen == 0 { return } @@ -50,7 +50,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) { if i+count > frozen { count = frozen - i } - data, err := db.AncientRange(freezerHashTable, i, count, 32*count) + data, err := db.AncientRange(ChainFreezer, freezerHashTable, i, count, 32*count) if err != nil { log.Crit("Failed to init database from freezer", "err", err) } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index c5af776672cf3..1e72073bc7f8b 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -32,20 +32,140 @@ import ( "github.com/olekukonko/tablewriter" ) +var ( + // errUndefinedType is returned if non-existent freezer is required + // to be accessed. + errUndefinedType = errors.New("freezer type is not defined") +) + // freezerdb is a database wrapper that enabled freezer data retrievals. type freezerdb struct { ethdb.KeyValueStore - ethdb.AncientStore + chain *freezer +} + +// HasAncient returns an indicator whether the specified data exists in the +// ancient store. +func (frdb *freezerdb) HasAncient(typ string, kind string, id uint64) (bool, error) { + switch typ { + case ChainFreezer: + return frdb.chain.HasAncient(kind, id) + default: + return false, errUndefinedType + } +} + +// Ancient retrieves an ancient binary blob from the append-only immutable files. +func (frdb *freezerdb) Ancient(typ string, kind string, id uint64) ([]byte, error) { + switch typ { + case ChainFreezer: + return frdb.chain.Ancient(kind, id) + default: + return nil, errUndefinedType + } +} + +// AncientRange retrieves multiple items in sequence, starting from the index 'start'. +func (frdb *freezerdb) AncientRange(typ string, kind string, start, max, maxByteSize uint64) ([][]byte, error) { + switch typ { + case ChainFreezer: + return frdb.chain.AncientRange(kind, start, max, maxByteSize) + default: + return nil, errUndefinedType + } +} + +// Ancients returns the ancient item numbers in the ancient store. +func (frdb *freezerdb) Ancients(typ string) (uint64, error) { + switch typ { + case ChainFreezer: + return frdb.chain.Ancients() + default: + return 0, errUndefinedType + } +} + +// Tail returns the number of first stored item in the freezer. +func (frdb *freezerdb) Tail(typ string) (uint64, error) { + switch typ { + case ChainFreezer: + return frdb.chain.Tail() + default: + return 0, errUndefinedType + } +} + +// AncientSize returns the ancient size of the specified category. +func (frdb *freezerdb) AncientSize(typ string, kind string) (uint64, error) { + switch typ { + case ChainFreezer: + return frdb.chain.AncientSize(kind) + default: + return 0, errUndefinedType + } +} + +// ModifyAncients runs a write operation on the ancient store. +// If the function returns an error, any changes to the underlying store are reverted. +// The integer return value is the total size of the written data. 
+func (frdb *freezerdb) ModifyAncients(typ string, fn func(ethdb.AncientWriteOp) error) (int64, error) { + switch typ { + case ChainFreezer: + return frdb.chain.ModifyAncients(fn) + default: + return 0, errUndefinedType + } +} + +// TruncateHead discards all but the first n ancient data from the ancient store. +func (frdb *freezerdb) TruncateHead(typ string, items uint64) error { + switch typ { + case ChainFreezer: + return frdb.chain.TruncateHead(items) + default: + return errUndefinedType + } +} + +// TruncateTail discards the first n ancient data from the ancient store. +func (frdb *freezerdb) TruncateTail(typ string, tail uint64) error { + switch typ { + case ChainFreezer: + return frdb.chain.TruncateTail(tail) + default: + return errUndefinedType + } +} + +// Sync flushes all in-memory ancient store data to disk. +func (frdb *freezerdb) Sync(typ string) error { + switch typ { + case ChainFreezer: + return frdb.chain.Sync() + default: + return errUndefinedType + } +} + +// ReadAncients runs the given read operation while ensuring that no writes take place +// on the underlying freezer. +func (frdb *freezerdb) ReadAncients(typ string, fn func(reader ethdb.AncientReadOp) error) (err error) { + switch typ { + case ChainFreezer: + return frdb.chain.ReadAncients(fn) + default: + return errUndefinedType + } } // Close implements io.Closer, closing both the fast key-value store as well as // the slow ancient tables. func (frdb *freezerdb) Close() error { var errs []error - if err := frdb.AncientStore.Close(); err != nil { + if err := frdb.KeyValueStore.Close(); err != nil { errs = append(errs, err) } - if err := frdb.KeyValueStore.Close(); err != nil { + if err := frdb.chain.Close(); err != nil { errs = append(errs, err) } if len(errs) != 0 { @@ -58,18 +178,18 @@ func (frdb *freezerdb) Close() error { // a freeze cycle completes, without having to sleep for a minute to trigger the // automatic background run. func (frdb *freezerdb) Freeze(threshold uint64) error { - if frdb.AncientStore.(*freezer).readonly { + if frdb.chain.readonly { return errReadOnly } // Set the freezer threshold to a temporary value defer func(old uint64) { - atomic.StoreUint64(&frdb.AncientStore.(*freezer).threshold, old) - }(atomic.LoadUint64(&frdb.AncientStore.(*freezer).threshold)) - atomic.StoreUint64(&frdb.AncientStore.(*freezer).threshold, threshold) + atomic.StoreUint64(&frdb.chain.threshold, old) + }(atomic.LoadUint64(&frdb.chain.threshold)) + atomic.StoreUint64(&frdb.chain.threshold, threshold) // Trigger a freeze cycle and block until it's done trigger := make(chan struct{}, 1) - frdb.AncientStore.(*freezer).trigger <- trigger + frdb.chain.trigger <- trigger <-trigger return nil } @@ -80,46 +200,88 @@ type nofreezedb struct { } // HasAncient returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) HasAncient(kind string, number uint64) (bool, error) { +func (db *nofreezedb) HasAncient(typ string, kind string, number uint64) (bool, error) { return false, errNotSupported } // Ancient returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) { +func (db *nofreezedb) Ancient(typ string, kind string, number uint64) ([]byte, error) { return nil, errNotSupported } // AncientRange returns an error as we don't have a backing chain freezer. 
-func (db *nofreezedb) AncientRange(kind string, start, max, maxByteSize uint64) ([][]byte, error) { +func (db *nofreezedb) AncientRange(typ string, kind string, start, max, maxByteSize uint64) ([][]byte, error) { return nil, errNotSupported } // Ancients returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) Ancients() (uint64, error) { +func (db *nofreezedb) Ancients(typ string) (uint64, error) { + return 0, errNotSupported +} + +// Tail returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) Tail(typ string) (uint64, error) { return 0, errNotSupported } // AncientSize returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) AncientSize(kind string) (uint64, error) { +func (db *nofreezedb) AncientSize(typ string, kind string) (uint64, error) { return 0, errNotSupported } -// ModifyAncients is not supported. -func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) { +// ModifyAncients returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) ModifyAncients(typ string, fn func(ethdb.AncientWriteOp) error) (int64, error) { return 0, errNotSupported } -// TruncateAncients returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) TruncateAncients(items uint64) error { +// TruncateHead returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateHead(typ string, items uint64) error { + return errNotSupported +} + +// TruncateTail discards the first n ancient data from the ancient store. +func (db *nofreezedb) TruncateTail(typ string, tail uint64) error { return errNotSupported } // Sync returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) Sync() error { +func (db *nofreezedb) Sync(typ string) error { return errNotSupported } -func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) { +type nullAncientReadOp struct{} + +// HasAncient returns an error as we don't have a backing chain freezer. +func (db *nullAncientReadOp) HasAncient(kind string, number uint64) (bool, error) { + return false, errNotSupported +} + +// Ancient returns an error as we don't have a backing chain freezer. +func (db *nullAncientReadOp) Ancient(kind string, number uint64) ([]byte, error) { + return nil, errNotSupported +} + +// AncientRange returns an error as we don't have a backing chain freezer. +func (db *nullAncientReadOp) AncientRange(kind string, start, max, maxByteSize uint64) ([][]byte, error) { + return nil, errNotSupported +} + +// Ancients returns an error as we don't have a backing chain freezer. +func (db *nullAncientReadOp) Ancients() (uint64, error) { + return 0, errNotSupported +} + +// Tail returns an error as we don't have a backing chain freezer. +func (db *nullAncientReadOp) Tail() (uint64, error) { + return 0, errNotSupported +} + +// AncientSize returns an error as we don't have a backing chain freezer. +func (db *nullAncientReadOp) AncientSize(kind string) (uint64, error) { + return 0, errNotSupported +} + +func (db *nofreezedb) ReadAncients(typ string, fn func(reader ethdb.AncientReadOp) error) (err error) { // Unlike other ancient-related methods, this method does not return // errNotSupported when invoked. 
// The reason for this is that the caller might want to do several things: @@ -132,7 +294,7 @@ func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReader) error) (e // If we instead were to return errNotSupported here, then the caller would // have to explicitly check for that, having an extra clause to do the // non-ancient operations. - return fn(db) + return fn(&nullAncientReadOp{}) } // NewDatabase creates a high level database on top of a given key-value data @@ -145,8 +307,8 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { // value data store with a freezer moving immutable chain segments into cold // storage. func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) { - // Create the idle freezer instance - frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy) + // Create the ancient chain freezer + ancientChain, err := newFreezer(freezer, namespace, readonly, freezerTableSize, ChainFreezerNoSnappy) if err != nil { return nil, err } @@ -173,11 +335,11 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st // validate in this method. If, however, the genesis hash is not nil, compare // it to the freezer content. if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 { - if frozen, _ := frdb.Ancients(); frozen > 0 { + if frozen, _ := ancientChain.Ancients(); frozen > 0 { // If the freezer already contains something, ensure that the genesis blocks // match, otherwise we might mix up freezers across chains and destroy both // the freezer and the key-value store. - frgenesis, err := frdb.Ancient(freezerHashTable, 0) + frgenesis, err := ancientChain.Ancient(freezerHashTable, 0) if err != nil { return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err) } else if !bytes.Equal(kvgenesis, frgenesis) { @@ -211,20 +373,20 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st // Block #1 is still in the database, we're allowed to init a new feezer } // Otherwise, the head header is still the genesis, we're allowed to init a new - // feezer. + // freezer. } } // Freezer is consistent with the key-value database, permit combining the two - if !frdb.readonly { - frdb.wg.Add(1) + if !ancientChain.readonly { + ancientChain.wg.Add(1) go func() { - frdb.freeze(db) - frdb.wg.Done() + ancientChain.freeze(db) + ancientChain.wg.Done() }() } return &freezerdb{ KeyValueStore: db, - AncientStore: frdb, + chain: ancientChain, }, nil } @@ -416,14 +578,14 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { // Inspect append-only file store then. ancientSizes := []*common.StorageSize{&ancientHeadersSize, &ancientBodiesSize, &ancientReceiptsSize, &ancientHashesSize, &ancientTdsSize} for i, category := range []string{freezerHeaderTable, freezerBodiesTable, freezerReceiptTable, freezerHashTable, freezerDifficultyTable} { - if size, err := db.AncientSize(category); err == nil { + if size, err := db.AncientSize(ChainFreezer, category); err == nil { *ancientSizes[i] += common.StorageSize(size) total += common.StorageSize(size) } } // Get number of ancient rows inside the freezer ancients := counter(0) - if count, err := db.Ancients(); err == nil { + if count, err := db.Ancients(ChainFreezer); err == nil { ancients = counter(count) } // Display the database statistic. 
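With the freezerdb dispatch above, every ancient-store call now names the freezer it targets via the typ argument, and freezerdb routes it to the matching instance — for now only the chain freezer ("chain") exists. A minimal sketch of what an external call site looks like after this change; the "hashes" literal stands in for the unexported freezerHashTable constant and is an assumption, the rest is the API introduced by this diff:

	package main

	import (
		"github.com/ethereum/go-ethereum/core/rawdb"
		"github.com/ethereum/go-ethereum/ethdb"
	)

	// canonicalHashBlob reads the canonical hash of a frozen block. ReadAncients
	// holds the freezer's read lock, so the callback observes a consistent view
	// even if a freeze cycle or a truncation runs concurrently.
	func canonicalHashBlob(db ethdb.Database, number uint64) ([]byte, error) {
		var data []byte
		err := db.ReadAncients(rawdb.ChainFreezer, func(reader ethdb.AncientReadOp) error {
			blob, err := reader.Ancient("hashes", number) // assumed value of freezerHashTable
			if err != nil {
				return err
			}
			data = blob
			return nil
		})
		return data, err
	}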
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index e19c202adc843..91974cc19417f 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -59,14 +59,14 @@ const ( freezerRecheckInterval = time.Minute // freezerBatchLimit is the maximum number of blocks to freeze in one batch - // before doing an fsync and deleting it from the key-value store. + // before doing a fsync and deleting it from the key-value store. freezerBatchLimit = 30000 // freezerTableSize defines the maximum size of freezer data files. freezerTableSize = 2 * 1000 * 1000 * 1000 ) -// freezer is an memory mapped append-only database to store immutable chain data +// freezer is a memory-mapped append-only database to store immutable chain data // into flat files: // // - The append only nature ensures that disk writes are minimized. @@ -78,6 +78,7 @@ type freezer struct { // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). frozen uint64 // Number of blocks already frozen + tail uint64 // Number of the first stored item in the freezer threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) // This lock synchronizes writers and the truncate operation, as well as @@ -219,6 +220,11 @@ func (f *freezer) Ancients() (uint64, error) { return atomic.LoadUint64(&f.frozen), nil } +// Tail returns the number of the first stored item in the freezer. +func (f *freezer) Tail() (uint64, error) { + return atomic.LoadUint64(&f.tail), nil +} + // AncientSize returns the ancient size of the specified category. func (f *freezer) AncientSize(kind string) (uint64, error) { // This needs the write lock to avoid data races on table fields. @@ -234,7 +240,7 @@ func (f *freezer) AncientSize(kind string) (uint64, error) { // ReadAncients runs the given read operation while ensuring that no writes take place // on the underlying freezer. -func (f *freezer) ReadAncients(fn func(ethdb.AncientReader) error) (err error) { +func (f *freezer) ReadAncients(fn func(op ethdb.AncientReadOp) error) (err error) { f.writeLock.RLock() defer f.writeLock.RUnlock() return fn(f) @@ -254,7 +260,7 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize if err != nil { // The write operation has failed. Go back to the previous item position. for name, table := range f.tables { - err := table.truncate(prevItem) + err := table.truncateHead(prevItem) if err != nil { log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err) } @@ -274,8 +280,8 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize return writeSize, nil } -// TruncateAncients discards any recent data above the provided threshold number. -func (f *freezer) TruncateAncients(items uint64) error { +// TruncateHead discards any recent data above the provided threshold number. +func (f *freezer) TruncateHead(items uint64) error { if f.readonly { return errReadOnly } @@ -286,7 +292,7 @@ func (f *freezer) TruncateAncients(items uint64) error { return nil } for _, table := range f.tables { - if err := table.truncate(items); err != nil { + if err := table.truncateHead(items); err != nil { return err } } @@ -294,6 +300,30 @@ func (f *freezer) TruncateAncients(items uint64) error { return nil } +// TruncateTail discards all data below the provided threshold number.
+func (f *freezer) TruncateTail(tail uint64) error { + if f.readonly { + return errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + if atomic.LoadUint64(&f.tail) >= tail { + return nil + } + // Because the data sizes maintained by different tables may be inconsistent, + // some or all tables might not actually perform the truncation, since tail + // deletion can only be done at the file level. These deletions will be + // delayed until the whole data file is deletable. + for _, table := range f.tables { + if err := table.truncateTail(tail); err != nil { + return err + } + } + atomic.StoreUint64(&f.tail, tail) + return nil +} + // Sync flushes all data tables to disk. func (f *freezer) Sync() error { var errs []error @@ -310,19 +340,30 @@ func (f *freezer) Sync() error { // repair truncates all data tables to the same length. func (f *freezer) repair() error { - min := uint64(math.MaxUint64) + var ( + head = uint64(math.MaxUint64) + tail = uint64(0) + ) for _, table := range f.tables { items := atomic.LoadUint64(&table.items) - if min > items { - min = items + if head > items { + head = items + } + offset := atomic.LoadUint64(&table.itemOffset) + if offset > tail { + tail = offset } } for _, table := range f.tables { - if err := table.truncate(min); err != nil { + if err := table.truncateHead(head); err != nil { + return err + } + if err := table.truncateTail(tail); err != nil { return err } } - atomic.StoreUint64(&f.frozen, min) + atomic.StoreUint64(&f.frozen, head) + atomic.StoreUint64(&f.tail, tail) return nil } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 22405cf9b4f89..c2eab423f2852 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -17,12 +17,15 @@ package rawdb import ( + "bufio" "bytes" "encoding/binary" "errors" "fmt" "io" + "io/ioutil" "os" + "path" "path/filepath" "sync" "sync/atomic" @@ -46,21 +49,20 @@ var ( errNotSupported = errors.New("this operation is not supported") ) -// indexEntry contains the number/id of the file that the data resides in, aswell as the -// offset within the file to the end of the data +// indexEntry contains the number/id of the file that the data resides in, as well as the +// offset within the file to the end of the data. // In serialized form, the filenum is stored as uint16. type indexEntry struct { - filenum uint32 // stored as uint16 ( 2 bytes) - offset uint32 // stored as uint32 ( 4 bytes) + filenum uint32 // stored as uint16 ( 2 bytes ) + offset uint32 // stored as uint32 ( 4 bytes ) } const indexEntrySize = 6 // unmarshalBinary deserializes binary b into the rawIndex entry. -func (i *indexEntry) unmarshalBinary(b []byte) error { +func (i *indexEntry) unmarshalBinary(b []byte) { i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) i.offset = binary.BigEndian.Uint32(b[2:6]) - return nil } // append adds the encoded entry to the end of b. @@ -92,22 +94,29 @@ type freezerTable struct { // WARNING: The `items` field is accessed atomically. On 32 bit platforms, only // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). - items uint64 // Number of items stored in the table (including items removed from tail) + items uint64 // Number of items stored in the table (including items removed from tail) + itemOffset uint64 // Number of items removed from the table + + // itemHidden is the number of items marked as deleted but not yet removed
+ // from the table. Tail deletion is only supported at the file level, which + // means the real deletion is delayed until the data marked as deleted + // fills up a whole data file. Until then these items are hidden to prevent + // them from being visited again. + // + // Note this field is not persisted, which means it will get lost after a restart. + itemHidden uint64 noCompression bool // if true, disables snappy compression. Note: does not work retroactively maxFileSize uint32 // Max file size for data-files name string path string - head *os.File // File descriptor for the data head of the table - files map[uint32]*os.File // open files - headId uint32 // number of the currently active head file - tailId uint32 // number of the earliest file - index *os.File // File descriptor for the indexEntry file of the table - - // In the case that old items are deleted (from the tail), we use itemOffset - // to count how many historic items have gone missing. - itemOffset uint32 // Offset (number of discarded items) + head *os.File // File descriptor for the data head of the table + files map[uint32]*os.File // open files + headId uint32 // number of the currently active head file + tailId uint32 // number of the earliest file + index *os.File // File descriptor for the indexEntry file of the table + indexName string // File name of the indexEntry file headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read @@ -162,7 +171,7 @@ func truncateFreezerFile(file *os.File, size int64) error { } // newTable opens a freezer table, creating the data and index files if they are -// non existent. Both files are truncated to the shortest common length to ensure +// non-existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file @@ -184,6 +193,7 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr // Create the table and repair any past inconsistency tab := &freezerTable{ index: offsets, + indexName: idxName, files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, @@ -209,7 +219,7 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr return tab, nil } -// repair cross checks the head and the index file and truncates them to +// repair cross-checks the head and the index file and truncates them to // be in sync with each other after a potential crash / data loss.
func (t *freezerTable) repair() error { // Create a temporary offset buffer to init files with and read indexEntry into @@ -227,7 +237,9 @@ func (t *freezerTable) repair() error { } // Ensure the index is a multiple of indexEntrySize bytes if overflow := stat.Size() % indexEntrySize; overflow != 0 { - truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path + if err := truncateFreezerFile(t.index, stat.Size()-overflow); err != nil { + return err + } // New file can't trigger this path } // Retrieve the file sizes and prepare for truncation if stat, err = t.index.Stat(); err != nil { @@ -244,14 +256,19 @@ func (t *freezerTable) repair() error { ) // Read index zero, determine what file is the earliest // and what item offset to use - t.index.ReadAt(buffer, 0) + if _, err := t.index.ReadAt(buffer, 0); err != nil { + return err + } firstIndex.unmarshalBinary(buffer) + t.tailId, t.itemOffset = firstIndex.filenum, uint64(firstIndex.offset) - t.tailId = firstIndex.filenum - t.itemOffset = firstIndex.offset - - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + // Read last index, determine what file is the latest and + // what's the current head item + if _, err := t.index.ReadAt(buffer, offsetsSize-indexEntrySize); err != nil { + return err + } lastIndex.unmarshalBinary(buffer) + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) if err != nil { return err @@ -279,10 +296,14 @@ func (t *freezerTable) repair() error { if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { return err } + // Load the previous index entry from the index file offsetsSize -= indexEntrySize - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + if _, err := t.index.ReadAt(buffer, offsetsSize-indexEntrySize); err != nil { + return err + } var newLastIndex indexEntry newLastIndex.unmarshalBinary(buffer) + // We might have slipped back into an earlier head-file here if newLastIndex.filenum != lastIndex.filenum { // Release earlier opened file @@ -309,10 +330,16 @@ func (t *freezerTable) repair() error { return err } // Update the item and byte counters and return - t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file + t.items = t.itemOffset + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file t.headBytes = contentSize t.headId = lastIndex.filenum + // Delete the leftover files because of head deletion + t.releaseFilesAfter(t.headId, true) + + // Delete the leftover files because of tail deletion + t.releaseFilesBefore(t.tailId, true) + // Close opened files and preopen all files if err := t.preopen(); err != nil { return err @@ -328,6 +355,7 @@ func (t *freezerTable) repair() error { func (t *freezerTable) preopen() (err error) { // The repair might have already opened (some) files t.releaseFilesAfter(0, false) + // Open all except head in RDONLY for i := t.tailId; i < t.headId; i++ { if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { @@ -339,8 +367,8 @@ func (t *freezerTable) preopen() (err error) { return err } -// truncate discards any recent data above the provided threshold number. -func (t *freezerTable) truncate(items uint64) error { +// truncateHead discards any recent data above the provided threshold number. 
+func (t *freezerTable) truncateHead(items uint64) error { t.lock.Lock() defer t.lock.Unlock() @@ -360,17 +388,34 @@ func (t *freezerTable) truncate(items uint64) error { log = t.logger.Warn // Only loud warn if we delete multiple items } log("Truncating freezer table", "items", existing, "limit", items) - if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { + + // Calculate the relative offset between the new head and tail, use + // it to access the corresponding index entry. If the requested rewind + // target is even below the freezer tail, truncate all the items in the + // freezer. + var ( + offset uint64 // the offset which points to the last index + itemOffset = atomic.LoadUint64(&t.itemOffset) + ) + if items < itemOffset { + offset, items = 0, itemOffset + } else { + offset = items - itemOffset + } + if err := truncateFreezerFile(t.index, int64(offset+1)*indexEntrySize); err != nil { return err } // Calculate the new expected size of the data file and truncate it buffer := make([]byte, indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil { + if _, err := t.index.ReadAt(buffer, int64(offset*indexEntrySize)); err != nil { return err } var expected indexEntry expected.unmarshalBinary(buffer) - + // Special case for reading the meta index entry, reset the offset to 0. + if offset == 0 { + expected.offset = 0 + } // We might need to truncate back to older files if expected.filenum != t.headId { // If already open for reading, force-reopen for writing @@ -391,15 +436,137 @@ func (t *freezerTable) truncate(items uint64) error { } // All data files truncated, set internal counters and return t.headBytes = int64(expected.offset) - atomic.StoreUint64(&t.items, items) + atomic.StoreUint64(&t.items, items) + if items < itemOffset+atomic.LoadUint64(&t.itemHidden) { + atomic.StoreUint64(&t.itemHidden, items-itemOffset) + } // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() if err != nil { return err } t.sizeGauge.Dec(int64(oldSize - newSize)) + return nil +} + +func (t *freezerTable) truncateIndexFile(oldTail uint64, items uint64, fileno uint32) (err error) { + f, err := ioutil.TempFile(t.path, "index-tmp-*") + if err != nil { + return err + } + defer func() { + if err != nil { + f.Close() + os.Remove(f.Name()) + } + }() + // Use a buffered writer to minimize write(2) syscalls. + bufw := bufio.NewWriter(f) + + meta := indexEntry{filenum: fileno, offset: uint32(items)} + buffer := meta.append(nil) + if wn, err := bufw.Write(buffer); err != nil || wn != indexEntrySize { + return errors.New("failed to write meta index") + } + // Copy the remaining indexes into the new index file + offset := items - oldTail + 1 // offset contains the meta index + if _, err := t.index.Seek(int64(offset*indexEntrySize), 0); err != nil { + return err + } + if _, err := io.Copy(bufw, t.index); err != nil { + return err + } + if err := bufw.Flush(); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + if err := os.Rename(f.Name(), path.Join(t.path, t.indexName)); err != nil { + return err + } + // Reopen the index file + if err := t.index.Close(); err != nil { + return err + } + offsets, err := openFreezerFileForAppend(path.Join(t.path, t.indexName)) + if err != nil { + return err + } + t.index = offsets + return nil +} +// truncateTail discards all data below the provided threshold number.
+func (t *freezerTable) truncateTail(tail uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + + // Ensure the given truncate target falls in the correct range + itemOffset, hidden := atomic.LoadUint64(&t.itemOffset), atomic.LoadUint64(&t.itemHidden) + if itemOffset+hidden > tail { + return nil + } + head := atomic.LoadUint64(&t.items) + if head <= tail { + return nil + } + // Load the new tail index, extract the position info. Note the + // second index contains the "real" position info. + offset := tail - itemOffset + 1 + buffer := make([]byte, indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(offset*indexEntrySize)); err != nil { + return err + } + var newTail indexEntry + newTail.unmarshalBinary(buffer) + + // Update the hidden marker to ensure this item is not accessible. + atomic.StoreUint64(&t.itemHidden, tail-itemOffset) + + // Freezer only supports deletion by file, return without any operation + if t.tailId == newTail.filenum { + return nil + } + if t.tailId > newTail.filenum { + return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTail.filenum) + } + // We need to truncate, save the old size for metrics tracking + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + var ( + cur indexEntry + index = tail + ) + for current := tail - 1; current >= itemOffset; current -= 1 { + offset := current - itemOffset + 1 + if _, err := t.index.ReadAt(buffer, int64(offset*indexEntrySize)); err != nil { + return err + } + cur.unmarshalBinary(buffer) + if cur.filenum != newTail.filenum { + break + } + index = current + } + if err := t.truncateIndexFile(itemOffset, index, newTail.filenum); err != nil { + return err + } + // Release any files before the current tail + t.tailId = newTail.filenum + atomic.StoreUint64(&t.itemOffset, index) + atomic.StoreUint64(&t.itemHidden, tail-index) + t.releaseFilesBefore(t.tailId, true) + + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } @@ -468,6 +635,19 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { } } +// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files +func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { + for fnum, f := range t.files { + if fnum < num { + delete(t.files, fnum) + f.Close() + if remove { + os.Remove(f.Name()) + } + } + } +} + // getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). @@ -476,7 +656,8 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { // it will return error. func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { // Apply the table-offset - from = from - uint64(t.itemOffset) + from = from - atomic.LoadUint64(&t.itemOffset) + // For reading N items, we need N+1 indices. 
buffer := make([]byte, (count+1)*indexEntrySize) if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { @@ -561,14 +742,22 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i t.lock.RLock() defer t.lock.RUnlock() - // Ensure the table and the item is accessible + // Ensure the table and the item are accessible if t.index == nil || t.head == nil { return nil, nil, errClosed } - itemCount := atomic.LoadUint64(&t.items) // max number + var ( + itemTail = atomic.LoadUint64(&t.itemOffset) // the minimal item + itemCount = atomic.LoadUint64(&t.items) // the total items (head + 1) + itemHidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items + ) + // Skip any items marked as deleted but not yet removed. + if itemHidden > 0 { + itemTail = itemTail + itemHidden + } // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something - if itemCount <= start || uint64(t.itemOffset) > start || count == 0 { + if itemCount <= start || itemTail > start || count == 0 { return nil, nil, errOutOfBounds } if start+count > itemCount { @@ -648,10 +837,15 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i return output[:outputSize], sizes, nil } -// has returns an indicator whether the specified number data -// exists in the freezer table. +// has returns an indicator whether the specified number data is still accessible +// in the freezer table. func (t *freezerTable) has(number uint64) bool { - return atomic.LoadUint64(&t.items) > number + var ( + items = atomic.LoadUint64(&t.items) + itemOffset = atomic.LoadUint64(&t.itemOffset) + itemHidden = atomic.LoadUint64(&t.itemHidden) + ) + return items > number && itemOffset+itemHidden <= number } // size returns the total data size in the freezer table. diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 803809b5207f4..f7a63cc3ca76d 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -387,7 +387,7 @@ func TestFreezerTruncate(t *testing.T) { t.Fatal(err) } defer f.Close() - f.truncate(10) // 150 bytes + f.truncateHead(10) // 150 bytes if f.items != 10 { t.Fatalf("expected %d items, got %d", 10, f.items) } @@ -504,7 +504,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { } // Now, truncate back to zero - f.truncate(0) + f.truncateHead(0) // Write the data again batch := f.newBatch() @@ -659,6 +659,178 @@ func TestFreezerOffset(t *testing.T) { } } +func TestTruncateTail(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + // Nothing to do, all the items should still be there.
+ f.truncateTail(0) + checkRetrieve(t, f, map[uint64][]byte{ + 0: getChunk(20, 0xFF), + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Truncate a single element (item 0); deletion is only supported at the file level + f.truncateTail(1) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Reopen the table, the un-persisted deletion will get lost + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + checkRetrieve(t, f, map[uint64][]byte{ + 0: getChunk(20, 0xFF), + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Truncate two elements (items 0 and 1); file 0 should be deleted + f.truncateTail(2) + t.Log(f.dumpIndexString(0, 100)) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Reopen the table, the above testing should still pass + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + defer f.Close() + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Truncate everything below the last item; only item 6 should remain + f.truncateTail(6) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 4: errOutOfBounds, + 5: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 6: getChunk(20, 0x11), + }) +} + +func TestTruncateHeadBelowTail(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-head-below-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + f.truncateTail(4) // Tail = 4 + + // The new head (item 3) is below the tail (item 4), so the entire table should be truncated + f.truncateHead(4) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, // Deleted by tail + 1: errOutOfBounds, // Deleted by tail + 2: errOutOfBounds, // Deleted by tail + 3: errOutOfBounds, // Deleted by tail + 4: errOutOfBounds, // Deleted by Head + 5: errOutOfBounds, // Deleted by Head + 6: errOutOfBounds, // Deleted by Head + }) + t.Log(f.dumpIndexString(0, 100)) + +
// Append new items + batch = f.newBatch() + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + t.Log(f.dumpIndexString(0, 100)) + + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + f.truncateTail(5) // Lazy deleted the item-4, it's hidden + f.truncateHead(5) // New head is reset to item-4 + t.Log(f.dumpIndexString(0, 100)) + checkRetrieveError(t, f, map[uint64]error{ + 4: errOutOfBounds, // Hidden item + }) +} + func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) { t.Helper() diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go index fa84f803068b9..c97371110e7f9 100644 --- a/core/rawdb/freezer_test.go +++ b/core/rawdb/freezer_test.go @@ -186,7 +186,7 @@ func TestFreezerConcurrentModifyRetrieve(t *testing.T) { wg.Wait() } -// This test runs ModifyAncients and TruncateAncients concurrently with each other. +// This test runs ModifyAncients and TruncateHead concurrently with each other. func TestFreezerConcurrentModifyTruncate(t *testing.T) { f, dir := newFreezerForTesting(t, freezerTestTableDef) defer os.RemoveAll(dir) @@ -196,7 +196,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { for i := 0; i < 1000; i++ { // First reset and write 100 items. - if err := f.TruncateAncients(0); err != nil { + if err := f.TruncateHead(0); err != nil { t.Fatal("truncate failed:", err) } _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { @@ -231,7 +231,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { wg.Done() }() go func() { - truncateErr = f.TruncateAncients(10) + truncateErr = f.TruncateHead(10) wg.Done() }() go func() { diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index d432db2ab752e..e804d689f50ff 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -117,9 +117,9 @@ const ( freezerDifficultyTable = "diffs" ) -// FreezerNoSnappy configures whether compression is disabled for the ancient-tables. +// ChainFreezerNoSnappy configures whether compression is disabled for the ancient-chain-tables. // Hashes and difficulties don't compress well. -var FreezerNoSnappy = map[string]bool{ +var ChainFreezerNoSnappy = map[string]bool{ freezerHeaderTable: false, freezerHashTable: true, freezerBodiesTable: false, @@ -127,6 +127,11 @@ var FreezerNoSnappy = map[string]bool{ freezerDifficultyTable: true, } +const ( + // ChainFreezer indicates the name of ancient chain freezer + ChainFreezer = "chain" +) + // LegacyTxLookupEntry is the legacy TxLookupEntry definition with some unnecessary // fields. type LegacyTxLookupEntry struct { diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 91fc31b660d67..d83ee5e38d923 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -52,53 +52,68 @@ func (t *table) Get(key []byte) ([]byte, error) { // HasAncient is a noop passthrough that just forwards the request to the underlying // database. -func (t *table) HasAncient(kind string, number uint64) (bool, error) { - return t.db.HasAncient(kind, number) +func (t *table) HasAncient(typ string, kind string, number uint64) (bool, error) { + return t.db.HasAncient(typ, kind, number) } // Ancient is a noop passthrough that just forwards the request to the underlying // database. 
diff --git a/core/rawdb/table.go b/core/rawdb/table.go
index 91fc31b660d67..d83ee5e38d923 100644
--- a/core/rawdb/table.go
+++ b/core/rawdb/table.go
@@ -52,53 +52,68 @@ func (t *table) Get(key []byte) ([]byte, error) {

 // HasAncient is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) HasAncient(kind string, number uint64) (bool, error) {
- return t.db.HasAncient(kind, number)
+func (t *table) HasAncient(typ string, kind string, number uint64) (bool, error) {
+ return t.db.HasAncient(typ, kind, number)
 }

 // Ancient is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) Ancient(kind string, number uint64) ([]byte, error) {
- return t.db.Ancient(kind, number)
+func (t *table) Ancient(typ string, kind string, number uint64) ([]byte, error) {
+ return t.db.Ancient(typ, kind, number)
 }

 // AncientRange is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
- return t.db.AncientRange(kind, start, count, maxBytes)
+func (t *table) AncientRange(typ string, kind string, start, count, maxBytes uint64) ([][]byte, error) {
+ return t.db.AncientRange(typ, kind, start, count, maxBytes)
 }

 // Ancients is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) Ancients() (uint64, error) {
- return t.db.Ancients()
+func (t *table) Ancients(typ string) (uint64, error) {
+ return t.db.Ancients(typ)
+}
+
+// Tail is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) Tail(typ string) (uint64, error) {
+ return t.db.Tail(typ)
 }

 // AncientSize is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) AncientSize(kind string) (uint64, error) {
- return t.db.AncientSize(kind)
+func (t *table) AncientSize(typ string, kind string) (uint64, error) {
+ return t.db.AncientSize(typ, kind)
+}
+
+// ModifyAncients is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) ModifyAncients(typ string, fn func(ethdb.AncientWriteOp) error) (int64, error) {
+ return t.db.ModifyAncients(typ, fn)
 }

-// ModifyAncients runs an ancient write operation on the underlying database.
-func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) {
- return t.db.ModifyAncients(fn)
+// ReadAncients is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) ReadAncients(typ string, fn func(reader ethdb.AncientReadOp) error) (err error) {
+ return t.db.ReadAncients(typ, fn)
 }

-func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) {
- return t.db.ReadAncients(fn)
+// TruncateHead is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) TruncateHead(typ string, items uint64) error {
+ return t.db.TruncateHead(typ, items)
 }

-// TruncateAncients is a noop passthrough that just forwards the request to the underlying
+// TruncateTail is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) TruncateAncients(items uint64) error {
- return t.db.TruncateAncients(items)
+func (t *table) TruncateTail(typ string, tail uint64) error {
+ return t.db.TruncateTail(typ, tail)
 }

 // Sync is a noop passthrough that just forwards the request to the underlying
 // database.
-func (t *table) Sync() error {
- return t.db.Sync()
+func (t *table) Sync(typ string) error {
+ return t.db.Sync(typ)
 }

 // Put inserts the given value into the database at a prefixed version of the
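Since every ancient method of the table wrapper is now a plain passthrough, the table's key prefix affects only key-value accesses, never the freezer. A hedged sketch of that behavior, assuming the rawdb.NewTable constructor and using the "headers" literal as a stand-in for the unexported freezerHeaderTable constant:

package ancientexample

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// sameBlob shows that a prefixed table forwards ancient reads untouched:
// both calls below resolve to the same chain-freezer entry.
func sameBlob(db ethdb.Database) ([]byte, []byte, error) {
	tdb := rawdb.NewTable(db, "prefix-") // prefixes Get/Put, not Ancient
	a, err := db.Ancient(rawdb.ChainFreezer, "headers", 0)
	if err != nil {
		return nil, nil, err
	}
	b, err := tdb.Ancient(rawdb.ChainFreezer, "headers", 0)
	return a, b, err // a and b are identical blobs
}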
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 4ca1b55bbb2bd..0e9fbd752e1ae 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -523,7 +523,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 } else {
 d.ancientLimit = 0
 }
- frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
+ frozen, _ := d.stateDB.Ancients(rawdb.ChainFreezer) // Ignore the error here since light clients can also hit this path.

 // If a part of blockchain data has already been written into active store,
 // disable the ancient style insertion explicitly.
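With freezer instances named explicitly, call sites such as the downloader must pass rawdb.ChainFreezer on every query. A minimal sketch of the pattern; the helper name is illustrative, not geth API, and errors are deliberately ignored just as in the call sites above:

package ancientexample

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// chainFrozen reports how many items the chain freezer currently holds,
// i.e. the next block number eligible for freezing. A light client without
// a freezer simply yields zero here.
func chainFrozen(db ethdb.AncientReader) uint64 {
	frozen, _ := db.Ancients(rawdb.ChainFreezer)
	return frozen
}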
diff --git a/ethdb/database.go b/ethdb/database.go
index 0a5729c6c1ecc..a82d54795dd6a 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -71,23 +71,26 @@ type KeyValueStore interface {
 type AncientReader interface {
 // HasAncient returns an indicator whether the specified data exists in the
 // ancient store.
- HasAncient(kind string, number uint64) (bool, error)
+ HasAncient(typ string, kind string, number uint64) (bool, error)

 // Ancient retrieves an ancient binary blob from the append-only immutable files.
- Ancient(kind string, number uint64) ([]byte, error)
+ Ancient(typ string, kind string, number uint64) ([]byte, error)

 // AncientRange retrieves multiple items in sequence, starting from the index 'start'.
 // It will return
 // - at most 'count' items,
 // - at least 1 item (even if exceeding the maxBytes), but will otherwise
 // return as many items as fit into maxBytes.
- AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error)
+ AncientRange(typ string, kind string, start, count, maxBytes uint64) ([][]byte, error)

 // Ancients returns the ancient item numbers in the ancient store.
- Ancients() (uint64, error)
+ Ancients(typ string) (uint64, error)
+
+ // Tail returns the number of the first stored item in the freezer.
+ Tail(typ string) (uint64, error)

 // AncientSize returns the ancient size of the specified category.
- AncientSize(kind string) (uint64, error)
+ AncientSize(typ string, kind string) (uint64, error)
 }

 // AncientBatchReader is the interface for 'batched' or 'atomic' reading.
@@ -96,7 +99,7 @@ type AncientBatchReader interface {

 // ReadAncients runs the given read operation while ensuring that no writes take place
 // on the underlying freezer.
- ReadAncients(fn func(AncientReader) error) (err error)
+ ReadAncients(typ string, fn func(AncientReadOp) error) (err error)
 }

 // AncientWriter contains the methods required to write to immutable ancient data.
@@ -104,13 +107,45 @@ type AncientWriter interface {
 // ModifyAncients runs a write operation on the ancient store.
 // If the function returns an error, any changes to the underlying store are reverted.
 // The integer return value is the total size of the written data.
- ModifyAncients(func(AncientWriteOp) error) (int64, error)
+ ModifyAncients(typ string, fn func(AncientWriteOp) error) (int64, error)

- // TruncateAncients discards all but the first n ancient data from the ancient store.
- TruncateAncients(n uint64) error
+ // TruncateHead discards all but the first n items in the ancient store. After the
+ // truncation, the latest item that can be accessed is item n-1 (items are numbered
+ // from 0).
+ TruncateHead(typ string, n uint64) error
+
+ // TruncateTail discards the first n items in the ancient store. Items that have
+ // already been deleted are ignored. After the truncation, the earliest item that
+ // can be accessed is item n.
+ TruncateTail(typ string, n uint64) error

 // Sync flushes all in-memory ancient store data to disk.
- Sync() error
+ Sync(typ string) error
+}
+
+// AncientReadOp is given to the function argument of ReadAncients.
+type AncientReadOp interface {
+ // HasAncient returns an indicator whether the specified data exists in the
+ // ancient store.
+ HasAncient(kind string, number uint64) (bool, error)
+
+ // Ancient retrieves an ancient binary blob from the append-only immutable files.
+ Ancient(kind string, number uint64) ([]byte, error)
+
+ // AncientRange retrieves multiple items in sequence, starting from the index 'start'.
+ // It will return
+ // - at most 'count' items,
+ // - at least 1 item (even if exceeding the maxBytes), but will otherwise
+ // return as many items as fit into maxBytes.
+ AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error)
+
+ // Ancients returns the ancient item numbers in the ancient store.
+ Ancients() (uint64, error)
+
+ // Tail returns the number of the first stored item in the freezer.
+ Tail() (uint64, error)
+
+ // AncientSize returns the ancient size of the specified category.
+ AncientSize(kind string) (uint64, error)
 }

 // AncientWriteOp is given to the function argument of ModifyAncients.
diff --git a/les/downloader/downloader.go b/les/downloader/downloader.go
index e7dfc4158e0ed..47831cc236295 100644
--- a/les/downloader/downloader.go
+++ b/les/downloader/downloader.go
@@ -526,7 +526,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 } else {
 d.ancientLimit = 0
 }
- frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
+ frozen, _ := d.stateDB.Ancients(rawdb.ChainFreezer) // Ignore the error here since light clients can also hit this path.

 // If a part of blockchain data has already been written into active store,
 // disable the ancient style insertion explicitly.
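Taken together, the reworked interface splits truncation into a head side and a tail side and moves atomic reads behind AncientReadOp. A hedged end-to-end sketch of how a caller might use it; the "headers" and "bodies" literals are assumptions standing in for the unexported schema constants, and the helper names are illustrative:

package ancientexample

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// pruneToWindow keeps only items in [tail, head) of the chain freezer using
// the new two-sided truncation. Tail deletion is lazy, so disk space is
// reclaimed only once whole data files fall out of the window.
func pruneToWindow(db ethdb.AncientWriter, tail, head uint64) error {
	if err := db.TruncateTail(rawdb.ChainFreezer, tail); err != nil {
		return err
	}
	return db.TruncateHead(rawdb.ChainFreezer, head)
}

// headerAndBody reads two blobs under a single ReadAncients invocation, so
// the pair stays consistent even if a truncation is racing the read.
func headerAndBody(db ethdb.AncientBatchReader, number uint64) (header, body []byte, err error) {
	err = db.ReadAncients(rawdb.ChainFreezer, func(op ethdb.AncientReadOp) error {
		if header, err = op.Ancient("headers", number); err != nil {
			return err
		}
		body, err = op.Ancient("bodies", number)
		return err
	})
	return header, body, err
}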