diff --git a/core/blockchain.go b/core/blockchain.go
index fa7e39fb0189f..fc5275dc70a89 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -592,7 +592,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
- if err := bc.db.TruncateAncients(num); err != nil {
+ if err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
@@ -991,7 +991,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += int64(batch.ValueSize())
if err = batch.Write(); err != nil {
fastBlock := bc.CurrentFastBlock().NumberU64()
- if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
+ if err := bc.db.TruncateHead(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
@@ -1009,7 +1009,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
- if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
+ if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index 8e9706ea6fdb9..7e4a06df924b9 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -83,8 +83,8 @@ type NumberHash struct {
Hash common.Hash
}
-// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights,
-// both canonical and reorged forks included.
+// ReadAllHashesInRange retrieves all the hashes assigned to blocks at a certain
+// heights, both canonical and reorged forks included.
// This method considers both limits to be _inclusive_.
func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash {
var (
@@ -776,7 +776,7 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
WriteHeader(db, block.Header())
}
-// WriteAncientBlock writes entire block data into ancient store and returns the total written size.
+// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
diff --git a/core/rawdb/database.go b/core/rawdb/database.go
index 5ef64d26a2057..64cc2862bb37e 100644
--- a/core/rawdb/database.go
+++ b/core/rawdb/database.go
@@ -99,6 +99,11 @@ func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
}
+// Tail returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) Tail() (uint64, error) {
+ return 0, errNotSupported
+}
+
// AncientSize returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
@@ -109,8 +114,13 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e
return 0, errNotSupported
}
-// TruncateAncients returns an error as we don't have a backing chain freezer.
-func (db *nofreezedb) TruncateAncients(items uint64) error {
+// TruncateHead returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) TruncateHead(items uint64) error {
+ return errNotSupported
+}
+
+// TruncateTail returns an error as we don't have a backing chain freezer.
+func (db *nofreezedb) TruncateTail(items uint64) error {
return errNotSupported
}
@@ -211,7 +221,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
// Block #1 is still in the database, we're allowed to init a new feezer
}
// Otherwise, the head header is still the genesis, we're allowed to init a new
- // feezer.
+ // freezer.
}
}
// Freezer is consistent with the key-value database, permit combining the two
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go
index 88c72625eedee..3f66f983e5fad 100644
--- a/core/rawdb/freezer.go
+++ b/core/rawdb/freezer.go
@@ -59,14 +59,14 @@ const (
freezerRecheckInterval = time.Minute
// freezerBatchLimit is the maximum number of blocks to freeze in one batch
- // before doing an fsync and deleting it from the key-value store.
+ // before doing a fsync and deleting it from the key-value store.
freezerBatchLimit = 30000
// freezerTableSize defines the maximum size of freezer data files.
freezerTableSize = 2 * 1000 * 1000 * 1000
)
-// freezer is an memory mapped append-only database to store immutable chain data
+// freezer is a memory mapped append-only database to store immutable chain data
// into flat files:
//
// - The append only nature ensures that disk writes are minimized.
@@ -78,6 +78,7 @@ type freezer struct {
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen
+ tail uint64 // Number of the first stored item in the freezer
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
// This lock synchronizes writers and the truncate operation, as well as
@@ -226,6 +227,11 @@ func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
}
+// Tail returns the number of the first stored item in the freezer.
+func (f *freezer) Tail() (uint64, error) {
+ return atomic.LoadUint64(&f.tail), nil
+}
+
// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
@@ -261,7 +267,7 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
if err != nil {
// The write operation has failed. Go back to the previous item position.
for name, table := range f.tables {
- err := table.truncate(prevItem)
+ err := table.truncateHead(prevItem)
if err != nil {
log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
}
@@ -281,8 +287,8 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
return writeSize, nil
}
-// TruncateAncients discards any recent data above the provided threshold number.
-func (f *freezer) TruncateAncients(items uint64) error {
+// TruncateHead discards any recent data above the provided threshold number.
+func (f *freezer) TruncateHead(items uint64) error {
if f.readonly {
return errReadOnly
}
@@ -292,12 +298,42 @@ func (f *freezer) TruncateAncients(items uint64) error {
if atomic.LoadUint64(&f.frozen) <= items {
return nil
}
+ var frozen uint64
+ for _, table := range f.tables {
+ if err := table.truncateHead(items); err != nil {
+ return err
+ }
+ // Tables should be aligned, only check the first table.
+ if frozen == 0 {
+ frozen = atomic.LoadUint64(&table.items)
+ }
+ }
+ atomic.StoreUint64(&f.frozen, frozen)
+ return nil
+}
+
+// TruncateTail discards any old data below the provided threshold number.
+func (f *freezer) TruncateTail(tail uint64) error {
+ if f.readonly {
+ return errReadOnly
+ }
+ f.writeLock.Lock()
+ defer f.writeLock.Unlock()
+
+ if atomic.LoadUint64(&f.tail) >= tail {
+ return nil
+ }
+ var truncated uint64
for _, table := range f.tables {
- if err := table.truncate(items); err != nil {
+ if err := table.truncateTail(tail); err != nil {
return err
}
+ if truncated == 0 {
+ // Tables should be aligned, only check the first table.
+ truncated = table.tail()
+ }
}
- atomic.StoreUint64(&f.frozen, items)
+ atomic.StoreUint64(&f.tail, truncated)
return nil
}
@@ -344,19 +380,29 @@ func (f *freezer) validate() error {
// repair truncates all data tables to the same length.
func (f *freezer) repair() error {
- min := uint64(math.MaxUint64)
+ var (
+ head = uint64(math.MaxUint64)
+ tail = uint64(0)
+ )
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
- if min > items {
- min = items
+ if head > items {
+ head = items
+ }
+ if table.tail() > tail {
+ tail = table.tail()
}
}
for _, table := range f.tables {
- if err := table.truncate(min); err != nil {
+ if err := table.truncateHead(head); err != nil {
+ return err
+ }
+ if err := table.truncateTail(tail); err != nil {
return err
}
}
- atomic.StoreUint64(&f.frozen, min)
+ atomic.StoreUint64(&f.frozen, head)
+ atomic.StoreUint64(&f.tail, tail)
return nil
}
diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go
index 762fa8f25f19d..864a7f5e98bfe 100644
--- a/core/rawdb/freezer_batch.go
+++ b/core/rawdb/freezer_batch.go
@@ -191,7 +191,7 @@ func (batch *freezerTableBatch) commit() error {
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]
- // Write index.
+ // Write indices.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go
new file mode 100644
index 0000000000000..4df449d53587d
--- /dev/null
+++ b/core/rawdb/freezer_meta.go
@@ -0,0 +1,236 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see
+
+package rawdb
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "os"
+
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ freezerVersion = 1 // The version tag of freezer table structure
+ metaLength = 1024 // The number of bytes allocated for the freezer table metadata
+)
+
+var errIncompatibleVersion = errors.New("incompatible version")
+
+type incompatibleError struct {
+ version uint16
+ expect uint16
+ err error
+}
+
+func newIncompatibleError(version uint16) *incompatibleError {
+ return &incompatibleError{
+ version: version,
+ expect: freezerVersion,
+ err: errIncompatibleVersion,
+ }
+}
+
+// Unwrap returns the wrapped sentinel error, which lets callers match it
+// with errors.Is.
+func (err *incompatibleError) Unwrap() error {
+ return err.err
+}
+
+func (err *incompatibleError) Error() string {
+ return fmt.Sprintf("%v, get %d, expect %d", err.err, err.version, err.expect)
+}
+
+// freezerTableMeta wraps all the metadata of the freezer table.
+type freezerTableMeta struct {
+ version uint16 // Freezer table version descriptor
+ tailId uint32 // The number of the earliest file
+ deleted uint64 // The number of items that have been removed from the table
+ hidden uint64 // The number of items that have been hidden in the table
+}
+
+// newMetadata initializes the metadata object with the given parameters.
+func newMetadata(tailId uint32, deleted uint64, hidden uint64) *freezerTableMeta {
+ return &freezerTableMeta{
+ version: freezerVersion,
+ tailId: tailId,
+ deleted: deleted,
+ hidden: hidden,
+ }
+}
+
+// encodeMetadata encodes the given parameters as the freezer table metadata.
+func encodeMetadata(meta *freezerTableMeta) ([]byte, error) {
+ buffer := new(bytes.Buffer)
+ if err := rlp.Encode(buffer, meta.version); err != nil {
+ return nil, err
+ }
+ if err := rlp.Encode(buffer, meta.tailId); err != nil {
+ return nil, err
+ }
+ if err := rlp.Encode(buffer, meta.deleted); err != nil {
+ return nil, err
+ }
+ if err := rlp.Encode(buffer, meta.hidden); err != nil {
+ return nil, err
+ }
+ buffer.Write(make([]byte, metaLength-buffer.Len())) // Right pad zero bytes to the specified length
+ return buffer.Bytes(), nil
+}
+
+// decodeMetadata decodes the freezer-table metadata from the given
+// rlp stream.
+func decodeMetadata(r *rlp.Stream) (*freezerTableMeta, error) {
+ var version uint16
+ if err := r.Decode(&version); err != nil {
+ return nil, err
+ }
+ if version != freezerVersion {
+ return nil, newIncompatibleError(version)
+ }
+ var tailId uint32
+ if err := r.Decode(&tailId); err != nil {
+ return nil, err
+ }
+ var deleted, hidden uint64
+ if err := r.Decode(&deleted); err != nil {
+ return nil, err
+ }
+ if err := r.Decode(&hidden); err != nil {
+ return nil, err
+ }
+ return newMetadata(tailId, deleted, hidden), nil
+}
+
+// storeMetadata stores the metadata of the freezer table into the
+// given index file.
+func storeMetadata(index *os.File, meta *freezerTableMeta) error {
+ encoded, err := encodeMetadata(meta)
+ if err != nil {
+ return err
+ }
+ if _, err := index.WriteAt(encoded, 0); err != nil {
+ return err
+ }
+ return nil
+}
+
+// loadMetadata reads the leading metaLength bytes of the given index file and
+// decodes the metadata; files shorter than that are reported as legacy (version 0).
+// NOTE(review): confirm larger legacy files also decode to errIncompatibleVersion.
+func loadMetadata(index *os.File) (*freezerTableMeta, error) {
+ stat, err := index.Stat()
+ if err != nil {
+ return nil, err
+ }
+ if stat.Size() < metaLength {
+ return nil, newIncompatibleError(0)
+ }
+ buffer := make([]byte, metaLength)
+ if _, err := index.ReadAt(buffer, 0); err != nil {
+ return nil, err
+ }
+ return decodeMetadata(rlp.NewStream(bytes.NewReader(buffer), 0))
+}
+
+// upgradeV0TableIndex extracts the indexes from version-0 index file and
+// encodes/stores them into the latest version index file.
+func upgradeV0TableIndex(index *os.File) error {
+ // Create a temporary offset buffer to read indexEntry info
+ buffer := make([]byte, indexEntrySize)
+
+ // Read index zero, determine what file is the earliest
+ // and how many entries are deleted from the freezer table.
+ var first indexEntry
+ if _, err := index.ReadAt(buffer, 0); err != nil {
+ return err
+ }
+ first.unmarshalBinary(buffer)
+
+ encoded, err := encodeMetadata(newMetadata(first.filenum, uint64(first.offset), 0))
+ if err != nil {
+ return err
+ }
+ // Close the origin index file.
+ if err := index.Close(); err != nil {
+ return err
+ }
+ return copyFrom(index.Name(), index.Name(), indexEntrySize, func(f *os.File) error {
+ _, err := f.Write(encoded)
+ return err
+ })
+}
+
+// upgradeTableIndex upgrades the legacy index file to the latest version.
+// The given handle is closed by the version-specific upgrade routine; the
+// returned one is re-opened for append so index entries can still be written.
+func upgradeTableIndex(index *os.File, version uint16) (*os.File, *freezerTableMeta, error) {
+ switch version {
+ case 0:
+ if err := upgradeV0TableIndex(index); err != nil {
+ return nil, nil, err
+ }
+ default:
+ return nil, nil, errors.New("unknown freezer table index")
+ }
+ // Reopen for append: os.Open would hand back a read-only descriptor
+ index, err := openFreezerFileForAppend(index.Name())
+ if err != nil {
+ return nil, nil, err
+ }
+ meta, err := loadMetadata(index)
+ if err != nil {
+ return nil, nil, err
+ }
+ return index, meta, nil
+}
+
+// repairTableIndex loads the metadata of the given index file and returns it
+// with the handle to use afterwards. Empty files get fresh metadata committed,
+// legacy files are upgraded (closing the given handle, returning the new one).
+func repairTableIndex(index *os.File) (*os.File, *freezerTableMeta, error) {
+ stat, err := index.Stat()
+ if err != nil {
+ return nil, nil, err
+ }
+ if stat.Size() == 0 {
+ meta := newMetadata(0, 0, 0)
+ if err := storeMetadata(index, meta); err != nil {
+ return nil, nil, err
+ }
+ // Shift file cursor to the end for next write operation
+ _, err = index.Seek(0, 2)
+ if err != nil {
+ return nil, nil, err
+ }
+ return index, meta, nil
+ }
+ meta, err := loadMetadata(index)
+ if err != nil {
+ // errors.As yields the typed error even if it was wrapped on the way up
+ var incompat *incompatibleError
+ if !errors.As(err, &incompat) {
+ return nil, nil, err
+ }
+ index, meta, err = upgradeTableIndex(index, incompat.version)
+ }
+ if err != nil {
+ return nil, nil, err
+ }
+ return index, meta, nil
+}
diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go
new file mode 100644
index 0000000000000..3340450f984a9
--- /dev/null
+++ b/core/rawdb/freezer_meta_test.go
@@ -0,0 +1,69 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see
+
+package rawdb
+
+import (
+ "errors"
+ "io/ioutil"
+ "os"
+ "testing"
+)
+
+// TestStoreLoadFreezerTableMeta stores metadata tagged with the current and
+// with a legacy version and checks what loadMetadata reports for each.
+func TestStoreLoadFreezerTableMeta(t *testing.T) {
+ var cases = []struct {
+ version uint16
+ deleted uint64
+ hidden uint64
+ expectErr error
+ }{
+ {freezerVersion, 100, 200, nil},
+ {0, 100, 200, errIncompatibleVersion}, // legacy version
+ }
+ for _, c := range cases {
+ f, err := ioutil.TempFile(os.TempDir(), "*")
+ if err != nil {
+ t.Fatalf("Failed to create file %v", err)
+ }
+ // Best-effort cleanup, otherwise every run leaks the temp files
+ defer func() { f.Close(); os.Remove(f.Name()) }()
+ err = storeMetadata(f, &freezerTableMeta{
+ version: c.version,
+ deleted: c.deleted,
+ hidden: c.hidden,
+ })
+ if err != nil {
+ t.Fatalf("Failed to store metadata %v", err)
+ }
+ meta, err := loadMetadata(f)
+ if !errors.Is(err, c.expectErr) {
+ t.Fatalf("Unexpected error %v", err)
+ }
+ if c.expectErr == nil {
+ if meta.version != c.version {
+ t.Fatalf("Unexpected version field")
+ }
+ if meta.deleted != c.deleted {
+ t.Fatalf("Unexpected deleted field")
+ }
+ if meta.hidden != c.hidden {
+ t.Fatalf("Unexpected hidden field")
+ }
+ }
+ }
+}
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
index 7cfba70c5004a..d57fc4a1798b8 100644
--- a/core/rawdb/freezer_table.go
+++ b/core/rawdb/freezer_table.go
@@ -46,21 +46,20 @@ var (
errNotSupported = errors.New("this operation is not supported")
)
-// indexEntry contains the number/id of the file that the data resides in, aswell as the
-// offset within the file to the end of the data
+// indexEntry contains the number/id of the file that the data resides in, as well as the
+// offset within the file to the end of the data.
// In serialized form, the filenum is stored as uint16.
type indexEntry struct {
- filenum uint32 // stored as uint16 ( 2 bytes)
- offset uint32 // stored as uint32 ( 4 bytes)
+ filenum uint32 // stored as uint16 ( 2 bytes )
+ offset uint32 // stored as uint32 ( 4 bytes )
}
const indexEntrySize = 6
// unmarshalBinary deserializes binary b into the rawIndex entry.
-func (i *indexEntry) unmarshalBinary(b []byte) error {
+func (i *indexEntry) unmarshalBinary(b []byte) {
i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
i.offset = binary.BigEndian.Uint32(b[2:6])
- return nil
}
// append adds the encoded entry to the end of b.
@@ -75,14 +74,14 @@ func (i *indexEntry) append(b []byte) []byte {
// bounds returns the start- and end- offsets, and the file number of where to
// read there data item marked by the two index entries. The two entries are
// assumed to be sequential.
-func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
- if start.filenum != end.filenum {
+func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
+ if i.filenum != end.filenum {
// If a piece of data 'crosses' a data-file,
// it's actually in one piece on the second data-file.
// We return a zero-indexEntry for the second file as start
return 0, end.offset, end.filenum
}
- return start.offset, end.offset, end.filenum
+ return i.offset, end.offset, end.filenum
}
// freezerTable represents a single chained data table within the freezer (e.g. blocks).
@@ -92,7 +91,15 @@ type freezerTable struct {
// WARNING: The `items` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
- items uint64 // Number of items stored in the table (including items removed from tail)
+ items uint64 // Number of items stored in the table (including items removed from tail)
+ itemOffset uint64 // Number of items removed from the table
+
+ // itemHidden is the number of items marked as deleted but not yet removed
+ // from the table. Tail deletion is only supported at file level, so the
+ // actual deletion is delayed until the "marked as deleted" data reaches
+ // the threshold. Until then these items are hidden to prevent them from
+ // being visited again.
+ itemHidden uint64
noCompression bool // if true, disables snappy compression. Note: does not work retroactively
readonly bool
@@ -106,10 +113,6 @@ type freezerTable struct {
tailId uint32 // number of the earliest file
index *os.File // File descriptor for the indexEntry file of the table
- // In the case that old items are deleted (from the tail), we use itemOffset
- // to count how many historic items have gone missing.
- itemOffset uint32 // Offset (number of discarded items)
-
headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
@@ -163,7 +166,7 @@ func truncateFreezerFile(file *os.File, size int64) error {
}
// newTable opens a freezer table, creating the data and index files if they are
-// non existent. Both files are truncated to the shortest common length to ensure
+// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
@@ -172,28 +175,26 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
}
var idxName string
if noCompression {
- // Raw idx
- idxName = fmt.Sprintf("%s.ridx", name)
+ idxName = fmt.Sprintf("%s.ridx", name) // raw index file
} else {
- // Compressed idx
- idxName = fmt.Sprintf("%s.cidx", name)
+ idxName = fmt.Sprintf("%s.cidx", name) // compressed index file
}
var (
- err error
- offsets *os.File
+ err error
+ index *os.File
)
if readonly {
// Will fail if table doesn't exist
- offsets, err = openFreezerFileForReadOnly(filepath.Join(path, idxName))
+ index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName))
} else {
- offsets, err = openFreezerFileForAppend(filepath.Join(path, idxName))
+ index, err = openFreezerFileForAppend(filepath.Join(path, idxName))
}
if err != nil {
return nil, err
}
// Create the table and repair any past inconsistency
tab := &freezerTable{
- index: offsets,
+ index: index,
files: make(map[uint32]*os.File),
readMeter: readMeter,
writeMeter: writeMeter,
@@ -220,26 +221,29 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
return tab, nil
}
-// repair cross checks the head and the index file and truncates them to
+// repair cross-checks the head and the index file and truncates them to
// be in sync with each other after a potential crash / data loss.
func (t *freezerTable) repair() error {
- // Create a temporary offset buffer to init files with and read indexEntry into
- buffer := make([]byte, indexEntrySize)
+ index, meta, err := repairTableIndex(t.index)
+ if err != nil {
+ return err
+ }
+ t.index = index // index file may be reopened, update it
+ t.tailId, t.itemOffset, t.itemHidden = meta.tailId, meta.deleted, meta.hidden
- // If we've just created the files, initialize the index with the 0 indexEntry
+ // Ensure the index is a multiple of indexEntrySize bytes. The assumption
+ // is held that index file at least has metaLength bytes for storing meta-
+ // data.
stat, err := t.index.Stat()
if err != nil {
return err
}
- if stat.Size() == 0 {
- if _, err := t.index.Write(buffer); err != nil {
+ if overflow := (stat.Size() - metaLength) % indexEntrySize; overflow != 0 {
+ err := truncateFreezerFile(t.index, stat.Size()-overflow)
+ if err != nil {
return err
}
}
- // Ensure the index is a multiple of indexEntrySize bytes
- if overflow := stat.Size() % indexEntrySize; overflow != 0 {
- truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path
- }
// Retrieve the file sizes and prepare for truncation
if stat, err = t.index.Stat(); err != nil {
return err
@@ -248,29 +252,40 @@ func (t *freezerTable) repair() error {
// Open the head file
var (
- firstIndex indexEntry
- lastIndex indexEntry
contentSize int64
contentExp int64
+ lastIndex *indexEntry
)
- // Read index zero, determine what file is the earliest
- // and what item offset to use
- t.index.ReadAt(buffer, 0)
- firstIndex.unmarshalBinary(buffer)
-
- t.tailId = firstIndex.filenum
- t.itemOffset = firstIndex.offset
-
- t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
- lastIndex.unmarshalBinary(buffer)
- if t.readonly {
- t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly)
- } else {
- t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend)
- }
+ // Read last index, determine what file is the latest and
+ // what's the current head item
+ items, err := t.indexLen()
if err != nil {
return err
}
+ if items == 0 {
+ if t.readonly {
+ t.head, err = t.openFile(t.tailId, openFreezerFileForReadOnly)
+ } else {
+ t.head, err = t.openFile(t.tailId, openFreezerFileForAppend)
+ }
+ if err != nil {
+ return err
+ }
+ lastIndex = &indexEntry{filenum: t.tailId, offset: 0}
+ } else {
+ lastIndex, err = t.getIndex(0, 1)
+ if err != nil {
+ return err
+ }
+ if t.readonly {
+ t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly)
+ } else {
+ t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend)
+ }
+ if err != nil {
+ return err
+ }
+ }
if stat, err = t.head.Stat(); err != nil {
return err
}
@@ -278,7 +293,6 @@ func (t *freezerTable) repair() error {
// Keep truncating both files until they come in sync
contentExp = int64(lastIndex.offset)
-
for contentExp != contentSize {
// Truncate the head file to the last offset pointer
if contentExp < contentSize {
@@ -294,15 +308,23 @@ func (t *freezerTable) repair() error {
if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
return err
}
- offsetsSize -= indexEntrySize
- t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
- var newLastIndex indexEntry
- newLastIndex.unmarshalBinary(buffer)
+ // Load the previous index entry from the index file
+ offsetsSize, items = offsetsSize-indexEntrySize, items-1
+
+ var newLast *indexEntry
+ if items == 0 {
+ newLast = &indexEntry{filenum: t.tailId, offset: 0}
+ } else {
+ newLast, err = t.getIndex(0, 1)
+ if err != nil {
+ return err
+ }
+ }
// We might have slipped back into an earlier head-file here
- if newLastIndex.filenum != lastIndex.filenum {
+ if newLast.filenum != lastIndex.filenum {
// Release earlier opened file
t.releaseFile(lastIndex.filenum)
- if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil {
+ if t.head, err = t.openFile(newLast.filenum, openFreezerFileForAppend); err != nil {
return err
}
if stat, err = t.head.Stat(); err != nil {
@@ -312,7 +334,7 @@ func (t *freezerTable) repair() error {
}
contentSize = stat.Size()
}
- lastIndex = newLastIndex
+ lastIndex = newLast
contentExp = int64(lastIndex.offset)
}
}
@@ -327,10 +349,16 @@ func (t *freezerTable) repair() error {
}
}
// Update the item and byte counters and return
- t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
+ t.items = t.itemOffset + uint64(items)
t.headBytes = contentSize
t.headId = lastIndex.filenum
+ // Delete the leftover files because of head deletion
+ t.releaseFilesAfter(t.headId, true)
+
+ // Delete the leftover files because of tail deletion
+ t.releaseFilesBefore(t.tailId, true)
+
// Close opened files and preopen all files
if err := t.preopen(); err != nil {
return err
@@ -346,6 +374,7 @@ func (t *freezerTable) repair() error {
func (t *freezerTable) preopen() (err error) {
// The repair might have already opened (some) files
t.releaseFilesAfter(0, false)
+
// Open all except head in RDONLY
for i := t.tailId; i < t.headId; i++ {
if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil {
@@ -361,8 +390,14 @@ func (t *freezerTable) preopen() (err error) {
return err
}
-// truncate discards any recent data above the provided threshold number.
-func (t *freezerTable) truncate(items uint64) error {
+// tail returns the index of the first stored item in the freezer table.
+// It can also be interpreted as the number of deleted items from the tail.
+func (t *freezerTable) tail() uint64 {
+ return atomic.LoadUint64(&t.itemHidden) + atomic.LoadUint64(&t.itemOffset)
+}
+
+// truncateHead discards any recent data above the provided threshold number.
+func (t *freezerTable) truncateHead(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
@@ -382,17 +417,33 @@ func (t *freezerTable) truncate(items uint64) error {
log = t.logger.Warn // Only loud warn if we delete multiple items
}
log("Truncating freezer table", "items", existing, "limit", items)
- if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
+
+ // Calculate the relative offset between the new head and tail, use
+ // it to access the corresponding index entry. If the requested target
+ // is even below the freezer tail, reject it.
+ var (
+ itemOffset = atomic.LoadUint64(&t.itemOffset)
+ itemHidden = atomic.LoadUint64(&t.itemHidden)
+ tail = itemOffset + itemHidden
+ )
+ if items < tail {
+ return errors.New("truncation below tail")
+ }
+ offset := items - itemOffset
+
+ if err := truncateFreezerFile(t.index, int64(offset)*indexEntrySize+metaLength); err != nil {
return err
}
// Calculate the new expected size of the data file and truncate it
- buffer := make([]byte, indexEntrySize)
- if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil {
- return err
+ var expected *indexEntry
+ if offset == 0 {
+ expected = &indexEntry{filenum: t.tailId, offset: 0}
+ } else {
+ expected, err = t.getIndex(int64(offset-1), 0)
+ if err != nil {
+ return err
+ }
}
- var expected indexEntry
- expected.unmarshalBinary(buffer)
-
// We might need to truncate back to older files
if expected.filenum != t.headId {
// If already open for reading, force-reopen for writing
@@ -421,7 +472,94 @@ func (t *freezerTable) truncate(items uint64) error {
return err
}
t.sizeGauge.Dec(int64(oldSize - newSize))
+ return nil
+}
+func (t *freezerTable) truncateIndexFile(originDeleted, deleted, hidden uint64, tailId uint32) error {
+ encoded, err := encodeMetadata(newMetadata(tailId, deleted, hidden))
+ if err != nil {
+ return err
+ }
+ err = copyFrom(t.index.Name(), t.index.Name(), metaLength+indexEntrySize*(deleted-originDeleted), func(f *os.File) error {
+ _, err := f.Write(encoded)
+ return err
+ })
+ if err != nil {
+ return err
+ }
+ if err := t.index.Close(); err != nil {
+ return err
+ }
+ offsets, err := openFreezerFileForAppend(t.index.Name())
+ if err != nil {
+ return err
+ }
+ t.index = offsets
+ return nil
+}
+
+// truncateTail discards any data before the provided threshold number.
+func (t *freezerTable) truncateTail(items uint64) error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // Ensure the given truncate target falls in the correct range
+ var (
+ deleted = atomic.LoadUint64(&t.itemOffset)
+ hidden = atomic.LoadUint64(&t.itemHidden)
+ )
+ if deleted+hidden >= items {
+ return nil
+ }
+ head := atomic.LoadUint64(&t.items)
+ if head < items {
+ return errors.New("truncation above head")
+ }
+ // Load the index of new tail item after the deletion.
+ newTail, err := t.getIndex(int64(items-deleted), 0)
+ if err != nil {
+ return err
+ }
+ // Freezer only supports deletion by file, just mark the entries as hidden
+ if t.tailId == newTail.filenum {
+ atomic.StoreUint64(&t.itemHidden, items-deleted)
+ return storeMetadata(t.index, newMetadata(t.tailId, deleted, items-deleted))
+ }
+ if t.tailId > newTail.filenum {
+ return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTail.filenum)
+ }
+ // We need to truncate, save the old size for metrics tracking
+ oldSize, err := t.sizeNolock()
+ if err != nil {
+ return err
+ }
+ // Count how many items can be deleted from the file.
+ var newDeleted = items
+ for current := items - 1; current >= deleted; current -= 1 {
+ cur, err := t.getIndex(int64(current-deleted), 0)
+ if err != nil {
+ return err
+ }
+ if cur.filenum != newTail.filenum {
+ break
+ }
+ newDeleted = current
+ }
+ if err := t.truncateIndexFile(deleted, newDeleted, items-newDeleted, newTail.filenum); err != nil {
+ return err
+ }
+ // Release any files before the current tail
+ t.tailId = newTail.filenum
+ atomic.StoreUint64(&t.itemOffset, newDeleted)
+ atomic.StoreUint64(&t.itemHidden, items-newDeleted)
+ t.releaseFilesBefore(t.tailId, true)
+
+ // Retrieve the new size and update the total size counter
+ newSize, err := t.sizeNolock()
+ if err != nil {
+ return err
+ }
+ t.sizeGauge.Dec(int64(oldSize - newSize))
return nil
}
@@ -490,6 +628,69 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
}
}
+// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files
+func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) {
+ for fnum, f := range t.files {
+ if fnum < num {
+ delete(t.files, fnum)
+ f.Close()
+ if remove {
+ os.Remove(f.Name())
+ }
+ }
+ }
+}
+
+// indexLen returns the total index entries stored in the index file.
+// This number can also be counted as the data entries stored in the
+// freezer table.
+func (t *freezerTable) indexLen() (int64, error) {
+ stat, err := t.index.Stat()
+ if err != nil {
+ return 0, err
+ }
+ size := stat.Size()
+ if size < metaLength {
+ return 0, errors.New("invalid index file")
+ }
+ indexSize := size - metaLength
+ if indexSize%indexEntrySize != 0 {
+ return 0, errors.New("invalid index file")
+ }
+ return indexSize / indexEntrySize, nil
+}
+
+// getIndex returns a single index from the index file, with the given offset
+// interpreted according to whence: 0 means relative to the origin of the file
+// and 1 means relative to the end.
+func (t *freezerTable) getIndex(offset int64, whence int) (*indexEntry, error) {
+ count, err := t.indexLen()
+ if err != nil {
+ return nil, err
+ }
+ var (
+ off int64
+ index indexEntry
+ buffer = make([]byte, indexEntrySize)
+ )
+ if whence == 0 {
+ if offset >= count {
+ return nil, errors.New("out of range")
+ }
+ off = metaLength + offset*indexEntrySize
+ } else {
+ if offset >= count {
+ return nil, errors.New("out of range")
+ }
+ off = metaLength + (count-1-offset)*indexEntrySize
+ }
+ if _, err := t.index.ReadAt(buffer, off); err != nil {
+ return nil, err
+ }
+ index.unmarshalBinary(buffer[:])
+ return &index, nil
+}
+
// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
@@ -497,32 +698,32 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
// so that the items are within bounds. If this method is used to read out of bounds,
// it will return error.
func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
- // Apply the table-offset
- from = from - uint64(t.itemOffset)
- // For reading N items, we need N+1 indices.
- buffer := make([]byte, (count+1)*indexEntrySize)
- if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
- return nil, err
+ // Special case if we're reading the first item in the freezer. We assume that
+ // the first item always starts from offset zero (regarding deletion, we
+ // only support deletion by files, so the assumption holds).
+ var indices []*indexEntry
+ from = from - atomic.LoadUint64(&t.itemOffset)
+ if from == 0 {
+ indices = append(indices, &indexEntry{
+ filenum: t.tailId,
+ offset: 0,
+ })
+ count = count - 1
+ from = from + 1
}
+ // For reading N items, we need N+1 indices.
var (
- indices []*indexEntry
- offset int
+ buffer = make([]byte, (count+1)*indexEntrySize)
+ offset = metaLength + int64(from-1)*indexEntrySize
)
- for i := from; i <= from+count; i++ {
+ if _, err := t.index.ReadAt(buffer, offset); err != nil {
+ return nil, err
+ }
+ for i := 0; i <= int(count); i++ {
index := new(indexEntry)
- index.unmarshalBinary(buffer[offset:])
- offset += indexEntrySize
+ index.unmarshalBinary(buffer[i*indexEntrySize:])
indices = append(indices, index)
}
- if from == 0 {
- // Special case if we're reading the first item in the freezer. We assume that
- // the first item always start from zero(regarding the deletion, we
- // only support deletion by files, so that the assumption is held).
- // This means we can use the first item metadata to carry information about
- // the 'global' offset, for the deletion-case
- indices[0].offset = 0
- indices[0].filenum = indices[1].filenum
- }
return indices, nil
}
@@ -583,18 +784,23 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
t.lock.RLock()
defer t.lock.RUnlock()
- // Ensure the table and the item is accessible
+ // Ensure the table and the item are accessible
if t.index == nil || t.head == nil {
return nil, nil, errClosed
}
- itemCount := atomic.LoadUint64(&t.items) // max number
+ var (
+ items = atomic.LoadUint64(&t.items) // the total items(head + 1)
+ deleted = atomic.LoadUint64(&t.itemOffset) // the number of deleted items
+ hidden = atomic.LoadUint64(&t.itemHidden) // the number of hidden items
+ tail = deleted + hidden
+ )
// Ensure the start is written, not deleted from the tail, and that the
// caller actually wants something
- if itemCount <= start || uint64(t.itemOffset) > start || count == 0 {
+ if items <= start || tail > start || count == 0 {
return nil, nil, errOutOfBounds
}
- if start+count > itemCount {
- count = itemCount - start
+ if start+count > items {
+ count = items - start
}
var (
output = make([]byte, maxBytes) // Buffer to read data into
@@ -670,10 +876,10 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
return output[:outputSize], sizes, nil
}
-// has returns an indicator whether the specified number data
-// exists in the freezer table.
+// has returns an indicator whether the specified number data is still accessible
+// in the freezer table.
func (t *freezerTable) has(number uint64) bool {
- return atomic.LoadUint64(&t.items) > number
+ return atomic.LoadUint64(&t.items) > number && t.tail() <= number
}
// size returns the total data size in the freezer table.
@@ -744,13 +950,20 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string {
}
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
+ meta, err := loadMetadata(t.index)
+ if err != nil {
+ fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
+ return
+ }
+ fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.version, meta.deleted, meta.hidden)
+
buf := make([]byte, indexEntrySize)
fmt.Fprintf(w, "| number | fileno | offset |\n")
fmt.Fprintf(w, "|--------|--------|--------|\n")
for i := uint64(start); ; i++ {
- if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil {
+ if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)+metaLength); err != nil {
break
}
var entry indexEntry
diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go
index 15464e1bd768f..eb6d422f9c8ca 100644
--- a/core/rawdb/freezer_table_test.go
+++ b/core/rawdb/freezer_table_test.go
@@ -19,6 +19,7 @@ package rawdb
import (
"bytes"
"fmt"
+ "io/ioutil"
"math/rand"
"os"
"path/filepath"
@@ -203,8 +204,8 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Fatalf("Failed to open index file: %v", err)
}
// Remove everything but the first item, and leave data unaligned
- // 0-indexEntry, 1-indexEntry, corrupt-indexEntry
- idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2)
+ // metadata, 1-indexEntry, corrupt-indexEntry
+ idxFile.Truncate(metaLength + indexEntrySize + indexEntrySize/2)
idxFile.Close()
// Now open it again
@@ -387,7 +388,7 @@ func TestFreezerTruncate(t *testing.T) {
t.Fatal(err)
}
defer f.Close()
- f.truncate(10) // 150 bytes
+ f.truncateHead(10) // 150 bytes
if f.items != 10 {
t.Fatalf("expected %d items, got %d", 10, f.items)
}
@@ -504,7 +505,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
}
// Now, truncate back to zero
- f.truncate(0)
+ f.truncateHead(0)
// Write the data again
batch := f.newBatch()
@@ -559,26 +560,23 @@ func TestFreezerOffset(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- indexBuf := make([]byte, 7*indexEntrySize)
+ indexBuf := make([]byte, 6*indexEntrySize+metaLength)
indexFile.Read(indexBuf)
// Update the index file, so that we store
- // [ file = 2, offset = 4 ] at index zero
-
- tailId := uint32(2) // First file is 2
- itemOffset := uint32(4) // We have removed four items
- zeroIndex := indexEntry{
- filenum: tailId,
- offset: itemOffset,
+ // [ file = 2, deleted = 4, hidden = 0 ] as meta
+ blob, err := encodeMetadata(newMetadata(2, 4, 0))
+ if err != nil {
+ t.Fatal(err)
}
- buf := zeroIndex.append(nil)
- // Overwrite index zero
- copy(indexBuf, buf)
+ copy(indexBuf, blob)
+
// Remove the four next indices by overwriting
- copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:])
+ copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*4:])
indexFile.WriteAt(indexBuf, 0)
+
// Need to truncate the moved index items
- indexFile.Truncate(indexEntrySize * (1 + 2))
+ indexFile.Truncate(indexEntrySize*2 + metaLength)
indexFile.Close()
}
@@ -617,22 +615,22 @@ func TestFreezerOffset(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- indexBuf := make([]byte, 3*indexEntrySize)
+ indexBuf := make([]byte, 2*indexEntrySize+metaLength)
indexFile.Read(indexBuf)
// Update the index file, so that we store
- // [ file = 2, offset = 1M ] at index zero
-
- tailId := uint32(2) // First file is 2
- itemOffset := uint32(1000000) // We have removed 1M items
- zeroIndex := indexEntry{
- offset: itemOffset,
- filenum: tailId,
+ // [ file = 2, deleted = 1M, hidden = 0 ] as meta
+ blob, err := encodeMetadata(newMetadata(2, 1000000, 0))
+ if err != nil {
+ t.Fatal(err)
}
- buf := zeroIndex.append(nil)
- // Overwrite index zero
- copy(indexBuf, buf)
+ copy(indexBuf, blob)
+
+ // Remove the first 2 indices by overwriting
+ copy(indexBuf[metaLength:], indexBuf[metaLength+indexEntrySize*2:])
indexFile.WriteAt(indexBuf, 0)
+
+ indexFile.Truncate(indexEntrySize*2 + metaLength)
indexFile.Close()
}
@@ -659,6 +657,214 @@ func TestFreezerOffset(t *testing.T) {
}
}
+func TestTruncateTail(t *testing.T) {
+ t.Parallel()
+ rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
+ fname := fmt.Sprintf("truncate-tail-%d", rand.Uint64())
+
+ // Fill table
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Write 7 x 20 bytes, splitting out into four files
+ batch := f.newBatch()
+ require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
+ require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
+ require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
+ require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
+ require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
+ require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
+ require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11)))
+ require.NoError(t, batch.commit())
+
+ // nothing to do, all the items should still be there.
+ f.truncateTail(0)
+ fmt.Println(f.dumpIndexString(0, 1000))
+ checkRetrieve(t, f, map[uint64][]byte{
+ 0: getChunk(20, 0xFF),
+ 1: getChunk(20, 0xEE),
+ 2: getChunk(20, 0xdd),
+ 3: getChunk(20, 0xcc),
+ 4: getChunk(20, 0xbb),
+ 5: getChunk(20, 0xaa),
+ 6: getChunk(20, 0x11),
+ })
+
+ // truncate single element( item 0 ), deletion is only supported at file level
+ f.truncateTail(1)
+ fmt.Println(f.dumpIndexString(0, 1000))
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds,
+ })
+ checkRetrieve(t, f, map[uint64][]byte{
+ 1: getChunk(20, 0xEE),
+ 2: getChunk(20, 0xdd),
+ 3: getChunk(20, 0xcc),
+ 4: getChunk(20, 0xbb),
+ 5: getChunk(20, 0xaa),
+ 6: getChunk(20, 0x11),
+ })
+
+ // Reopen the table, the deletion information should be persisted as well
+ f.Close()
+ f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds,
+ })
+ checkRetrieve(t, f, map[uint64][]byte{
+ 1: getChunk(20, 0xEE),
+ 2: getChunk(20, 0xdd),
+ 3: getChunk(20, 0xcc),
+ 4: getChunk(20, 0xbb),
+ 5: getChunk(20, 0xaa),
+ 6: getChunk(20, 0x11),
+ })
+
+ // truncate two elements( item 0, item 1 ), the file 0 should be deleted
+ f.truncateTail(2)
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds,
+ 1: errOutOfBounds,
+ })
+ checkRetrieve(t, f, map[uint64][]byte{
+ 2: getChunk(20, 0xdd),
+ 3: getChunk(20, 0xcc),
+ 4: getChunk(20, 0xbb),
+ 5: getChunk(20, 0xaa),
+ 6: getChunk(20, 0x11),
+ })
+
+ // Reopen the table, the above testing should still pass
+ f.Close()
+ f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer f.Close()
+
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds,
+ 1: errOutOfBounds,
+ })
+ checkRetrieve(t, f, map[uint64][]byte{
+ 2: getChunk(20, 0xdd),
+ 3: getChunk(20, 0xcc),
+ 4: getChunk(20, 0xbb),
+ 5: getChunk(20, 0xaa),
+ 6: getChunk(20, 0x11),
+ })
+
+ // truncate everything below item 6, only the last item should remain
+ f.truncateTail(6)
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds,
+ 1: errOutOfBounds,
+ 2: errOutOfBounds,
+ 3: errOutOfBounds,
+ 4: errOutOfBounds,
+ 5: errOutOfBounds,
+ })
+ checkRetrieve(t, f, map[uint64][]byte{
+ 6: getChunk(20, 0x11),
+ })
+}
+
+func TestTruncateHeadBelowTail(t *testing.T) {
+ t.Parallel()
+ rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
+ fname := fmt.Sprintf("truncate-head-blow-tail-%d", rand.Uint64())
+
+ // Fill table
+ f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Write 7 x 20 bytes, splitting out into four files
+ batch := f.newBatch()
+ require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
+ require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
+ require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
+ require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
+ require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
+ require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
+ require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11)))
+ require.NoError(t, batch.commit())
+
+ f.truncateTail(4) // Tail = 4
+
+ // NewHead is required to be 3, the entire table should be truncated
+ f.truncateHead(4)
+ checkRetrieveError(t, f, map[uint64]error{
+ 0: errOutOfBounds, // Deleted by tail
+ 1: errOutOfBounds, // Deleted by tail
+ 2: errOutOfBounds, // Deleted by tail
+ 3: errOutOfBounds, // Deleted by tail
+ 4: errOutOfBounds, // Deleted by Head
+ 5: errOutOfBounds, // Deleted by Head
+ 6: errOutOfBounds, // Deleted by Head
+ })
+
+ // Append new items
+ batch = f.newBatch()
+ require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
+ require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
+ require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11)))
+ require.NoError(t, batch.commit())
+
+ checkRetrieve(t, f, map[uint64][]byte{
+ 4: getChunk(20, 0xbb),
+ 5: getChunk(20, 0xaa),
+ 6: getChunk(20, 0x11),
+ })
+
+ f.truncateTail(5) // Lazy deleted the item-4, it's hidden
+ f.truncateHead(5) // New head is reset to item-4
+ checkRetrieveError(t, f, map[uint64]error{
+ 4: errOutOfBounds, // Hidden item
+ })
+}
+
+func TestUpgradeLegacyFreezerTable(t *testing.T) {
+ f, err := ioutil.TempFile("", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.Remove(f.Name())
+
+ index := &indexEntry{
+ filenum: 100,
+ offset: 200,
+ }
+ encoded := index.append(nil)
+ f.Write(encoded)
+
+ newf, meta, err := repairTableIndex(f)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if newf.Name() != f.Name() {
+ t.Fatal("Unexpected file name")
+ }
+ if meta.tailId != 100 {
+ t.Fatal("Unexpected tail file")
+ }
+ if meta.deleted != 200 {
+ t.Fatal("Unexpected deleted items")
+ }
+ if meta.hidden != 0 {
+ t.Fatal("Unexpected hidden items")
+ }
+ if meta.version != freezerVersion {
+ t.Fatal("Unexpected freezer version")
+ }
+}
+
func checkRetrieve(t *testing.T, f *freezerTable, items map[uint64][]byte) {
t.Helper()
diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go
index d5c3749e5d218..74e3d660cb106 100644
--- a/core/rawdb/freezer_test.go
+++ b/core/rawdb/freezer_test.go
@@ -186,7 +186,7 @@ func TestFreezerConcurrentModifyRetrieve(t *testing.T) {
wg.Wait()
}
-// This test runs ModifyAncients and TruncateAncients concurrently with each other.
+// This test runs ModifyAncients and TruncateHead concurrently with each other.
func TestFreezerConcurrentModifyTruncate(t *testing.T) {
f, dir := newFreezerForTesting(t, freezerTestTableDef)
defer os.RemoveAll(dir)
@@ -196,7 +196,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) {
for i := 0; i < 1000; i++ {
// First reset and write 100 items.
- if err := f.TruncateAncients(0); err != nil {
+ if err := f.TruncateHead(0); err != nil {
t.Fatal("truncate failed:", err)
}
_, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
@@ -231,7 +231,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) {
wg.Done()
}()
go func() {
- truncateErr = f.TruncateAncients(10)
+ truncateErr = f.TruncateHead(10)
wg.Done()
}()
go func() {
diff --git a/core/rawdb/freezer_utils.go b/core/rawdb/freezer_utils.go
new file mode 100644
index 0000000000000..c7510d4da847c
--- /dev/null
+++ b/core/rawdb/freezer_utils.go
@@ -0,0 +1,83 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+)
+
+// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'.
+// The 'destPath' is created if it doesn't exist, otherwise it is overwritten.
+// Before the copy is executed, an optional callback can be invoked to
+// manipulate the dest file.
+// It is perfectly valid to have destPath == srcPath.
+func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) error) error {
+ // Create a temp file in the same dir where we want it to wind up
+ f, err := ioutil.TempFile(filepath.Dir(destPath), "*")
+ if err != nil {
+ return err
+ }
+ fname := f.Name()
+
+ // Clean up the leftover file
+ defer func() {
+ if f != nil {
+ f.Close()
+ }
+ os.Remove(fname)
+ }()
+
+ // Apply the given function if it's not nil before we copy
+ // the content from the src.
+ if before != nil {
+ if err := before(f); err != nil {
+ return err
+ }
+ }
+ // Open the source file
+ src, err := os.Open(srcPath)
+ if err != nil {
+ return err
+ }
+ if _, err = src.Seek(int64(offset), 0); err != nil {
+ src.Close()
+ return err
+ }
+ // io.Copy uses 32K buffer internally.
+ _, err = io.Copy(f, src)
+ if err != nil {
+ src.Close()
+ return err
+ }
+ // Rename the temporary file to the specified dest name.
+ // src may be same as dest, so needs to be closed before
+ // we do the final move.
+ src.Close()
+
+ if err := f.Close(); err != nil {
+ return err
+ }
+ f = nil
+
+ if err := os.Rename(fname, destPath); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/core/rawdb/freezer_utils_test.go b/core/rawdb/freezer_utils_test.go
new file mode 100644
index 0000000000000..de8087f9b9361
--- /dev/null
+++ b/core/rawdb/freezer_utils_test.go
@@ -0,0 +1,76 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+ "bytes"
+ "io/ioutil"
+ "os"
+ "testing"
+)
+
+func TestCopyFrom(t *testing.T) {
+ var (
+ content = []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8}
+ prefix = []byte{0x9, 0xa, 0xb, 0xc, 0xd, 0xf}
+ )
+ var cases = []struct {
+ src, dest string
+ offset uint64
+ writePrefix bool
+ }{
+ {"foo", "bar", 0, false},
+ {"foo", "bar", 1, false},
+ {"foo", "bar", 8, false},
+ {"foo", "foo", 0, false},
+ {"foo", "foo", 1, false},
+ {"foo", "foo", 8, false},
+ {"foo", "bar", 0, true},
+ {"foo", "bar", 1, true},
+ {"foo", "bar", 8, true},
+ }
+ for _, c := range cases {
+ ioutil.WriteFile(c.src, content, 0644)
+
+ if err := copyFrom(c.src, c.dest, c.offset, func(f *os.File) error {
+ if !c.writePrefix {
+ return nil
+ }
+ f.Write(prefix)
+ return nil
+ }); err != nil {
+ os.Remove(c.src)
+ t.Fatalf("Failed to copy %v", err)
+ }
+
+ blob, err := ioutil.ReadFile(c.dest)
+ if err != nil {
+ os.Remove(c.src)
+ os.Remove(c.dest)
+ t.Fatalf("Failed to read %v", err)
+ }
+ want := content[c.offset:]
+ if c.writePrefix {
+ want = append(prefix, want...)
+ }
+ if !bytes.Equal(blob, want) {
+ t.Fatal("Unexpected value")
+ }
+ os.Remove(c.src)
+ os.Remove(c.dest)
+ }
+}
diff --git a/core/rawdb/table.go b/core/rawdb/table.go
index 91fc31b660d67..f52f6989d765a 100644
--- a/core/rawdb/table.go
+++ b/core/rawdb/table.go
@@ -74,6 +74,12 @@ func (t *table) Ancients() (uint64, error) {
return t.db.Ancients()
}
+// Tail is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) Tail() (uint64, error) {
+ return t.db.Tail()
+}
+
// AncientSize is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) AncientSize(kind string) (uint64, error) {
@@ -89,10 +95,16 @@ func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err err
return t.db.ReadAncients(fn)
}
-// TruncateAncients is a noop passthrough that just forwards the request to the underlying
+// TruncateHead is a noop passthrough that just forwards the request to the underlying
+// database.
+func (t *table) TruncateHead(items uint64) error {
+ return t.db.TruncateHead(items)
+}
+
+// TruncateTail is a noop passthrough that just forwards the request to the underlying
// database.
-func (t *table) TruncateAncients(items uint64) error {
- return t.db.TruncateAncients(items)
+func (t *table) TruncateTail(items uint64) error {
+ return t.db.TruncateTail(items)
}
// Sync is a noop passthrough that just forwards the request to the underlying
diff --git a/ethdb/database.go b/ethdb/database.go
index 0a5729c6c1ecc..fafa712523262 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -86,6 +86,10 @@ type AncientReader interface {
// Ancients returns the ancient item numbers in the ancient store.
Ancients() (uint64, error)
+ // Tail returns the number of the first stored item in the freezer.
+ // This number can also be interpreted as the total number of deleted items.
+ Tail() (uint64, error)
+
// AncientSize returns the ancient size of the specified category.
AncientSize(kind string) (uint64, error)
}
@@ -106,8 +110,16 @@ type AncientWriter interface {
// The integer return value is the total size of the written data.
ModifyAncients(func(AncientWriteOp) error) (int64, error)
- // TruncateAncients discards all but the first n ancient data from the ancient store.
- TruncateAncients(n uint64) error
+ // TruncateHead discards all but the first n ancient data from the ancient store.
+ // After the truncation, the latest item that can be accessed is item_n-1 (counting from 0).
+ TruncateHead(n uint64) error
+
+ // TruncateTail discards the first n ancient data from the ancient store. The already
+ // deleted items are ignored. After the truncation, the earliest item that can be accessed
+ // is item_n (counting from 0). The deleted items may not be removed from the ancient store
+ // immediately; they are only removed, all together, once the accumulated deleted data
+ // reaches the threshold.
+ TruncateTail(n uint64) error
// Sync flushes all in-memory ancient store data to disk.
Sync() error