Skip to content

Commit

Permalink
chore(store/v2): cleanup the migration API (#20298)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope committed May 13, 2024
1 parent d90f552 commit 559f784
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 45 deletions.
3 changes: 1 addition & 2 deletions store/v2/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,13 @@ func (m *Manager) writeChangeset() error {
}

batch := m.db.NewBatch()
defer batch.Close()

if err := batch.Set(csKey, csBytes); err != nil {
return fmt.Errorf("failed to write changeset to db.Batch: %w", err)
}
if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write changeset to db: %w", err)
}
batch.Close()
}

return nil
Expand Down
12 changes: 10 additions & 2 deletions store/v2/root/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,16 @@ func (s *MigrateStoreTestSuite) TestMigrateState() {
originalLatestVersion, err := s.rootStore.GetLatestVersion()
s.Require().NoError(err)

// start the migration process
s.Require().NoError(s.rootStore.StartMigration())
// check if the Query fallback to the original SC
for version := uint64(1); version <= originalLatestVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < 10; i++ {
res, err := s.rootStore.Query([]byte(storeKey), version, []byte(fmt.Sprintf("key-%d-%d", version, i)), true)
s.Require().NoError(err)
s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), res.Value)
}
}
}

// continue to apply changeset against the original store
latestVersion := originalLatestVersion + 1
Expand Down
77 changes: 42 additions & 35 deletions store/v2/root/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Store struct {
isMigrating bool
}

// New creates a new root Store instance.
//
// NOTE: The migration manager is optional and can be nil if no migration is required.
func New(
logger log.Logger,
ss store.VersionedDatabase,
Expand All @@ -73,6 +76,7 @@ func New(
stateCommitment: sc,
migrationManager: mm,
telemetry: m,
isMigrating: mm != nil,
}, nil
}

Expand Down Expand Up @@ -163,24 +167,29 @@ func (s *Store) Query(storeKey []byte, version uint64, key []byte, prove bool) (
defer s.telemetry.MeasureSince(now, "root_store", "query")
}

val, err := s.stateStorage.Get(storeKey, version, key)
if err != nil || val == nil {
// fallback to querying SC backend if not found in SS backend
//
// Note, this should only used during migration, i.e. while SS and IAVL v2
// are being asynchronously synced.
var val []byte
var err error
if s.isMigrating { // if we're migrating, we need to query the SC backend
val, err = s.stateCommitment.Get(storeKey, version, key)
if err != nil {
return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", err)
}
} else {
val, err = s.stateStorage.Get(storeKey, version, key)
if err != nil {
return store.QueryResult{}, fmt.Errorf("failed to query SS store: %w", err)
}
if val == nil {
// fallback to querying SC backend if not found in SS backend
//
// Note, this should only used during migration, i.e. while SS and IAVL v2
// are being asynchronously synced.
bz, scErr := s.stateCommitment.Get(storeKey, version, key)
if scErr != nil {
return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", scErr)
}

val = bz
}

if err != nil {
return store.QueryResult{}, fmt.Errorf("failed to query SS store: %w", err)
}
}

result := store.QueryResult{
Expand Down Expand Up @@ -235,6 +244,11 @@ func (s *Store) loadVersion(v uint64) error {
// set lastCommitInfo explicitly s.t. Commit commits the correct version, i.e. v+1
s.lastCommitInfo = &proof.CommitInfo{Version: v}

// if we're migrating, we need to start the migration process
if s.isMigrating {
s.startMigration()
}

return nil
}

Expand Down Expand Up @@ -289,20 +303,18 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) {

eg := new(errgroup.Group)

// commit SS async
eg.Go(func() error {
// if we're migrating, we don't want to commit to the state storage
// to avoid parallel writes
if s.isMigrating {
return nil
}

if err := s.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to commit SS: %w", err)
}
// if we're migrating, we don't want to commit to the state storage to avoid
// parallel writes
if !s.isMigrating {
// commit SS async
eg.Go(func() error {
if err := s.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to commit SS: %w", err)
}

return nil
})
return nil
})
}

// commit SC async
eg.Go(func() error {
Expand Down Expand Up @@ -344,22 +356,19 @@ func (s *Store) Prune(version uint64) error {
return nil
}

// StartMigration starts the migration process and initializes the channels.
// An error is returned if migration is already in progress.
// startMigration starts a migration process to migrate the RootStore/v1 to the
// SS and SC backends of store/v2 and initializes the channels.
// It runs in a separate goroutine and replaces the current RootStore with the
// migrated new backends once the migration is complete.
//
// NOTE: This method should only be called once after loadVersion.
func (s *Store) StartMigration() error {
if s.isMigrating {
return fmt.Errorf("migration already in progress")
}

func (s *Store) startMigration() {
// buffer at most 1 changeset, if the receiver is behind attempting to buffer
// more than 1 will block.
s.chChangeset = make(chan *migration.VersionedChangeset, 1)
// it is used to signal the migration manager that the migration is done
s.chDone = make(chan struct{})

s.isMigrating = true

mtx := sync.Mutex{}
mtx.Lock()
go func() {
Expand All @@ -374,8 +383,6 @@ func (s *Store) StartMigration() error {
// wait for the migration manager to start
mtx.Lock()
defer mtx.Unlock()

return nil
}

// writeSC accepts a Changeset and writes that as a batch to the underlying SC
Expand Down
6 changes: 0 additions & 6 deletions store/v2/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ type RootStore interface {
// old versions of the RootStore by the CLI.
Prune(version uint64) error

// StartMigration starts a migration process to migrate the RootStore/v1 to the
// SS and SC backends of store/v2.
// It runs in a separate goroutine and replaces the current RootStore with the
// migrated new backends once the migration is complete.
StartMigration() error

// SetMetrics sets the telemetry handler on the RootStore.
SetMetrics(m metrics.Metrics)

Expand Down

0 comments on commit 559f784

Please sign in to comment.