From 63ca13f103ce8b555be715365f8f1eb74355b6a0 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 23 Nov 2021 14:02:06 +0800 Subject: [PATCH] core/rawdb, cmd, ethdb, eth: implement freezer tail deletion --- core/blockchain.go | 6 +- core/rawdb/accessors_chain.go | 6 +- core/rawdb/database.go | 16 +- core/rawdb/freezer.go | 72 +++++- core/rawdb/freezer_batch.go | 2 +- core/rawdb/freezer_meta.go | 236 +++++++++++++++++++ core/rawdb/freezer_meta_test.go | 69 ++++++ core/rawdb/freezer_table.go | 391 ++++++++++++++++++++++++------- core/rawdb/freezer_table_test.go | 262 ++++++++++++++++++--- core/rawdb/freezer_test.go | 6 +- core/rawdb/freezer_utils.go | 83 +++++++ core/rawdb/freezer_utils_test.go | 76 ++++++ core/rawdb/table.go | 18 +- ethdb/database.go | 16 +- 14 files changed, 1109 insertions(+), 150 deletions(-) create mode 100644 core/rawdb/freezer_meta.go create mode 100644 core/rawdb/freezer_meta_test.go create mode 100644 core/rawdb/freezer_utils.go create mode 100644 core/rawdb/freezer_utils_test.go diff --git a/core/blockchain.go b/core/blockchain.go index 8da7cc22923f8..585f3e17ee38c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -592,7 +592,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if err := bc.db.TruncateAncients(num); err != nil { + if err := bc.db.TruncateHead(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } // Remove the hash <-> number mapping from the active store. @@ -994,7 +994,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // The tx index data could not be written. // Roll back the ancient store update. 
fastBlock := bc.CurrentFastBlock().NumberU64() - if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { + if err := bc.db.TruncateHead(fastBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, err @@ -1010,7 +1010,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !updateHead(blockChain[len(blockChain)-1]) { // We end up here if the header chain has reorg'ed, and the blocks/receipts // don't match the canonical chain. - if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil { + if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, errSideChainReceipts diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 8e9706ea6fdb9..7e4a06df924b9 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -83,8 +83,8 @@ type NumberHash struct { Hash common.Hash } -// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, -// both canonical and reorged forks included. +// ReadAllHashesInRange retrieves all the hashes assigned to blocks at a certain +// heights, both canonical and reorged forks included. // This method considers both limits to be _inclusive_. func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash { var ( @@ -776,7 +776,7 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { WriteHeader(db, block.Header()) } -// WriteAncientBlock writes entire block data into ancient store and returns the total written size. +// WriteAncientBlocks writes entire block data into ancient store and returns the total written size. 
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) { var ( tdSum = new(big.Int).Set(td) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 5ef64d26a2057..64cc2862bb37e 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -99,6 +99,11 @@ func (db *nofreezedb) Ancients() (uint64, error) { return 0, errNotSupported } +// Tail returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) Tail() (uint64, error) { + return 0, errNotSupported +} + // AncientSize returns an error as we don't have a backing chain freezer. func (db *nofreezedb) AncientSize(kind string) (uint64, error) { return 0, errNotSupported @@ -109,8 +114,13 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e return 0, errNotSupported } -// TruncateAncients returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) TruncateAncients(items uint64) error { +// TruncateHead returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateHead(items uint64) error { + return errNotSupported +} + +// TruncateTail returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateTail(items uint64) error { return errNotSupported } @@ -211,7 +221,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st // Block #1 is still in the database, we're allowed to init a new feezer } // Otherwise, the head header is still the genesis, we're allowed to init a new - // feezer. + // freezer. 
} } // Freezer is consistent with the key-value database, permit combining the two diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index e19c202adc843..503ccbbb98512 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -59,14 +59,14 @@ const ( freezerRecheckInterval = time.Minute // freezerBatchLimit is the maximum number of blocks to freeze in one batch - // before doing an fsync and deleting it from the key-value store. + // before doing a fsync and deleting it from the key-value store. freezerBatchLimit = 30000 // freezerTableSize defines the maximum size of freezer data files. freezerTableSize = 2 * 1000 * 1000 * 1000 ) -// freezer is an memory mapped append-only database to store immutable chain data +// freezer is a memory mapped append-only database to store immutable chain data // into flat files: // // - The append only nature ensures that disk writes are minimized. @@ -78,6 +78,7 @@ type freezer struct { // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). frozen uint64 // Number of blocks already frozen + tail uint64 // Number of the first stored item in the freezer threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) // This lock synchronizes writers and the truncate operation, as well as @@ -219,6 +220,11 @@ func (f *freezer) Ancients() (uint64, error) { return atomic.LoadUint64(&f.frozen), nil } +// Tail returns the number of first stored item in the freezer. +func (f *freezer) Tail() (uint64, error) { + return atomic.LoadUint64(&f.tail), nil +} + // AncientSize returns the ancient size of the specified category. func (f *freezer) AncientSize(kind string) (uint64, error) { // This needs the write lock to avoid data races on table fields. 
@@ -254,7 +260,7 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize if err != nil { // The write operation has failed. Go back to the previous item position. for name, table := range f.tables { - err := table.truncate(prevItem) + err := table.truncateHead(prevItem) if err != nil { log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err) } @@ -274,8 +280,8 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize return writeSize, nil } -// TruncateAncients discards any recent data above the provided threshold number. -func (f *freezer) TruncateAncients(items uint64) error { +// TruncateHead discards any recent data above the provided threshold number. +func (f *freezer) TruncateHead(items uint64) error { if f.readonly { return errReadOnly } @@ -285,12 +291,42 @@ func (f *freezer) TruncateAncients(items uint64) error { if atomic.LoadUint64(&f.frozen) <= items { return nil } + var frozen uint64 + for _, table := range f.tables { + if err := table.truncateHead(items); err != nil { + return err + } + // Tables should be aligned, only check the first table. + if frozen == 0 { + frozen = atomic.LoadUint64(&table.items) + } + } + atomic.StoreUint64(&f.frozen, frozen) + return nil +} + +// TruncateTail discards any recent data below the provided threshold number. +func (f *freezer) TruncateTail(tail uint64) error { + if f.readonly { + return errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + if atomic.LoadUint64(&f.tail) >= tail { + return nil + } + var truncated uint64 for _, table := range f.tables { - if err := table.truncate(items); err != nil { + if err := table.truncateTail(tail); err != nil { return err } + if truncated == 0 { + // Tables should be aligned, only check the first table. 
+ truncated = table.tail() + } } - atomic.StoreUint64(&f.frozen, items) + atomic.StoreUint64(&f.tail, truncated) return nil } @@ -308,21 +344,31 @@ func (f *freezer) Sync() error { return nil } -// repair truncates all data tables to the same length. +// repair truncates all data tables to the same length, both tail and head. func (f *freezer) repair() error { - min := uint64(math.MaxUint64) + var ( + head = uint64(math.MaxUint64) + tail = uint64(0) + ) for _, table := range f.tables { items := atomic.LoadUint64(&table.items) - if min > items { - min = items + if head > items { + head = items + } + if table.tail() > tail { + tail = table.tail() } } for _, table := range f.tables { - if err := table.truncate(min); err != nil { + if err := table.truncateHead(head); err != nil { + return err + } + if err := table.truncateTail(tail); err != nil { return err } } - atomic.StoreUint64(&f.frozen, min) + atomic.StoreUint64(&f.frozen, head) + atomic.StoreUint64(&f.tail, tail) return nil } diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index 762fa8f25f19d..864a7f5e98bfe 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -191,7 +191,7 @@ func (batch *freezerTableBatch) commit() error { dataSize := int64(len(batch.dataBuffer)) batch.dataBuffer = batch.dataBuffer[:0] - // Write index. + // Write indices. _, err = batch.t.index.Write(batch.indexBuffer) if err != nil { return err diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go new file mode 100644 index 0000000000000..4df449d53587d --- /dev/null +++ b/core/rawdb/freezer_meta.go @@ -0,0 +1,236 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rawdb + +import ( + "bytes" + "errors" + "fmt" + "os" + + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + freezerVersion = 1 // The version tag of freezer table structure + metaLength = 1024 // The number of bytes allocated for the freezer table metadata +) + +var errIncompatibleVersion = errors.New("incompatible version") + +type incompatibleError struct { + version uint16 + expect uint16 + err error +} + +func newIncompatibleError(version uint16) *incompatibleError { + return &incompatibleError{ + version: version, + expect: freezerVersion, + err: errIncompatibleVersion, + } +} + +// Unwrap returns the wrapped sentinel error, which allows callers to +// inspect it further (for example with errors.Is). +func (err *incompatibleError) Unwrap() error { + return err.err +} + +func (err *incompatibleError) Error() string { + return fmt.Sprintf("%v, get %d, expect %d", err.err, err.version, err.expect) +} + +// freezerTableMeta wraps all the metadata of the freezer table. +type freezerTableMeta struct { + version uint16 // Freezer table version descriptor + tailId uint32 // The number of the earliest file + deleted uint64 // The number of items that have been removed from the table + hidden uint64 // The number of items that have been hidden in the table +} + +// newMetadata initializes the metadata object with the given parameters. 
+func newMetadata(tailId uint32, deleted uint64, hidden uint64) *freezerTableMeta { + return &freezerTableMeta{ + version: freezerVersion, + tailId: tailId, + deleted: deleted, + hidden: hidden, + } +} + +// encodeMetadata encodes the given parameters as the freezer table metadata. +func encodeMetadata(meta *freezerTableMeta) ([]byte, error) { + buffer := new(bytes.Buffer) + if err := rlp.Encode(buffer, meta.version); err != nil { + return nil, err + } + if err := rlp.Encode(buffer, meta.tailId); err != nil { + return nil, err + } + if err := rlp.Encode(buffer, meta.deleted); err != nil { + return nil, err + } + if err := rlp.Encode(buffer, meta.hidden); err != nil { + return nil, err + } + buffer.Write(make([]byte, metaLength-buffer.Len())) // Right pad zero bytes to the specified length + return buffer.Bytes(), nil +} + +// decodeMetadata decodes the freezer-table metadata from the given +// rlp stream. +func decodeMetadata(r *rlp.Stream) (*freezerTableMeta, error) { + var version uint16 + if err := r.Decode(&version); err != nil { + return nil, err + } + if version != freezerVersion { + return nil, newIncompatibleError(version) + } + var tailId uint32 + if err := r.Decode(&tailId); err != nil { + return nil, err + } + var deleted, hidden uint64 + if err := r.Decode(&deleted); err != nil { + return nil, err + } + if err := r.Decode(&hidden); err != nil { + return nil, err + } + return newMetadata(tailId, deleted, hidden), nil +} + +// storeMetadata stores the metadata of the freezer table into the +// given index file. +func storeMetadata(index *os.File, meta *freezerTableMeta) error { + encoded, err := encodeMetadata(meta) + if err != nil { + return err + } + if _, err := index.WriteAt(encoded, 0); err != nil { + return err + } + return nil +} + +// loadMetadata loads the metadata of the freezer table from the +// given index file. Return the error if the version of loaded +// metadata is not expected. 
+func loadMetadata(index *os.File) (*freezerTableMeta, error) { + stat, err := index.Stat() + if err != nil { + return nil, err + } + if stat.Size() < metaLength { + return nil, newIncompatibleError(0) + } + buffer := make([]byte, metaLength) + if _, err := index.ReadAt(buffer, 0); err != nil { + return nil, err + } + return decodeMetadata(rlp.NewStream(bytes.NewReader(buffer), 0)) +} + +// upgradeV0TableIndex extracts the indexes from version-0 index file and +// encodes/stores them into the latest version index file. +func upgradeV0TableIndex(index *os.File) error { + // Create a temporary offset buffer to read indexEntry info + buffer := make([]byte, indexEntrySize) + + // Read index zero, determine what file is the earliest + // and how many entries are deleted from the freezer table. + var first indexEntry + if _, err := index.ReadAt(buffer, 0); err != nil { + return err + } + first.unmarshalBinary(buffer) + + encoded, err := encodeMetadata(newMetadata(first.filenum, uint64(first.offset), 0)) + if err != nil { + return err + } + // Close the origin index file. + if err := index.Close(); err != nil { + return err + } + return copyFrom(index.Name(), index.Name(), indexEntrySize, func(f *os.File) error { + _, err := f.Write(encoded) + return err + }) +} + +// upgradeTableIndex upgrades the legacy index file to the latest version. +// This function should be responsible for closing the origin index file +// and return the re-opened one. 
+func upgradeTableIndex(index *os.File, version uint16) (*os.File, *freezerTableMeta, error) { + switch version { + case 0: + if err := upgradeV0TableIndex(index); err != nil { + return nil, nil, err + } + default: + return nil, nil, errors.New("unknown freezer table index") + } + // Reopen the upgraded index file and load the metadata from it. NOTE(review): os.Open yields a read-only handle; confirm the caller never writes through it. + index, err := os.Open(index.Name()) + if err != nil { + return nil, nil, err + } + meta, err := loadMetadata(index) + if err != nil { + return nil, nil, err + } + return index, meta, nil +} + +// repairTableIndex repairs the given index file of freezer table and returns +// the stored metadata inside. If the index file is rewritten, the function +// is responsible for closing the original one and returning the new handle. +// If the table is empty, commit the empty metadata; +// If the table is legacy, upgrade it to the latest version. +func repairTableIndex(index *os.File) (*os.File, *freezerTableMeta, error) { + stat, err := index.Stat() + if err != nil { + return nil, nil, err + } + if stat.Size() == 0 { + meta := newMetadata(0, 0, 0) + if err := storeMetadata(index, meta); err != nil { + return nil, nil, err + } + // Shift file cursor to the end for next write operation + _, err = index.Seek(0, 2) + if err != nil { + return nil, nil, err + } + return index, meta, nil + } + meta, err := loadMetadata(index) + if err != nil { + if !errors.Is(err, errIncompatibleVersion) { + return nil, nil, err + } + index, meta, err = upgradeTableIndex(index, err.(*incompatibleError).version) + } + if err != nil { + return nil, nil, err + } + return index, meta, nil +} diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go new file mode 100644 index 0000000000000..3340450f984a9 --- /dev/null +++ b/core/rawdb/freezer_meta_test.go @@ -0,0 +1,69 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rawdb + +import ( + "errors" + "io/ioutil" + "os" + "testing" +) + +func TestStoreLoadFreezerTableMeta(t *testing.T) { + var cases = []struct { + version uint16 + deleted uint64 + hidden uint64 + expectErr error + }{ + { + freezerVersion, 100, 200, nil, + }, + { + 0, 100, 200, errIncompatibleVersion, // legacy version + }, + } + for _, c := range cases { + f, err := ioutil.TempFile(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + err = storeMetadata(f, &freezerTableMeta{ + version: c.version, + deleted: c.deleted, + hidden: c.hidden, + }) + if err != nil { + t.Fatalf("Failed to store metadata %v", err) + } + meta, err := loadMetadata(f) + if !errors.Is(err, c.expectErr) { + t.Fatalf("Unexpected error %v", err) + } + if c.expectErr == nil { + if meta.version != c.version { + t.Fatalf("Unexpected version field") + } + if meta.deleted != c.deleted { + t.Fatalf("Unexpected deleted field") + } + if meta.hidden != c.hidden { + t.Fatalf("Unexpected hidden field") + } + } + } +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 22405cf9b4f89..f1baca9aac6be 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -46,21 +46,20 @@ var ( errNotSupported = errors.New("this operation is not supported") ) -// 
indexEntry contains the number/id of the file that the data resides in, aswell as the -// offset within the file to the end of the data +// indexEntry contains the number/id of the file that the data resides in, as well as the +// offset within the file to the end of the data. // In serialized form, the filenum is stored as uint16. type indexEntry struct { - filenum uint32 // stored as uint16 ( 2 bytes) - offset uint32 // stored as uint32 ( 4 bytes) + filenum uint32 // stored as uint16 ( 2 bytes ) + offset uint32 // stored as uint32 ( 4 bytes ) } const indexEntrySize = 6 // unmarshalBinary deserializes binary b into the rawIndex entry. -func (i *indexEntry) unmarshalBinary(b []byte) error { +func (i *indexEntry) unmarshalBinary(b []byte) { i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) i.offset = binary.BigEndian.Uint32(b[2:6]) - return nil } // append adds the encoded entry to the end of b. @@ -75,14 +74,14 @@ func (i *indexEntry) append(b []byte) []byte { // bounds returns the start- and end- offsets, and the file number of where to // read there data item marked by the two index entries. The two entries are // assumed to be sequential. -func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { - if start.filenum != end.filenum { +func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { + if i.filenum != end.filenum { // If a piece of data 'crosses' a data-file, // it's actually in one piece on the second data-file. // We return a zero-indexEntry for the second file as start return 0, end.offset, end.filenum } - return start.offset, end.offset, end.filenum + return i.offset, end.offset, end.filenum } // freezerTable represents a single chained data table within the freezer (e.g. blocks). @@ -92,7 +91,15 @@ type freezerTable struct { // WARNING: The `items` field is accessed atomically. On 32 bit platforms, only // 64-bit aligned fields can be atomic. 
The struct is guaranteed to be so aligned, // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). - items uint64 // Number of items stored in the table (including items removed from tail) + items uint64 // Number of items stored in the table (including items removed from tail) + itemOffset uint64 // Number of items removed from the table + + // itemHidden is the number of items marked as deleted but not yet removed + // from the table. Tail deletion is only supported at file granularity, so + // the actual deletion is delayed until the total "marked as deleted" data + // reaches the threshold. Until then these items are hidden to prevent them + // from being visited again. + itemHidden uint64 noCompression bool // if true, disables snappy compression. Note: does not work retroactively maxFileSize uint32 // Max file size for data-files @@ -105,10 +112,6 @@ type freezerTable struct { tailId uint32 // number of the earliest file index *os.File // File descriptor for the indexEntry file of the table - // In the case that old items are deleted (from the tail), we use itemOffset - // to count how many historic items have gone missing. - itemOffset uint32 // Offset (number of discarded items) - headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written @@ -162,7 +165,7 @@ func truncateFreezerFile(file *os.File, size int64) error { } // newTable opens a freezer table, creating the data and index files if they are -// non existent. Both files are truncated to the shortest common length to ensure +// non-existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. 
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file @@ -171,19 +174,17 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr } var idxName string if noCompression { - // Raw idx - idxName = fmt.Sprintf("%s.ridx", name) + idxName = fmt.Sprintf("%s.ridx", name) // raw index file } else { - // Compressed idx - idxName = fmt.Sprintf("%s.cidx", name) + idxName = fmt.Sprintf("%s.cidx", name) // compressed index file } - offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName)) + index, err := openFreezerFileForAppend(filepath.Join(path, idxName)) if err != nil { return nil, err } // Create the table and repair any past inconsistency tab := &freezerTable{ - index: offsets, + index: index, files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, @@ -209,26 +210,29 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr return tab, nil } -// repair cross checks the head and the index file and truncates them to +// repair cross-checks the head and the index file and truncates them to // be in sync with each other after a potential crash / data loss. func (t *freezerTable) repair() error { - // Create a temporary offset buffer to init files with and read indexEntry into - buffer := make([]byte, indexEntrySize) + index, meta, err := repairTableIndex(t.index) + if err != nil { + return err + } + t.index = index // index file may be reopened, update it + t.tailId, t.itemOffset, t.itemHidden = meta.tailId, meta.deleted, meta.hidden - // If we've just created the files, initialize the index with the 0 indexEntry + // Ensure the index is a multiple of indexEntrySize bytes. The assumption + // is held that index file at least has metaLength bytes for storing meta- + // data. 
stat, err := t.index.Stat() if err != nil { return err } - if stat.Size() == 0 { - if _, err := t.index.Write(buffer); err != nil { + if overflow := (stat.Size() - metaLength) % indexEntrySize; overflow != 0 { + err := truncateFreezerFile(t.index, stat.Size()-overflow) + if err != nil { return err } } - // Ensure the index is a multiple of indexEntrySize bytes - if overflow := stat.Size() % indexEntrySize; overflow != 0 { - truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path - } // Retrieve the file sizes and prepare for truncation if stat, err = t.index.Stat(); err != nil { return err @@ -237,25 +241,32 @@ func (t *freezerTable) repair() error { // Open the head file var ( - firstIndex indexEntry - lastIndex indexEntry contentSize int64 contentExp int64 + lastIndex *indexEntry ) - // Read index zero, determine what file is the earliest - // and what item offset to use - t.index.ReadAt(buffer, 0) - firstIndex.unmarshalBinary(buffer) - - t.tailId = firstIndex.filenum - t.itemOffset = firstIndex.offset - - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) - lastIndex.unmarshalBinary(buffer) - t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) + // Read last index, determine what file is the latest and + // what's the current head item + items, err := t.indexLen() if err != nil { return err } + if items == 0 { + t.head, err = t.openFile(t.tailId, openFreezerFileForAppend) + if err != nil { + return err + } + lastIndex = &indexEntry{filenum: t.tailId, offset: 0} + } else { + lastIndex, err = t.getIndex(0, 1) + if err != nil { + return err + } + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) + if err != nil { + return err + } + } if stat, err = t.head.Stat(); err != nil { return err } @@ -263,7 +274,6 @@ func (t *freezerTable) repair() error { // Keep truncating both files until they come in sync contentExp = int64(lastIndex.offset) - for contentExp != contentSize { // Truncate the head file 
to the last offset pointer if contentExp < contentSize { @@ -279,15 +289,23 @@ func (t *freezerTable) repair() error { if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { return err } - offsetsSize -= indexEntrySize - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) - var newLastIndex indexEntry - newLastIndex.unmarshalBinary(buffer) + // Load the previous index entry from the index file + offsetsSize, items = offsetsSize-indexEntrySize, items-1 + + var newLast *indexEntry + if items == 0 { + newLast = &indexEntry{filenum: t.tailId, offset: 0} + } else { + newLast, err = t.getIndex(0, 1) + if err != nil { + return err + } + } // We might have slipped back into an earlier head-file here - if newLastIndex.filenum != lastIndex.filenum { + if newLast.filenum != lastIndex.filenum { // Release earlier opened file t.releaseFile(lastIndex.filenum) - if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil { + if t.head, err = t.openFile(newLast.filenum, openFreezerFileForAppend); err != nil { return err } if stat, err = t.head.Stat(); err != nil { @@ -297,7 +315,7 @@ func (t *freezerTable) repair() error { } contentSize = stat.Size() } - lastIndex = newLastIndex + lastIndex = newLast contentExp = int64(lastIndex.offset) } } @@ -309,10 +327,16 @@ func (t *freezerTable) repair() error { return err } // Update the item and byte counters and return - t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file + t.items = t.itemOffset + uint64(items) t.headBytes = contentSize t.headId = lastIndex.filenum + // Delete the leftover files because of head deletion + t.releaseFilesAfter(t.headId, true) + + // Delete the leftover files because of tail deletion + t.releaseFilesBefore(t.tailId, true) + // Close opened files and preopen all files if err := t.preopen(); err != nil { return err @@ -328,6 +352,7 @@ func (t *freezerTable) repair() error { func (t 
*freezerTable) preopen() (err error) { // The repair might have already opened (some) files t.releaseFilesAfter(0, false) + // Open all except head in RDONLY for i := t.tailId; i < t.headId; i++ { if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { @@ -339,8 +364,14 @@ func (t *freezerTable) preopen() (err error) { return err } -// truncate discards any recent data above the provided threshold number. -func (t *freezerTable) truncate(items uint64) error { +// tail returns the index of the first stored item in the freezer table. +// It can also be interpreted as the number of deleted items from the tail. +func (t *freezerTable) tail() uint64 { + return atomic.LoadUint64(&t.itemHidden) + atomic.LoadUint64(&t.itemOffset) +} + +// truncateHead discards any recent data above the provided threshold number. +func (t *freezerTable) truncateHead(items uint64) error { t.lock.Lock() defer t.lock.Unlock() @@ -360,17 +391,33 @@ func (t *freezerTable) truncate(items uint64) error { log = t.logger.Warn // Only loud warn if we delete multiple items } log("Truncating freezer table", "items", existing, "limit", items) - if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { + + // Calculate the relative offset between the new head and tail, use + // it to access the corresponding index entry. If the requested target + // is even below the freezer tail, reject it. 
+ var ( + itemOffset = atomic.LoadUint64(&t.itemOffset) + itemHidden = atomic.LoadUint64(&t.itemHidden) + tail = itemOffset + itemHidden + ) + if items < tail { + return errors.New("truncation below tail") + } + offset := items - itemOffset + + if err := truncateFreezerFile(t.index, int64(offset)*indexEntrySize+metaLength); err != nil { return err } // Calculate the new expected size of the data file and truncate it - buffer := make([]byte, indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil { - return err + var expected *indexEntry + if offset == 0 { + expected = &indexEntry{filenum: t.tailId, offset: 0} + } else { + expected, err = t.getIndex(int64(offset-1), 0) + if err != nil { + return err + } } - var expected indexEntry - expected.unmarshalBinary(buffer) - // We might need to truncate back to older files if expected.filenum != t.headId { // If already open for reading, force-reopen for writing @@ -399,7 +446,94 @@ func (t *freezerTable) truncate(items uint64) error { return err } t.sizeGauge.Dec(int64(oldSize - newSize)) + return nil +} + +func (t *freezerTable) truncateIndexFile(originDeleted, deleted, hidden uint64, tailId uint32) error { + encoded, err := encodeMetadata(newMetadata(tailId, deleted, hidden)) + if err != nil { + return err + } + err = copyFrom(t.index.Name(), t.index.Name(), metaLength+indexEntrySize*(deleted-originDeleted), func(f *os.File) error { + _, err := f.Write(encoded) + return err + }) + if err != nil { + return err + } + if err := t.index.Close(); err != nil { + return err + } + offsets, err := openFreezerFileForAppend(t.index.Name()) + if err != nil { + return err + } + t.index = offsets + return nil +} + +// truncateTail discards any data below the provided threshold number. 
+func (t *freezerTable) truncateTail(items uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + + // Ensure the given truncate target falls in the correct range + var ( + deleted = atomic.LoadUint64(&t.itemOffset) + hidden = atomic.LoadUint64(&t.itemHidden) + ) + if deleted+hidden >= items { + return nil + } + head := atomic.LoadUint64(&t.items) + if head < items { + return errors.New("truncation above head") + } + // Load the index of new tail item after the deletion. + newTail, err := t.getIndex(int64(items-deleted), 0) + if err != nil { + return err + } + // Freezer only supports deletion by file, just mark the entries as hidden + if t.tailId == newTail.filenum { + atomic.StoreUint64(&t.itemHidden, items-deleted) + return storeMetadata(t.index, newMetadata(t.tailId, deleted, items-deleted)) + } + if t.tailId > newTail.filenum { + return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTail.filenum) + } + // We need to truncate, save the old size for metrics tracking + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + // Count how many items can be deleted from the file. 
+ var newDeleted = items + for current := items - 1; current >= deleted; current -= 1 { + cur, err := t.getIndex(int64(current-deleted), 0) + if err != nil { + return err + } + if cur.filenum != newTail.filenum { + break + } + newDeleted = current + } + if err := t.truncateIndexFile(deleted, newDeleted, items-newDeleted, newTail.filenum); err != nil { + return err + } + // Release any files before the current tail + t.tailId = newTail.filenum + atomic.StoreUint64(&t.itemOffset, newDeleted) + atomic.StoreUint64(&t.itemHidden, items-newDeleted) + t.releaseFilesBefore(t.tailId, true) + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } @@ -468,6 +602,69 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { } } +// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files +func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { + for fnum, f := range t.files { + if fnum < num { + delete(t.files, fnum) + f.Close() + if remove { + os.Remove(f.Name()) + } + } + } +} + +// indexLen returns the total index entries stored in the index file. +// This number can also be counted as the data entries stored in the +// freezer table. +func (t *freezerTable) indexLen() (int64, error) { + stat, err := t.index.Stat() + if err != nil { + return 0, err + } + size := stat.Size() + if size < metaLength { + return 0, errors.New("invalid index file") + } + indexSize := size - metaLength + if indexSize%indexEntrySize != 0 { + return 0, errors.New("invalid index file") + } + return indexSize / indexEntrySize, nil +} + +// getIndex returns a single index from the index file, with the given offset +// interpreted according to whence: 0 means relative to the origin of the file +// and 1 means relative to the end. 
+func (t *freezerTable) getIndex(offset int64, whence int) (*indexEntry, error) { + count, err := t.indexLen() + if err != nil { + return nil, err + } + var ( + off int64 + index indexEntry + buffer = make([]byte, indexEntrySize) + ) + if whence == 0 { + if offset >= count { + return nil, errors.New("out of range") + } + off = metaLength + offset*indexEntrySize + } else { + if offset >= count { + return nil, errors.New("out of range") + } + off = metaLength + (count-1-offset)*indexEntrySize + } + if _, err := t.index.ReadAt(buffer, off); err != nil { + return nil, err + } + index.unmarshalBinary(buffer[:]) + return &index, nil +} + // getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). @@ -475,32 +672,32 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { // so that the items are within bounds. If this method is used to read out of bounds, // it will return error. func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { - // Apply the table-offset - from = from - uint64(t.itemOffset) - // For reading N items, we need N+1 indices. - buffer := make([]byte, (count+1)*indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { - return nil, err + // Special case if we're reading the first item in the freezer. We assume that + // the first item always start from zero(regarding the deletion, we + // only support deletion by files, so that the assumption is held). + var indices []*indexEntry + from = from - atomic.LoadUint64(&t.itemOffset) + if from == 0 { + indices = append(indices, &indexEntry{ + filenum: t.tailId, + offset: 0, + }) + count = count - 1 + from = from + 1 } + // For reading N items, we need N+1 indices. 
var ( - indices []*indexEntry - offset int + buffer = make([]byte, (count+1)*indexEntrySize) + offset = metaLength + int64(from-1)*indexEntrySize ) - for i := from; i <= from+count; i++ { + if _, err := t.index.ReadAt(buffer, offset); err != nil { + return nil, err + } + for i := 0; i <= int(count); i++ { index := new(indexEntry) - index.unmarshalBinary(buffer[offset:]) - offset += indexEntrySize + index.unmarshalBinary(buffer[i*indexEntrySize:]) indices = append(indices, index) } - if from == 0 { - // Special case if we're reading the first item in the freezer. We assume that - // the first item always start from zero(regarding the deletion, we - // only support deletion by files, so that the assumption is held). - // This means we can use the first item metadata to carry information about - // the 'global' offset, for the deletion-case - indices[0].offset = 0 - indices[0].filenum = indices[1].filenum - } return indices, nil } @@ -561,18 +758,23 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i t.lock.RLock() defer t.lock.RUnlock() - // Ensure the table and the item is accessible + // Ensure the table and the item are accessible if t.index == nil || t.head == nil { return nil, nil, errClosed } - itemCount := atomic.LoadUint64(&t.items) // max number + var ( + items = atomic.LoadUint64(&t.items) // the total items(head + 1) + deleted = atomic.LoadUint64(&t.itemOffset) // the number of deleted items + hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items + tail = deleted + hidden + ) // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something - if itemCount <= start || uint64(t.itemOffset) > start || count == 0 { + if items <= start || tail > start || count == 0 { return nil, nil, errOutOfBounds } - if start+count > itemCount { - count = itemCount - start + if start+count > items { + count = items - start } var ( output = make([]byte, maxBytes) // Buffer to read data 
into @@ -648,10 +850,10 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i return output[:outputSize], sizes, nil } -// has returns an indicator whether the specified number data -// exists in the freezer table. +// has returns an indicator whether the specified number data is still accessible +// in the freezer table. func (t *freezerTable) has(number uint64) bool { - return atomic.LoadUint64(&t.items) > number + return atomic.LoadUint64(&t.items) > number && t.tail() <= number } // size returns the total data size in the freezer table. @@ -722,13 +924,20 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string { } func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { + meta, err := loadMetadata(t.index) + if err != nil { + fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) + return + } + fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.version, meta.deleted, meta.hidden) + buf := make([]byte, indexEntrySize) fmt.Fprintf(w, "| number | fileno | offset |\n") fmt.Fprintf(w, "|--------|--------|--------|\n") for i := uint64(start); ; i++ { - if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil { + if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)+metaLength); err != nil { break } var entry indexEntry diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 803809b5207f4..e92b31decf2c8 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -19,6 +19,7 @@ package rawdb import ( "bytes" "fmt" + "io/ioutil" "math/rand" "os" "path/filepath" @@ -203,8 +204,8 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Fatalf("Failed to open index file: %v", err) } // Remove everything but the first item, and leave data unaligned - // 0-indexEntry, 1-indexEntry, corrupt-indexEntry - idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2) + // metadata, 1-indexEntry, corrupt-indexEntry + 
idxFile.Truncate(metaLength + indexEntrySize + indexEntrySize/2) idxFile.Close() // Now open it again @@ -387,7 +388,7 @@ func TestFreezerTruncate(t *testing.T) { t.Fatal(err) } defer f.Close() - f.truncate(10) // 150 bytes + f.truncateHead(10) // 150 bytes if f.items != 10 { t.Fatalf("expected %d items, got %d", 10, f.items) } @@ -504,7 +505,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { } // Now, truncate back to zero - f.truncate(0) + f.truncateHead(0) // Write the data again batch := f.newBatch() @@ -559,26 +560,23 @@ func TestFreezerOffset(t *testing.T) { if err != nil { t.Fatal(err) } - indexBuf := make([]byte, 7*indexEntrySize) + indexBuf := make([]byte, 6*indexEntrySize+metaLength) indexFile.Read(indexBuf) // Update the index file, so that we store - // [ file = 2, offset = 4 ] at index zero - - tailId := uint32(2) // First file is 2 - itemOffset := uint32(4) // We have removed four items - zeroIndex := indexEntry{ - filenum: tailId, - offset: itemOffset, + // [ file = 2, deleted = 4, hidden = 0 ] as meta + blob, err := encodeMetadata(newMetadata(2, 4, 0)) + if err != nil { + t.Fatal(err) } - buf := zeroIndex.append(nil) - // Overwrite index zero - copy(indexBuf, buf) + copy(indexBuf, blob) + // Remove the four next indices by overwriting - copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) + copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*4:]) indexFile.WriteAt(indexBuf, 0) + // Need to truncate the moved index items - indexFile.Truncate(indexEntrySize * (1 + 2)) + indexFile.Truncate(indexEntrySize*2 + metaLength) indexFile.Close() } @@ -617,22 +615,22 @@ func TestFreezerOffset(t *testing.T) { if err != nil { t.Fatal(err) } - indexBuf := make([]byte, 3*indexEntrySize) + indexBuf := make([]byte, 2*indexEntrySize+metaLength) indexFile.Read(indexBuf) // Update the index file, so that we store - // [ file = 2, offset = 1M ] at index zero - - tailId := uint32(2) // First file is 2 - itemOffset := uint32(1000000) // We have removed 
1M items - zeroIndex := indexEntry{ - offset: itemOffset, - filenum: tailId, + // [ file = 2, deleted = 1M, hidden = 0 ] as meta + blob, err := encodeMetadata(newMetadata(2, 1000000, 0)) + if err != nil { + t.Fatal(err) } - buf := zeroIndex.append(nil) - // Overwrite index zero - copy(indexBuf, buf) + copy(indexBuf, blob) + + // Remove the first 2 indices by overwriting + copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*2:]) indexFile.WriteAt(indexBuf, 0) + + indexFile.Truncate(indexEntrySize*2 + metaLength) indexFile.Close() } @@ -659,6 +657,214 @@ func TestFreezerOffset(t *testing.T) { } } +func TestTruncateTail(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + // nothing to do, all the items should still be there.
+ f.truncateTail(0) + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieve(t, f, map[uint64][]byte{ + 0: getChunk(20, 0xFF), + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate single element( item 0 ), deletion is only supported at file level + f.truncateTail(1) + fmt.Println(f.dumpIndexString(0, 1000)) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Reopen the table, the deletion information should be persisted as well + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 1: getChunk(20, 0xEE), + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate two elements( item 0, item 1 ), the file 0 should be deleted + f.truncateTail(2) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // Reopen the table, the above testing should still pass + f.Close() + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 2: getChunk(20, 0xdd), + 3: getChunk(20, 0xcc), + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x11), + }) + + // truncate all, the entire 
freezer should be deleted + f.truncateTail(6) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + 4: errOutOfBounds, + 5: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 6: getChunk(20, 0x11), + }) +} + +func TestTruncateHeadBelowTail(t *testing.T) { + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("truncate-head-blow-tail-%d", rand.Uint64()) + + // Fill table + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + + // Write 7 x 20 bytes, splitting out into four files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + f.truncateTail(4) // Tail = 4 + + // NewHead is required to be 3, the entire table should be truncated + f.truncateHead(4) + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, // Deleted by tail + 1: errOutOfBounds, // Deleted by tail + 2: errOutOfBounds, // Deleted by tail + 3: errOutOfBounds, // Deleted by tail + 4: errOutOfBounds, // Deleted by Head + 5: errOutOfBounds, // Deleted by Head + 6: errOutOfBounds, // Deleted by Head + }) + + // Append new items + batch = f.newBatch() + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) + require.NoError(t, batch.commit()) + + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: 
getChunk(20, 0x11), + }) + + f.truncateTail(5) // Lazy deleted the item-4, it's hidden + f.truncateHead(5) // New head is reset to item-4 + checkRetrieveError(t, f, map[uint64]error{ + 4: errOutOfBounds, // Hidden item + }) +} + +func TestUpgradeLegacyFreezerTable(t *testing.T) { + f, err := ioutil.TempFile("", "") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + + index := &indexEntry{ + filenum: 100, + offset: 200, + } + encoded := index.append(nil) + f.Write(encoded) + + newf, meta, err := repairTableIndex(f) + if err != nil { + t.Fatal(err) + } + if newf.Name() != f.Name() { + t.Fatal("Unexpected file name") + } + if meta.tailId != 100 { + t.Fatal("Unexpected tail file") + } + if meta.deleted != 200 { + t.Fatal("Unexpected deleted items") + } + if meta.hidden != 0 { + t.Fatal("Unexpected hidden items") + } + if meta.version != freezerVersion { + t.Fatal("Unexpected freezer version") + } +} + func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) { t.Helper() diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go index fa84f803068b9..c97371110e7f9 100644 --- a/core/rawdb/freezer_test.go +++ b/core/rawdb/freezer_test.go @@ -186,7 +186,7 @@ func TestFreezerConcurrentModifyRetrieve(t *testing.T) { wg.Wait() } -// This test runs ModifyAncients and TruncateAncients concurrently with each other. +// This test runs ModifyAncients and TruncateHead concurrently with each other. func TestFreezerConcurrentModifyTruncate(t *testing.T) { f, dir := newFreezerForTesting(t, freezerTestTableDef) defer os.RemoveAll(dir) @@ -196,7 +196,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { for i := 0; i < 1000; i++ { // First reset and write 100 items. 
- if err := f.TruncateAncients(0); err != nil { + if err := f.TruncateHead(0); err != nil { t.Fatal("truncate failed:", err) } _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { @@ -231,7 +231,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { wg.Done() }() go func() { - truncateErr = f.TruncateAncients(10) + truncateErr = f.TruncateHead(10) wg.Done() }() go func() { diff --git a/core/rawdb/freezer_utils.go b/core/rawdb/freezer_utils.go new file mode 100644 index 0000000000000..c7510d4da847c --- /dev/null +++ b/core/rawdb/freezer_utils.go @@ -0,0 +1,83 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" +) + +// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'. +// The 'destPath' is created if it doesn't exist, otherwise it is overwritten. +// Before the copy is executed, there is a callback can be registered to +// manipulate the dest file. +// It is perfectly valid to have destPath == srcPath. 
+func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) error) error { + // Create a temp file in the same dir where we want it to wind up + f, err := ioutil.TempFile(filepath.Dir(destPath), "*") + if err != nil { + return err + } + fname := f.Name() + + // Clean up the leftover file + defer func() { + if f != nil { + f.Close() + } + os.Remove(fname) + }() + + // Apply the given function if it's not nil before we copy + // the content from the src. + if before != nil { + if err := before(f); err != nil { + return err + } + } + // Open the source file + src, err := os.Open(srcPath) + if err != nil { + return err + } + if _, err = src.Seek(int64(offset), 0); err != nil { + src.Close() + return err + } + // io.Copy uses 32K buffer internally. + _, err = io.Copy(f, src) + if err != nil { + src.Close() + return err + } + // Rename the temporary file to the specified dest name. + // src may be same as dest, so needs to be closed before + // we do the final move. + src.Close() + + if err := f.Close(); err != nil { + return err + } + f = nil + + if err := os.Rename(fname, destPath); err != nil { + return err + } + return nil +} diff --git a/core/rawdb/freezer_utils_test.go b/core/rawdb/freezer_utils_test.go new file mode 100644 index 0000000000000..de8087f9b9361 --- /dev/null +++ b/core/rawdb/freezer_utils_test.go @@ -0,0 +1,76 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "io/ioutil" + "os" + "testing" +) + +func TestCopyFrom(t *testing.T) { + var ( + content = []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8} + prefix = []byte{0x9, 0xa, 0xb, 0xc, 0xd, 0xf} + ) + var cases = []struct { + src, dest string + offset uint64 + writePrefix bool + }{ + {"foo", "bar", 0, false}, + {"foo", "bar", 1, false}, + {"foo", "bar", 8, false}, + {"foo", "foo", 0, false}, + {"foo", "foo", 1, false}, + {"foo", "foo", 8, false}, + {"foo", "bar", 0, true}, + {"foo", "bar", 1, true}, + {"foo", "bar", 8, true}, + } + for _, c := range cases { + ioutil.WriteFile(c.src, content, 0644) + + if err := copyFrom(c.src, c.dest, c.offset, func(f *os.File) error { + if !c.writePrefix { + return nil + } + f.Write(prefix) + return nil + }); err != nil { + os.Remove(c.src) + t.Fatalf("Failed to copy %v", err) + } + + blob, err := ioutil.ReadFile(c.dest) + if err != nil { + os.Remove(c.src) + os.Remove(c.dest) + t.Fatalf("Failed to read %v", err) + } + want := content[c.offset:] + if c.writePrefix { + want = append(prefix, want...) + } + if !bytes.Equal(blob, want) { + t.Fatal("Unexpected value") + } + os.Remove(c.src) + os.Remove(c.dest) + } +} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 91fc31b660d67..f52f6989d765a 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -74,6 +74,12 @@ func (t *table) Ancients() (uint64, error) { return t.db.Ancients() } +// Tail is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) Tail() (uint64, error) { + return t.db.Tail() +} + // AncientSize is a noop passthrough that just forwards the request to the underlying // database. 
func (t *table) AncientSize(kind string) (uint64, error) { @@ -89,10 +95,16 @@ func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err err return t.db.ReadAncients(fn) } -// TruncateAncients is a noop passthrough that just forwards the request to the underlying +// TruncateHead is a noop passthrough that just forwards the request to the underlying +// database. +func (t *table) TruncateHead(items uint64) error { + return t.db.TruncateHead(items) +} + +// TruncateTail is a noop passthrough that just forwards the request to the underlying // database. -func (t *table) TruncateAncients(items uint64) error { - return t.db.TruncateAncients(items) +func (t *table) TruncateTail(items uint64) error { + return t.db.TruncateTail(items) } // Sync is a noop passthrough that just forwards the request to the underlying diff --git a/ethdb/database.go b/ethdb/database.go index 0a5729c6c1ecc..fafa712523262 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -86,6 +86,10 @@ type AncientReader interface { // Ancients returns the ancient item numbers in the ancient store. Ancients() (uint64, error) + // Tail returns the number of the first stored item in the freezer. + // This number can also be interpreted as the total number of deleted items. + Tail() (uint64, error) + // AncientSize returns the ancient size of the specified category. AncientSize(kind string) (uint64, error) } @@ -106,8 +110,16 @@ type AncientWriter interface { // The integer return value is the total size of the written data. ModifyAncients(func(AncientWriteOp) error) (int64, error) - // TruncateAncients discards all but the first n ancient data from the ancient store. - TruncateAncients(n uint64) error + // TruncateHead discards all but the first n ancient data from the ancient store. + // After the truncation, the latest item that can be accessed is item_n-1 (counting from 0). + TruncateHead(n uint64) error + + // TruncateTail discards the first n ancient data from the ancient store.
The already + // deleted items are ignored. After the truncation, the earliest item that can be accessed + // is item_n (counting from 0). The deleted items may not be removed from the ancient store + // immediately; they are only removed all together once the accumulated deleted data + // reaches the threshold. + TruncateTail(n uint64) error // Sync flushes all in-memory ancient store data to disk. Sync() error