From fe77bd27d4e56ee8865d628092e567a7712f2a5b Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 25 Nov 2021 11:02:10 +0100 Subject: [PATCH] core/rawdb: implement freezer tail truncation --- core/rawdb/freezer_table.go | 219 +++++++++++++++++++++---------- core/rawdb/freezer_table_test.go | 38 +++++- core/rawdb/freezer_utils.go | 123 +++++++++++++++++ core/rawdb/freezer_utils_test.go | 38 ++++++ 4 files changed, 344 insertions(+), 74 deletions(-) create mode 100644 core/rawdb/freezer_utils.go create mode 100644 core/rawdb/freezer_utils_test.go diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 22405cf9b4f89..12692a69ba190 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -17,17 +17,20 @@ package rawdb import ( + "bufio" "bytes" "encoding/binary" "errors" "fmt" "io" + "io/ioutil" "os" "path/filepath" "sync" "sync/atomic" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/golang/snappy" @@ -107,7 +110,7 @@ type freezerTable struct { // In the case that old items are deleted (from the tail), we use itemOffset // to count how many historic items have gone missing. 
- itemOffset uint32 // Offset (number of discarded items) + itemOffset uint64 // Offset (number of discarded items) headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read @@ -123,44 +126,6 @@ func NewFreezerTable(path, name string, disableSnappy bool) (*freezerTable, erro return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy) } -// openFreezerFileForAppend opens a freezer table file and seeks to the end -func openFreezerFileForAppend(filename string) (*os.File, error) { - // Open the file without the O_APPEND flag - // because it has differing behaviour during Truncate operations - // on different OS's - file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - // Seek to end for append - if _, err = file.Seek(0, io.SeekEnd); err != nil { - return nil, err - } - return file, nil -} - -// openFreezerFileForReadOnly opens a freezer table file for read only access -func openFreezerFileForReadOnly(filename string) (*os.File, error) { - return os.OpenFile(filename, os.O_RDONLY, 0644) -} - -// openFreezerFileTruncated opens a freezer table making sure it is truncated -func openFreezerFileTruncated(filename string) (*os.File, error) { - return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) -} - -// truncateFreezerFile resizes a freezer table file and seeks to the end -func truncateFreezerFile(file *os.File, size int64) error { - if err := file.Truncate(size); err != nil { - return err - } - // Seek to end for append - if _, err := file.Seek(0, io.SeekEnd); err != nil { - return err - } - return nil -} - // newTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. 
@@ -177,7 +142,7 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr // Compressed idx idxName = fmt.Sprintf("%s.cidx", name) } - offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName)) + offsets, err := openFileForAppend(filepath.Join(path, idxName)) if err != nil { return nil, err } @@ -227,7 +192,7 @@ func (t *freezerTable) repair() error { } // Ensure the index is a multiple of indexEntrySize bytes if overflow := stat.Size() % indexEntrySize; overflow != 0 { - truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path + truncateFile(t.index, stat.Size()-overflow) // New file can't trigger this path } // Retrieve the file sizes and prepare for truncation if stat, err = t.index.Stat(); err != nil { @@ -248,11 +213,11 @@ func (t *freezerTable) repair() error { firstIndex.unmarshalBinary(buffer) t.tailId = firstIndex.filenum - t.itemOffset = firstIndex.offset + t.itemOffset = uint64(firstIndex.offset) t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) - t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) + t.head, err = t.openFile(lastIndex.filenum, openFileForAppend) if err != nil { return err } @@ -268,7 +233,7 @@ func (t *freezerTable) repair() error { // Truncate the head file to the last offset pointer if contentExp < contentSize { t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) - if err := truncateFreezerFile(t.head, contentExp); err != nil { + if err := truncateFile(t.head, contentExp); err != nil { return err } contentSize = contentExp @@ -276,7 +241,7 @@ func (t *freezerTable) repair() error { // Truncate the index to point within the head file if contentExp > contentSize { t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) - if err := truncateFreezerFile(t.index, 
offsetsSize-indexEntrySize); err != nil { + if err := truncateFile(t.index, offsetsSize-indexEntrySize); err != nil { return err } offsetsSize -= indexEntrySize @@ -287,7 +252,7 @@ func (t *freezerTable) repair() error { if newLastIndex.filenum != lastIndex.filenum { // Release earlier opened file t.releaseFile(lastIndex.filenum) - if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil { + if t.head, err = t.openFile(newLastIndex.filenum, openFileForAppend); err != nil { return err } if stat, err = t.head.Stat(); err != nil { @@ -309,7 +274,7 @@ func (t *freezerTable) repair() error { return err } // Update the item and byte counters and return - t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file + t.items = t.itemOffset + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file t.headBytes = contentSize t.headId = lastIndex.filenum @@ -330,25 +295,30 @@ func (t *freezerTable) preopen() (err error) { t.releaseFilesAfter(0, false) // Open all except head in RDONLY for i := t.tailId; i < t.headId; i++ { - if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { + if _, err = t.openFile(i, openFileForReadOnly); err != nil { return err } } // Open head in read/write - t.head, err = t.openFile(t.headId, openFreezerFileForAppend) + t.head, err = t.openFile(t.headId, openFileForAppend) return err } // truncate discards any recent data above the provided threshold number. 
-func (t *freezerTable) truncate(items uint64) error { +func (t *freezerTable) truncate(threshold uint64) error { t.lock.Lock() defer t.lock.Unlock() - + var ( + tail = t.itemOffset + head = atomic.LoadUint64(&t.items) + ) // If our item count is correct, don't do anything - existing := atomic.LoadUint64(&t.items) - if existing <= items { + if head <= threshold { return nil } + if threshold < tail { + return errors.New("truncation below tail") + } // We need to truncate, save the old size for metrics tracking oldSize, err := t.sizeNolock() if err != nil { @@ -356,16 +326,18 @@ func (t *freezerTable) truncate(items uint64) error { } // Something's out of sync, truncate the table's offset index log := t.logger.Debug - if existing > items+1 { + if head > threshold+1 { log = t.logger.Warn // Only loud warn if we delete multiple items } - log("Truncating freezer table", "items", existing, "limit", items) - if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { + log("Truncating freezer table", "from", threshold, "to", head, "items", head-threshold) + // Take the tail offset into account + idx := threshold - tail + if err := truncateFile(t.index, int64(idx+1)*indexEntrySize); err != nil { return err } // Calculate the new expected size of the data file and truncate it buffer := make([]byte, indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil { + if _, err := t.index.ReadAt(buffer, int64(idx*indexEntrySize)); err != nil { return err } var expected indexEntry @@ -375,7 +347,7 @@ func (t *freezerTable) truncate(items uint64) error { if expected.filenum != t.headId { // If already open for reading, force-reopen for writing t.releaseFile(expected.filenum) - newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend) + newHead, err := t.openFile(expected.filenum, openFileForAppend) if err != nil { return err } @@ -386,12 +358,12 @@ func (t *freezerTable) truncate(items uint64) error { t.head = 
newHead t.headId = expected.filenum } - if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil { + if err := truncateFile(t.head, int64(expected.offset)); err != nil { return err } // All data files truncated, set internal counters and return t.headBytes = int64(expected.offset) - atomic.StoreUint64(&t.items, items) + atomic.StoreUint64(&t.items, threshold) // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() @@ -403,6 +375,106 @@ func (t *freezerTable) truncate(items uint64) error { return nil } +func (t *freezerTable) fileName(number int) string { + if t.noCompression { + return fmt.Sprintf("%s.%04d.rdat", t.name, number) + } + return fmt.Sprintf("%s.%04d.cdat", t.name, number) +} + +// tailTruncate discards any old data below the provided threshold number. +func (t *freezerTable) tailTruncate(threshold uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + var ( + tail = t.itemOffset + head = atomic.LoadUint64(&t.items) + ) + if tail >= threshold { + return nil + } + if threshold >= head { + return errors.New("tail truncation above head") + } + // We need to truncate, save the old size for metrics tracking + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + log := t.logger.Debug + if threshold-tail > 1 { + log = t.logger.Warn // Only loud warn if we delete multiple items + } + log("Tail-truncating freezer table", "from", tail, "to", threshold, "items", threshold-tail) + // Determine what file the new first item resides in + indexes, err := t.getIndices(threshold, 1) + if err != nil { + return err + } + start, _, fileNo := indexes[0].bounds(indexes[1]) + // Close/release all open files strictly below fileNo — NOTE(review): releaseFilesBefore(fileNo) excludes fileNo itself, so its old handle may still be open while copyFrom rewrites it below; confirm intended ("up to and including" does not match the implementation) + t.releaseFilesBefore(fileNo, false) + // In file corresponding to fileNo, we now need to delete all data prior to 'start' — NOTE(review): the error returned by copyFrom is silently dropped here + destPath := t.fileName(int(fileNo)) + copyFrom(destPath, destPath, uint64(start)) + // Now we need to update the index.
+ { + tmpIndex, err := ioutil.TempFile(filepath.Dir(destPath), "index-tmp-*") + if err != nil { + return err + } + // Use a buffered writer to minimize write(2) syscalls. + bufw := bufio.NewWriter(tmpIndex) + var prev *indexEntry = &indexEntry{fileNo, uint32(threshold)} + bufw.Write(prev.append(nil)) + err = iterateIndexFile(threshold-t.itemOffset+1, t.index, func(cur *indexEntry) bool { + if _, _, fileNumber := prev.bounds(cur); fileNumber == fileNo { + // Entry needs to be modified + log("Rewriting index", "filenumber", fileNo, "new offset", cur.offset-start) + cur.offset -= start + } + bufw.Write(cur.append(nil)) + prev = cur + return true + }) + if err != nil { + return err + } + bufw.Flush() + if err := tmpIndex.Close(); err != nil { + return err + } + // Need to close the current index before we overwrite it + if err := t.index.Close(); err != nil { + return err + } + // New indexes written to tempfile, now overwrite old indexfile + if err := os.Rename(tmpIndex.Name(), t.index.Name()); err != nil { + return err + } + indexName := fmt.Sprintf("%s.ridx", t.name) + if !t.noCompression { + // Compressed idx + indexName = fmt.Sprintf("%s.cidx", t.name) + } + if offsets, err := openFileForAppend(filepath.Join(t.path, indexName)); err != nil { + return err + } else { + t.index = offsets + } + } + // All data files truncated, set internal counters and return + t.itemOffset = threshold + t.tailId = fileNo + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeGauge.Dec(int64(oldSize - newSize)) + return nil +} + // Close closes all opened files. 
func (t *freezerTable) Close() error { t.lock.Lock() @@ -431,12 +503,7 @@ func (t *freezerTable) Close() error { func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) { var exist bool if f, exist = t.files[num]; !exist { - var name string - if t.noCompression { - name = fmt.Sprintf("%s.%04d.rdat", t.name, num) - } else { - name = fmt.Sprintf("%s.%04d.cdat", t.name, num) - } + name := t.fileName(int(num)) f, err = opener(filepath.Join(t.path, name)) if err != nil { return nil, err @@ -455,10 +522,20 @@ func (t *freezerTable) releaseFile(num uint32) { } } +// releaseFilesBefore closes all open files with a strictly lower number (fnum < num), and optionally also deletes the files +func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { + t.releaseFilesBetween(0, num, remove) +} + // releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { + t.releaseFilesBetween(num+1, math.MaxUint32, remove) +} + +// releaseFilesBetween closes all open files whose number is in [low, high) — low inclusive, high exclusive — and optionally also deletes the files +func (t *freezerTable) releaseFilesBetween(low, high uint32, remove bool) { for fnum, f := range t.files { - if fnum > num { + if fnum >= low && fnum < high { delete(t.files, fnum) f.Close() if remove { @@ -476,7 +553,7 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { // it will return error. func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { // Apply the table-offset - from = from - uint64(t.itemOffset) + from = from - t.itemOffset // For reading N items, we need N+1 indices.
buffer := make([]byte, (count+1)*indexEntrySize) if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { @@ -568,7 +645,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i itemCount := atomic.LoadUint64(&t.items) // max number // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something - if itemCount <= start || uint64(t.itemOffset) > start || count == 0 { + if itemCount <= start || t.itemOffset > start || count == 0 { return nil, nil, errOutOfBounds } if start+count > itemCount { @@ -683,14 +760,14 @@ func (t *freezerTable) advanceHead() error { // We open the next file in truncated mode -- if this file already // exists, we need to start over from scratch on it. nextID := t.headId + 1 - newHead, err := t.openFile(nextID, openFreezerFileTruncated) + newHead, err := t.openFile(nextID, openFileTruncated) if err != nil { return err } // Close old file, and reopen in RDONLY mode. t.releaseFile(t.headId) - t.openFile(t.headId, openFreezerFileForReadOnly) + t.openFile(t.headId, openFileForReadOnly) // Swap out the current head. t.head = newHead diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 803809b5207f4..fbe4abb6ac8d1 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -502,10 +502,10 @@ func TestFreezerReadAndTruncate(t *testing.T) { for y := byte(0); y < 30; y++ { f.Retrieve(uint64(y)) } - // Now, truncate back to zero - f.truncate(0) - + if err := f.truncate(0); err != nil{ + t.Fatal(err) + } // Write the data again batch := f.newBatch() for x := 0; x < 30; x++ { @@ -829,3 +829,35 @@ func TestSequentialReadByteLimit(t *testing.T) { } } } +/* +func TestFreezerTailTruncate(t *testing.T){ + t.Parallel() + rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge() + fname := fmt.Sprintf("trailtruncate-%d", rand.Uint64()) + // Fill table + { + // Table size is 40 bytes per file. 
We write 7 bytes per item, so it doesn't + // line up exactly + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + // Write 100 x 7 bytes + batch := f.newBatch() + for i := 0; i < 100; i++{ + require.NoError(t, batch.AppendRaw(i, getChunk(7, byte(i)))) + } + f.Close() + } + // Now tail-truncate item by item + { // Open it, iterate, verify iteration + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true) + if err != nil { + t.Fatal(err) + } + for i := 5; i < 6; i++ { + f.tailTruncate(i) + } + } +} +*/ \ No newline at end of file diff --git a/core/rawdb/freezer_utils.go b/core/rawdb/freezer_utils.go new file mode 100644 index 0000000000000..cbac74f26ae23 --- /dev/null +++ b/core/rawdb/freezer_utils.go @@ -0,0 +1,123 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" +) + +// openFileForAppend opens a file in append-mode and seeks to the end. 
+func openFileForAppend(filename string) (*os.File, error) { + // Open the file without the O_APPEND flag + // because it has differing behaviour during Truncate operations + // on different OS's + file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + // Seek to end for append + if _, err = file.Seek(0, io.SeekEnd); err != nil { + return nil, err + } + return file, nil +} + +// openFileForReadOnly opens a file for read only access. +func openFileForReadOnly(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDONLY, 0644) +} + +// openFileTruncated opens a file making sure it is truncated. +func openFileTruncated(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +} + +// truncateFile resizes a file to the given size, and seeks to the end. +func truncateFile(file *os.File, size int64) error { + if err := file.Truncate(size); err != nil { + return err + } + // Seek to end for append + if _, err := file.Seek(0, io.SeekEnd); err != nil { + return err + } + return nil +} + +// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'. +// The 'destPath' is created if it doesn't exist, otherwise it is overwritten. +// It is perfectly valid to have destPath == srcPath. +func copyFrom(srcPath, destPath string, offset uint64) error { + // Create a temp file in the same dir where we want it to wind up + f, err := ioutil.TempFile(filepath.Dir(destPath), "copy-tmp-*") + if err != nil { + return err + } + tmpFilePath := f.Name() + defer func() { + if f != nil { + f.Close() + } + os.Remove(tmpFilePath) + }() + // Open the source file + src, err := os.Open(srcPath) + if err != nil { + return err + } + if _, err = src.Seek(int64(offset), 0); err != nil { + src.Close() + return err + } + // io.Copy uses 32K buffer internally. + _, err = io.Copy(f, src) + // src may be same as dest, so needs to be closed before we do the + // final move. 
src.Close() + if err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + f = nil + // Now change tempfile into the actual destination file + if err := os.Rename(tmpFilePath, destPath); err != nil { + return err + } + return nil +} + +func iterateIndexFile(from uint64, indexFile *os.File, callback func(entry *indexEntry) bool) error { + // Apply the table-offset + // FIXME(review): 'from' is never advanced inside the loop, so ReadAt re-reads the same index entry forever; the loop only exits when the callback returns false or ReadAt fails (e.g. io.EOF at end-of-index, which the tailTruncate caller then treats as a hard error) + for { + buffer := make([]byte, indexEntrySize) + if _, err := indexFile.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { + return err + } + index := new(indexEntry) + index.unmarshalBinary(buffer) + if !callback(index) { + break + } + } + return nil +} diff --git a/core/rawdb/freezer_utils_test.go b/core/rawdb/freezer_utils_test.go new file mode 100644 index 0000000000000..1145c586c2883 --- /dev/null +++ b/core/rawdb/freezer_utils_test.go @@ -0,0 +1,38 @@ +package rawdb + +import ( + "bytes" + mrand "math/rand" + "os" + "path/filepath" + "testing" +) + +func TestCopyFrom(t *testing.T) { + for i := 0; i < 20; i++ { + testCopyFrom(t) + } +} + +func testCopyFrom(t *testing.T) { + data := make([]byte, 1024*33) + mrand.Read(data) + src := filepath.Join(os.TempDir(), "tmp-source") + dst := filepath.Join(os.TempDir(), "tmp-dest") + os.WriteFile(src, data, 0600) + offset := uint64(mrand.Intn(len(data))) + if err := copyFrom(src, dst, offset); err != nil { + t.Fatal(err) + } + // Now validate that the contents match + haveData, err := os.ReadFile(dst) + if err != nil { + t.Fatal(err) + } + if have, want := len(haveData), len(data[offset:]); have != want { + t.Fatalf("wrong data, have length %d, want length %d", have, want) + } + if !bytes.Equal(haveData, data[offset:]) { + t.Fatalf("data mismatch\nhave:\n%x\nwant\n%x", haveData, data[offset:]) + } +}