Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, ethdb, tests, trie: introduce database snapshot #24486

Merged
merged 1 commit into from Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/rawdb/table.go
Expand Up @@ -177,6 +177,13 @@ func (t *table) NewBatchWithSize(size int) ethdb.Batch {
return &tableBatch{t.db.NewBatchWithSize(size), t.prefix}
}

// NewSnapshot creates a database snapshot based on the current state.
// The created snapshot will not be affected by all following mutations
// happened on the database.
func (t *table) NewSnapshot() (ethdb.Snapshot, error) {
return t.db.NewSnapshot()
}

// tableBatch is a wrapper around a database batch that prefixes each key access
// with a pre-configured string.
type tableBatch struct {
Expand Down
2 changes: 2 additions & 0 deletions ethdb/database.go
Expand Up @@ -64,6 +64,7 @@ type KeyValueStore interface {
Iteratee
Stater
Compacter
Snapshotter
io.Closer
}

Expand Down Expand Up @@ -153,5 +154,6 @@ type Database interface {
Iteratee
Stater
Compacter
Snapshotter
io.Closer
}
62 changes: 62 additions & 0 deletions ethdb/dbtest/testsuite.go
Expand Up @@ -313,6 +313,68 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
}
})

t.Run("Snapshot", func(t *testing.T) {
db := New()
defer db.Close()

initial := map[string]string{
"k1": "v1", "k2": "v2", "k3": "", "k4": "",
}
for k, v := range initial {
db.Put([]byte(k), []byte(v))
}
snapshot, err := db.NewSnapshot()
if err != nil {
t.Fatal(err)
}
for k, v := range initial {
got, err := snapshot.Get([]byte(k))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got, []byte(v)) {
t.Fatalf("Unexpected value want: %v, got %v", v, got)
}
}

// Flush more modifications into the database, ensure the snapshot
// isn't affected.
var (
update = map[string]string{"k1": "v1-b", "k3": "v3-b"}
insert = map[string]string{"k5": "v5-b"}
delete = map[string]string{"k2": ""}
)
for k, v := range update {
db.Put([]byte(k), []byte(v))
}
for k, v := range insert {
db.Put([]byte(k), []byte(v))
}
for k := range delete {
db.Delete([]byte(k))
}
for k, v := range initial {
got, err := snapshot.Get([]byte(k))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got, []byte(v)) {
t.Fatalf("Unexpected value want: %v, got %v", v, got)
}
}
for k := range insert {
got, err := snapshot.Get([]byte(k))
if err == nil || len(got) != 0 {
t.Fatal("Unexpected value")
}
}
for k := range delete {
got, err := snapshot.Get([]byte(k))
if err != nil || len(got) == 0 {
t.Fatal("Unexpected deletion")
}
}
})
}

func iterateKeys(it ethdb.Iterator) []string {
Expand Down
36 changes: 36 additions & 0 deletions ethdb/leveldb/leveldb.go
Expand Up @@ -228,6 +228,19 @@ func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
return db.db.NewIterator(bytesPrefixRange(prefix, start), nil)
}

// NewSnapshot creates a database snapshot based on the current state.
// The created snapshot will not be affected by all following mutations
// happened on the database.
// Note don't forget to release the snapshot once it's used up, otherwise
// the stale data will never be cleaned up by the underlying compactor.
func (db *Database) NewSnapshot() (ethdb.Snapshot, error) {
snap, err := db.db.GetSnapshot()
if err != nil {
return nil, err
}
return &snapshot{db: snap}, nil
}

// Stat returns a particular internal stat of the database.
func (db *Database) Stat(property string) (string, error) {
return db.db.GetProperty(property)
Expand Down Expand Up @@ -527,3 +540,26 @@ func bytesPrefixRange(prefix, start []byte) *util.Range {
r.Start = append(r.Start, start...)
return r
}

// snapshot wraps a leveldb snapshot for implementing the Snapshot interface.
type snapshot struct {
db *leveldb.Snapshot
}

// Has retrieves if a key is present in the snapshot backing by a key-value
// data store.
func (snap *snapshot) Has(key []byte) (bool, error) {
return snap.db.Has(key, nil)
}

// Get retrieves the given key if it's present in the snapshot backing by
// key-value data store.
func (snap *snapshot) Get(key []byte) ([]byte, error) {
return snap.db.Get(key, nil)
}

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (snap *snapshot) Release() {
snap.db.Release()
}
69 changes: 68 additions & 1 deletion ethdb/memorydb/memorydb.go
Expand Up @@ -35,6 +35,10 @@ var (
// errMemorydbNotFound is returned if a key is requested that is not found in
// the provided memory database.
errMemorydbNotFound = errors.New("not found")

// errSnapshotReleased is returned if callers want to retrieve data from a
// released snapshot.
errSnapshotReleased = errors.New("snapshot released")
)

// Database is an ephemeral key-value store. Apart from basic data storage
Expand All @@ -53,7 +57,7 @@ func New() *Database {
}
}

// NewWithCap returns a wrapped map pre-allocated to the provided capcity with
// NewWithCap returns a wrapped map pre-allocated to the provided capacity with
// all the required database interface methods implemented.
func NewWithCap(size int) *Database {
return &Database{
Expand Down Expand Up @@ -170,6 +174,13 @@ func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
}
}

// NewSnapshot creates a database snapshot based on the current state.
// The created snapshot will not be affected by all following mutations
// happened on the database.
func (db *Database) NewSnapshot() (ethdb.Snapshot, error) {
return newSnapshot(db), nil
}

// Stat returns a particular internal stat of the database.
func (db *Database) Stat(property string) (string, error) {
return "", errors.New("unknown property")
Expand Down Expand Up @@ -320,3 +331,59 @@ func (it *iterator) Value() []byte {
func (it *iterator) Release() {
it.keys, it.values = nil, nil
}

// snapshot wraps a batch of key-value entries deep copied from the in-memory
// database for implementing the Snapshot interface.
type snapshot struct {
db map[string][]byte
lock sync.RWMutex
}

// newSnapshot initializes the snapshot with the given database instance.
func newSnapshot(db *Database) *snapshot {
db.lock.RLock()
defer db.lock.RUnlock()

copied := make(map[string][]byte)
for key, val := range db.db {
copied[key] = common.CopyBytes(val)
}
return &snapshot{db: copied}
}

// Has retrieves if a key is present in the snapshot backing by a key-value
// data store.
func (snap *snapshot) Has(key []byte) (bool, error) {
snap.lock.RLock()
defer snap.lock.RUnlock()

if snap.db == nil {
return false, errSnapshotReleased
}
_, ok := snap.db[string(key)]
return ok, nil
}

// Get retrieves the given key if it's present in the snapshot backing by
// key-value data store.
func (snap *snapshot) Get(key []byte) ([]byte, error) {
snap.lock.RLock()
defer snap.lock.RUnlock()

if snap.db == nil {
return nil, errSnapshotReleased
}
if entry, ok := snap.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, errMemorydbNotFound
}

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (snap *snapshot) Release() {
snap.lock.Lock()
defer snap.lock.Unlock()

snap.db = nil
}
41 changes: 41 additions & 0 deletions ethdb/snapshot.go
@@ -0,0 +1,41 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package ethdb

type Snapshot interface {
// Has retrieves if a key is present in the snapshot backing by a key-value
// data store.
Has(key []byte) (bool, error)

// Get retrieves the given key if it's present in the snapshot backing by
// key-value data store.
Get(key []byte) ([]byte, error)

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
Release()
}

// Snapshotter wraps the Snapshot method of a backing data store.
type Snapshotter interface {
// NewSnapshot creates a database snapshot based on the current state.
// The created snapshot will not be affected by all following mutations
// happened on the database.
// Note don't forget to release the snapshot once it's used up, otherwise
// the stale data will never be cleaned up by the underlying compactor.
NewSnapshot() (Snapshot, error)
}
1 change: 1 addition & 0 deletions tests/fuzzers/stacktrie/trie_fuzzer.go
Expand Up @@ -67,6 +67,7 @@ func (s *spongeDb) Get(key []byte) ([]byte, error) { return nil, error
func (s *spongeDb) Delete(key []byte) error { panic("implement me") }
func (s *spongeDb) NewBatch() ethdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) NewBatchWithSize(size int) ethdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) NewSnapshot() (ethdb.Snapshot, error) { panic("implement me") }
func (s *spongeDb) Stat(property string) (string, error) { panic("implement me") }
func (s *spongeDb) Compact(start []byte, limit []byte) error { panic("implement me") }
func (s *spongeDb) Close() error { return nil }
Expand Down
6 changes: 5 additions & 1 deletion trie/iterator_test.go
Expand Up @@ -475,9 +475,13 @@ func (l *loggingDb) NewBatchWithSize(size int) ethdb.Batch {
}

func (l *loggingDb) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
fmt.Printf("NewIterator\n")
return l.backend.NewIterator(prefix, start)
}

func (l *loggingDb) NewSnapshot() (ethdb.Snapshot, error) {
return l.backend.NewSnapshot()
}

func (l *loggingDb) Stat(property string) (string, error) {
return l.backend.Stat(property)
}
Expand Down
1 change: 1 addition & 0 deletions trie/trie_test.go
Expand Up @@ -676,6 +676,7 @@ func (s *spongeDb) Get(key []byte) ([]byte, error) { return nil, error
func (s *spongeDb) Delete(key []byte) error { panic("implement me") }
func (s *spongeDb) NewBatch() ethdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) NewBatchWithSize(size int) ethdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) NewSnapshot() (ethdb.Snapshot, error) { panic("implement me") }
func (s *spongeDb) Stat(property string) (string, error) { panic("implement me") }
func (s *spongeDb) Compact(start []byte, limit []byte) error { panic("implement me") }
func (s *spongeDb) Close() error { return nil }
Expand Down