Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow no migration of commitment #20181

Merged
merged 6 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 6 additions & 8 deletions store/v2/commitment/store.go
Expand Up @@ -380,7 +380,7 @@ func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio
var (
importer Importer
snapshotItem snapshotstypes.SnapshotItem
storeKey string
storeKey []byte
)

loop:
Expand All @@ -402,10 +402,10 @@ loop:
importer.Close()
}

storeKey = item.Store.Name
storeKey = []byte(item.Store.Name)
tree := c.multiTrees[item.Store.Name]
if tree == nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey)
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", item.Store.Name)
}
importer, err = tree.Import(version)
if err != nil {
Expand All @@ -432,15 +432,13 @@ loop:
node.Value = []byte{}
}

key := []byte(storeKey)
// If the node is a leaf node, it will be written to the storage.
chStorage <- &corestore.StateChanges{
Actor: key,
Actor: storeKey,
StateChanges: []corestore.KVPair{
{
Key: node.Key,
Value: node.Value,
Remove: false,
Key: node.Key,
Value: node.Value,
},
},
}
Expand Down
69 changes: 60 additions & 9 deletions store/v2/migration/manager.go
Expand Up @@ -2,7 +2,9 @@ package migration

import (
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -14,6 +16,7 @@ import (
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/internal/encoding"
"cosmossdk.io/store/v2/snapshots"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
"cosmossdk.io/store/v2/storage"
)

Expand Down Expand Up @@ -49,6 +52,8 @@ type Manager struct {
}

// NewManager returns a new Manager.
//
// NOTE: `sc` can be `nil` if don't want to migrate the commitment.
func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager {
return &Manager{
logger: logger,
Expand Down Expand Up @@ -106,8 +111,51 @@ func (m *Manager) Migrate(height uint64) error {
})
eg.Go(func() error {
defer close(chStorage)
_, err := m.stateCommitment.Restore(height, 0, ms, chStorage)
return err
if m.stateCommitment != nil {
if _, err := m.stateCommitment.Restore(height, 0, ms, chStorage); err != nil {
return err
}
} else { // there is no commitment migration, just consume the stream to restore the state storage
var storeKey []byte
loop:
for {
snapshotItem := snapshotstypes.SnapshotItem{}
err := ms.ReadMsg(&snapshotItem)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("failed to read snapshot item: %w", err)
}
switch item := snapshotItem.Item.(type) {
case *snapshotstypes.SnapshotItem_Store:
storeKey = []byte(item.Store.Name)
case *snapshotstypes.SnapshotItem_IAVL:
if item.IAVL.Height == 0 { // only restore the leaf nodes
key := item.IAVL.Key
if key == nil {
key = []byte{}
}
value := item.IAVL.Value
if value == nil {
value = []byte{}
}
chStorage <- &corestore.StateChanges{
Actor: storeKey,
StateChanges: []corestore.KVPair{
{
Key: key,
Value: value,
},
},
}
}
default:
break loop
}
}
}
return nil
})

if err := eg.Wait(); err != nil {
Expand Down Expand Up @@ -186,12 +234,13 @@ func (m *Manager) Sync() error {
if err := encoding.UnmarshalChangeset(cs, csBytes); err != nil {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}

if err := m.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
if m.stateCommitment != nil {
if err := m.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
}
}
if err := m.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to write changeset to storage: %w", err)
Expand All @@ -212,7 +261,9 @@ func (m *Manager) Close() error {
if err := m.db.Close(); err != nil {
return fmt.Errorf("failed to close db: %w", err)
}
m.snapshotsManager.EndMigration(m.stateCommitment)
if m.stateCommitment != nil {
m.snapshotsManager.EndMigration(m.stateCommitment)
}

return nil
}
93 changes: 51 additions & 42 deletions store/v2/migration/manager_test.go
Expand Up @@ -18,7 +18,7 @@ import (

var storeKeys = []string{"store1", "store2"}

func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitment.CommitStore) {
t.Helper()

db := dbm.NewMemDB()
Expand Down Expand Up @@ -49,57 +49,66 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {

newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
if noCommitStore {
newCommitStore = nil
}

return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
}

func TestMigrateState(t *testing.T) {
m, orgCommitStore := setupMigrationManager(t)

// apply changeset
toVersion := uint64(100)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
for _, noCommitStore := range []bool{false, true} {
t.Run(fmt.Sprintf("Migrate noCommitStore=%v", noCommitStore), func(t *testing.T) {
m, orgCommitStore := setupMigrationManager(t, noCommitStore)

// apply changeset
toVersion := uint64(100)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}

err := m.Migrate(toVersion - 1)
require.NoError(t, err)

// check the migrated state
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
err := m.Migrate(toVersion - 1)
require.NoError(t, err)

if m.stateCommitment != nil {
// check the migrated state
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
// check the latest state
val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
require.Nil(t, val)
val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)
}
}
}
// check the latest state
val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Nil(t, val)
val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)

// check the storage
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
// check the storage
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
}
})
}
}
5 changes: 4 additions & 1 deletion store/v2/root/store.go
Expand Up @@ -395,7 +395,10 @@ func (s *Store) writeSC(cs *corestore.Changeset) error {
if err := s.stateCommitment.Close(); err != nil {
return fmt.Errorf("failed to close the old SC store: %w", err)
}
s.stateCommitment = s.migrationManager.GetStateCommitment()
newStateCommitment := s.migrationManager.GetStateCommitment()
if newStateCommitment != nil {
s.stateCommitment = newStateCommitment
}
if err := s.migrationManager.Close(); err != nil {
return fmt.Errorf("failed to close migration manager: %w", err)
}
Expand Down