Skip to content

Commit

Permalink
Merge pull request #13908 from ahrtr/data_corruption_3.5
Browse files Browse the repository at this point in the history
[3.5] Fix the data inconsistency issue by adding a txPostLockHook into the backend
  • Loading branch information
serathius committed Apr 8, 2022
2 parents 3ace622 + 66c7aab commit bf22ef3
Show file tree
Hide file tree
Showing 21 changed files with 216 additions and 83 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/backup_command.go
Expand Up @@ -319,7 +319,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir

if !v3 {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
Expand Down
4 changes: 2 additions & 2 deletions server/auth/range_perm_cache.go
Expand Up @@ -22,7 +22,7 @@ import (
"go.uber.org/zap"
)

func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions {
user := getUser(lg, tx, userName)
if user == nil {
return nil
Expand Down Expand Up @@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
return false
}

func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
if !ok {
Expand Down
54 changes: 27 additions & 27 deletions server/auth/store.go
Expand Up @@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error {
}
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer func() {
tx.Unlock()
b.ForceCommit()
Expand Down Expand Up @@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() {
}
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
as.commitRevision(tx)
tx.Unlock()
Expand Down Expand Up @@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, username)
Expand Down Expand Up @@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
// CompareHashAndPassword is very expensive, so we use closures
// to avoid putting it in the critical section of the tx lock.
revision, err := func() (uint64, error) {
tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
func (as *authStore) Recover(be backend.Backend) {
enabled := false
as.be = be
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
if len(vs) == 1 {
Expand Down Expand Up @@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand Down Expand Up @@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand All @@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete

func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand Down Expand Up @@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p

func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.User)
Expand Down Expand Up @@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser

func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
user := getUser(as.lg, tx, r.Name)
tx.Unlock()

Expand All @@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,

func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
users := getAllUsers(as.lg, tx)
tx.Unlock()

Expand All @@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand Down Expand Up @@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs

func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

var resp pb.AuthRoleGetResponse
Expand All @@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,

func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
roles := getAllRoles(as.lg, tx)
tx.Unlock()

Expand All @@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon

func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Role)
Expand Down Expand Up @@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Role)
Expand Down Expand Up @@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Name)
Expand Down Expand Up @@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Name)
Expand Down Expand Up @@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
return ErrAuthOldRevision
}

tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return ErrUserEmpty
}

tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
u := getUser(as.lg, tx, authInfo.Username)
tx.Unlock()
Expand All @@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return nil
}

func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User {
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
if len(vs) == 0 {
return nil
Expand All @@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
return user
}

func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User {
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
Expand Down Expand Up @@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) {
tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
}

func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role {
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
if len(vs) == 0 {
return nil
Expand All @@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
return role
}

func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role {
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
}

tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()

tx.UnsafeCreateBucket(buckets.Auth)
tx.UnsafeCreateBucket(buckets.AuthUsers)
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
}

func getRevision(tx backend.BatchTx) uint64 {
func getRevision(tx backend.ReadTx) uint64 {
_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
Expand Down Expand Up @@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {

func (as *authStore) HasRole(user, role string) bool {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
u := getUser(as.lg, tx, user)
tx.Unlock()

Expand Down
14 changes: 7 additions & 7 deletions server/etcdserver/api/membership/store.go
Expand Up @@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
}

tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
if unsafeMemberExists(tx, mkey) {
return errMemberAlreadyExist
Expand All @@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
// from the v3 backend.
func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeDeleteBucket(buckets.Cluster)
return nil
Expand All @@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
mkey := backendMemberKey(id)

tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
if !unsafeMemberExists(tx, mkey) {
Expand Down Expand Up @@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
lg.Info("Trimming membership information from the backend...")
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
tx.UnsafeDelete(buckets.Members, k)
Expand Down Expand Up @@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
ckey := backendClusterVersionKey()

tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
}
Expand All @@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
lg.Panic("failed to marshal downgrade information", zap.Error(err))
}
tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func backendDowngradeKey() []byte {

func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Members)
tx.UnsafeCreateBucket(buckets.MembersRemoved)
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/api/v3alarm/alarms.go
Expand Up @@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
}

b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().LockInsideApply()
b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
b.BatchTx().Unlock()

Expand Down Expand Up @@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
}

b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().LockInsideApply()
b.BatchTx().UnsafeDelete(buckets.Alarm, v)
b.BatchTx().Unlock()

Expand Down Expand Up @@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error {
b := a.bg.Backend()
tx := b.BatchTx()

tx.Lock()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(buckets.Alarm)
err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
var m pb.AlarmMember
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/backend.go
Expand Up @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil
Expand Down

0 comments on commit bf22ef3

Please sign in to comment.