Skip to content

Commit

Permalink
generalize schema version verification.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Apr 29, 2024
1 parent 5061e67 commit b7f6a39
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 8 deletions.
35 changes: 35 additions & 0 deletions server/storage/schema/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package schema

import (
"bytes"

"go.uber.org/zap"

"go.etcd.io/etcd/server/v3/storage/backend"
Expand All @@ -24,6 +26,8 @@ type action interface {
// unsafeDo executes the action and returns revert action, when executed
// should restore the state from before.
unsafeDo(tx backend.UnsafeReadWriter) (revert action, err error)
// unsafeHasStorageSchemaChange checks if the action is a no-op on the backend.
unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool
}

type setKeyAction struct {
Expand All @@ -38,6 +42,19 @@ func (a setKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
return revert, nil
}

func (a setKeyAction) unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool {
_, vs := tx.UnsafeRange(a.Bucket, a.FieldName, nil, 1)
if len(vs) != 1 {
lg.Info("setKeyAction would add new field", zap.Stringer("bucket-name", a.Bucket), zap.String("field-name", string(a.FieldName)))
return true
}
if !bytes.Equal(vs[0], a.FieldValue) {
lg.Info("setKeyAction would set new value for field", zap.Stringer("bucket-name", a.Bucket), zap.String("field-name", string(a.FieldName)), zap.String("field-value", string(a.FieldValue)))
return true
}
return false
}

type deleteKeyAction struct {
Bucket backend.Bucket
FieldName []byte
Expand All @@ -49,6 +66,15 @@ func (a deleteKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
return revert, nil
}

func (a deleteKeyAction) unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool {
_, vs := tx.UnsafeRange(a.Bucket, a.FieldName, nil, 1)
if len(vs) != 0 {
lg.Info("deleteKeyAction would delete field", zap.Stringer("bucket-name", a.Bucket), zap.String("field-name", string(a.FieldName)))
return true
}
return false
}

func restoreFieldValueAction(tx backend.UnsafeReader, bucket backend.Bucket, fieldName []byte) action {
_, vs := tx.UnsafeRange(bucket, fieldName, nil, 1)
if len(vs) == 1 {
Expand Down Expand Up @@ -92,3 +118,12 @@ func (as ActionList) unsafeExecuteInReversedOrder(lg *zap.Logger, tx backend.Uns
}
}
}

func (as ActionList) unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool {
for _, a := range as {
if a.unsafeHasStorageSchemaChange(lg, tx) {
return true
}
}
return false
}
5 changes: 5 additions & 0 deletions server/storage/schema/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/server/v3/storage/backend"
Expand Down Expand Up @@ -151,6 +152,10 @@ func (c brokenAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
return nil, errBrokenAction
}

func (c brokenAction) unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool {
return true
}

func putKeyValues(tx backend.UnsafeWriter, bucket backend.Bucket, kvs map[string]string) {
for k, v := range kvs {
tx.UnsafePut(bucket, []byte(k), []byte(v))
Expand Down
15 changes: 15 additions & 0 deletions server/storage/schema/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ func (p migrationPlan) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter
return nil
}

func (p migrationPlan) unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool {
for _, s := range p {
hasSchemaChange := s.unsafeHasStorageSchemaChange(lg, tx)
if hasSchemaChange {
lg.Info("migration step is not no-op", zap.String("new-storage-version", s.target.String()))
return true
}
}
return false
}

// migrationStep represents a single migrationStep of migrating etcd storage between two minor versions.
type migrationStep struct {
target semver.Version
Expand Down Expand Up @@ -103,6 +114,10 @@ func (s migrationStep) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter
return nil
}

func (s migrationStep) unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool {
return s.actions.unsafeHasStorageSchemaChange(lg, tx)
}

func trimToMinor(ver semver.Version) semver.Version {
return semver.Version{Major: ver.Major, Minor: ver.Minor}
}
6 changes: 6 additions & 0 deletions server/storage/schema/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/version"
Expand Down Expand Up @@ -219,6 +220,7 @@ type actionMock struct {
recorder *actionRecorder
name string
err error
isNoOp bool
}

func (a actionMock) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
Expand All @@ -228,3 +230,7 @@ func (a actionMock) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
name: "revert " + a.name,
}, a.err
}

func (a actionMock) unsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader) bool {
return a.isNoOp
}
10 changes: 10 additions & 0 deletions server/storage/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.UnsafeReader) (v semve
return version.V3_5, nil
}

// UnsafeHasStorageSchemaChange checks if any new fields would actually be added/deleted
// if migrating the storage from the current version to the target version.
func UnsafeHasStorageSchemaChange(lg *zap.Logger, tx backend.UnsafeReader, current semver.Version, target semver.Version) (bool, error) {
plan, err := newPlan(lg, current, target)
if err != nil {
return false, fmt.Errorf("cannot create migration plan: %v", err)
}
return plan.unsafeHasStorageSchemaChange(lg, tx), nil
}

func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) {
// changes should be taken from higher version
var higherV = v
Expand Down
20 changes: 12 additions & 8 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/pkg/v3/proxy"
Expand Down Expand Up @@ -279,13 +278,18 @@ func (ep *EtcdServerProcess) VerifySchemaVersion(lg *zap.Logger) error {
if currentEtcdVer.LessThan(ver) || ver.LessThan(*prevEtcdVer) {
return fmt.Errorf("expect backend schema version to be between [%s, %s], but got %s", prevEtcdVer.String(), currentEtcdVer.String(), ver.String())
}
// check new fields introduced in V3_6 do not exist in V3_5 data file.
// V3_6 contains all the fields in V3_5, so no need to check for V3_6 servers.
if *currentEtcdVer == version.V3_5 {
_, vs := be.BatchTx().UnsafeRange(schema.Meta, schema.MetaStorageVersionName, nil, 1)
if len(vs) != 0 {
return fmt.Errorf("expect storageVersion not exist in the meta bucket, but got %s", string(vs[0]))
}
if ep.cfg.ExecPath == BinPath.Etcd {
return nil
}
nextVer := semver.Version{Major: currentEtcdVer.Major, Minor: currentEtcdVer.Minor + 1}
lg.Info("verify no fields from next version is in the storage schema.",
zap.String("current-etcd-version", currentEtcdVer.String()), zap.String("next-etcd-version", nextVer.String()))
hasSchemaChange, err := schema.UnsafeHasStorageSchemaChange(lg, be.BatchTx(), nextVer, *currentEtcdVer)
if err != nil {
return err
}
if hasSchemaChange {
return fmt.Errorf("new fields from higher version %s detected in the db file", nextVer.String())
}
return nil
}
Expand Down

0 comments on commit b7f6a39

Please sign in to comment.