Skip to content

Commit

Permalink
fix the data inconsistency issue by adding a txPostLockHook into the …
Browse files Browse the repository at this point in the history
…backend

Previously, SetConsistentIndex() was called during the apply workflow,
but outside the db transaction. If a commit happened between SetConsistentIndex()
and the rest of the apply workflow, and etcd crashed for whatever reason right
after that commit, then etcd would have committed an incomplete transaction to db.
Eventually, etcd would run into a data inconsistency issue.

In this commit, we move the SetConsistentIndex() call into a txPostLockHook, so
that it is executed while the transaction lock is held.
  • Loading branch information
ahrtr committed Apr 8, 2022
1 parent 3ace622 commit 66c7aab
Show file tree
Hide file tree
Showing 21 changed files with 216 additions and 83 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir

if !v3 {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
Expand Down
4 changes: 2 additions & 2 deletions server/auth/range_perm_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"go.uber.org/zap"
)

func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions {
user := getUser(lg, tx, userName)
if user == nil {
return nil
Expand Down Expand Up @@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
return false
}

func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
if !ok {
Expand Down
54 changes: 27 additions & 27 deletions server/auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error {
}
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer func() {
tx.Unlock()
b.ForceCommit()
Expand Down Expand Up @@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() {
}
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
as.commitRevision(tx)
tx.Unlock()
Expand Down Expand Up @@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, username)
Expand Down Expand Up @@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
// CompareHashAndPassword is very expensive, so we use closures
// to avoid putting it in the critical section of the tx lock.
revision, err := func() (uint64, error) {
tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
func (as *authStore) Recover(be backend.Backend) {
enabled := false
as.be = be
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
if len(vs) == 1 {
Expand Down Expand Up @@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand Down Expand Up @@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand All @@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete

func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand Down Expand Up @@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p

func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.User)
Expand Down Expand Up @@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser

func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
user := getUser(as.lg, tx, r.Name)
tx.Unlock()

Expand All @@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,

func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
users := getAllUsers(as.lg, tx)
tx.Unlock()

Expand All @@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

user := getUser(as.lg, tx, r.Name)
Expand Down Expand Up @@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs

func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

var resp pb.AuthRoleGetResponse
Expand All @@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,

func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
roles := getAllRoles(as.lg, tx)
tx.Unlock()

Expand All @@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon

func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Role)
Expand Down Expand Up @@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Role)
Expand Down Expand Up @@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Name)
Expand Down Expand Up @@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
}

tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()

role := getRole(as.lg, tx, r.Name)
Expand Down Expand Up @@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
return ErrAuthOldRevision
}

tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return ErrUserEmpty
}

tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
u := getUser(as.lg, tx, authInfo.Username)
tx.Unlock()
Expand All @@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return nil
}

func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User {
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
if len(vs) == 0 {
return nil
Expand All @@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
return user
}

func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User {
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
Expand Down Expand Up @@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) {
tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
}

func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role {
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
if len(vs) == 0 {
return nil
Expand All @@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
return role
}

func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role {
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
}

tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()

tx.UnsafeCreateBucket(buckets.Auth)
tx.UnsafeCreateBucket(buckets.AuthUsers)
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
}

func getRevision(tx backend.BatchTx) uint64 {
func getRevision(tx backend.ReadTx) uint64 {
_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
Expand Down Expand Up @@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {

func (as *authStore) HasRole(user, role string) bool {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
u := getUser(as.lg, tx, user)
tx.Unlock()

Expand Down
14 changes: 7 additions & 7 deletions server/etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
}

tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
if unsafeMemberExists(tx, mkey) {
return errMemberAlreadyExist
Expand All @@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
// from the v3 backend.
func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeDeleteBucket(buckets.Cluster)
return nil
Expand All @@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
mkey := backendMemberKey(id)

tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
if !unsafeMemberExists(tx, mkey) {
Expand Down Expand Up @@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
lg.Info("Trimming membership information from the backend...")
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
tx.UnsafeDelete(buckets.Members, k)
Expand Down Expand Up @@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
ckey := backendClusterVersionKey()

tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
}
Expand All @@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
lg.Panic("failed to marshal downgrade information", zap.Error(err))
}
tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func backendDowngradeKey() []byte {

func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Members)
tx.UnsafeCreateBucket(buckets.MembersRemoved)
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/api/v3alarm/alarms.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
}

b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().LockInsideApply()
b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
b.BatchTx().Unlock()

Expand Down Expand Up @@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
}

b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().LockInsideApply()
b.BatchTx().UnsafeDelete(buckets.Alarm, v)
b.BatchTx().Unlock()

Expand Down Expand Up @@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error {
b := a.bg.Backend()
tx := b.BatchTx()

tx.Lock()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(buckets.Alarm)
err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
var m pb.AlarmMember
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil
Expand Down

0 comments on commit 66c7aab

Please sign in to comment.