From 66c7aab4d3ac065468a10bb368aa363ca5fce57a Mon Sep 17 00:00:00 2001 From: ahrtr Date: Fri, 8 Apr 2022 14:39:23 +0800 Subject: [PATCH] fix the data inconsistency issue by adding a txPostLockHook into the backend Previously the SetConsistentIndex() is called during the apply workflow, but it's outside the db transaction. If a commit happens between SetConsistentIndex and the following apply workflow, and etcd crashes for whatever reason right after the commit, then etcd commits an incomplete transaction to db. Eventually etcd runs into the data inconsistency issue. In this commit, we move the SetConsistentIndex into a txPostLockHook, so it will be executed inside the transaction lock. --- etcdutl/etcdutl/backup_command.go | 2 +- server/auth/range_perm_cache.go | 4 +- server/auth/store.go | 54 +++++++++++------------ server/etcdserver/api/membership/store.go | 14 +++--- server/etcdserver/api/v3alarm/alarms.go | 6 +-- server/etcdserver/backend.go | 2 +- server/etcdserver/cindex/cindex.go | 54 +++++++++++++++++++++-- server/etcdserver/server.go | 34 +++++++++++++- server/etcdserver/server_test.go | 7 +-- server/lease/lessor.go | 4 +- server/mvcc/backend/backend.go | 16 ++++++- server/mvcc/backend/batch_tx.go | 28 +++++++++--- server/mvcc/backend/hooks_test.go | 2 - server/mvcc/backend/verify.go | 14 ++++++ server/mvcc/backend/verify_test.go | 34 +++++++++++--- server/mvcc/kvstore.go | 6 +-- server/mvcc/kvstore_compaction.go | 2 +- server/mvcc/kvstore_test.go | 9 ++-- server/mvcc/kvstore_txn.go | 2 +- server/mvcc/util.go | 2 +- server/verify/verify.go | 3 +- 21 files changed, 216 insertions(+), 83 deletions(-) diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index 6bebaf920a1..6cd243f3811 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -319,7 +319,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir if !v3 { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() cindex.UnsafeCreateMetaBucket(tx) cindex.UnsafeUpdateConsistentIndex(tx, idx, term) diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go index 7d77b16ea1a..54be7d25f07 100644 --- a/server/auth/range_perm_cache.go +++ b/server/auth/range_perm_cache.go @@ -22,7 +22,7 @@ import ( "go.uber.org/zap" ) -func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions { +func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions { user := getUser(lg, tx, userName) if user == nil { return nil @@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b return false } -func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { +func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { // assumption: tx is Lock()ed _, ok := as.rangePermCache[userName] if !ok { diff --git a/server/auth/store.go b/server/auth/store.go index 44c1d35fda0..09d9cdc671b 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error { } b := as.be tx := b.BatchTx() - tx.Lock() + tx.LockInsideApply() defer func() { tx.Unlock() b.ForceCommit() @@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() { } b := as.be tx := b.BatchTx() - tx.Lock() + tx.LockInsideApply() tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled) as.commitRevision(tx) tx.Unlock() @@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, username) @@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { // CompareHashAndPassword is very expensive, so we use closures // to avoid putting it in the critical section of the tx lock. revision, err := func() (uint64, error) { - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { func (as *authStore) Recover(be backend.Backend) { enabled := false as.be = be - tx := be.BatchTx() + tx := be.ReadTx() tx.Lock() _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0) if len(vs) == 1 { @@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.User) @@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() user := getUser(as.lg, tx, r.Name) tx.Unlock() @@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() users := getAllUsers(as.lg, tx) tx.Unlock() @@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() var resp pb.AuthRoleGetResponse @@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() roles := getAllRoles(as.lg, tx) tx.Unlock() @@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Role) @@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Role) @@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Name) @@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Name) @@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE return ErrAuthOldRevision } - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { return ErrUserEmpty } - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() u := getUser(as.lg, tx, authInfo.Username) tx.Unlock() @@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { return nil } -func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User { +func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User { _, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0) if len(vs) == 0 { return nil @@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User { return user } -func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User { +func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User { _, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) { tx.UnsafeDelete(buckets.AuthUsers, []byte(username)) } -func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role { +func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role { _, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0) if len(vs) == 0 { return nil @@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role { return role } -func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role { +func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role { _, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo } tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Auth) tx.UnsafeCreateBucket(buckets.AuthUsers) @@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) { tx.UnsafePut(buckets.Auth, revisionKey, revBytes) } -func getRevision(tx backend.BatchTx) uint64 { +func getRevision(tx backend.ReadTx) uint64 { _, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0) if len(vs) != 1 { // this can happen in the initialization phase @@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context { func (as *authStore) HasRole(user, role string) bool { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() u := getUser(as.lg, tx, user) tx.Unlock() diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index a0cdf370afd..fadc8182233 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er } tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() if unsafeMemberExists(tx, mkey) { return errMemberAlreadyExist @@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er // from the v3 backend. func TrimClusterFromBackend(be backend.Backend) error { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeDeleteBucket(buckets.Cluster) return nil @@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error { mkey := backendMemberKey(id) tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed")) if !unsafeMemberExists(tx, mkey) { @@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error { lg.Info("Trimming membership information from the backend...") tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { tx.UnsafeDelete(buckets.Members, k) @@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { ckey := backendClusterVersionKey() tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String())) } @@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D lg.Panic("failed to marshal downgrade information", zap.Error(err)) } tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(buckets.Cluster, dkey, dvalue) } @@ -316,7 +316,7 @@ func backendDowngradeKey() []byte { func mustCreateBackendBuckets(be backend.Backend) { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(buckets.Members) tx.UnsafeCreateBucket(buckets.MembersRemoved) diff --git a/server/etcdserver/api/v3alarm/alarms.go b/server/etcdserver/api/v3alarm/alarms.go index 3038813cc84..5d4a641cea0 100644 --- a/server/etcdserver/api/v3alarm/alarms.go +++ b/server/etcdserver/api/v3alarm/alarms.go @@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember { } b := a.bg.Backend() - b.BatchTx().Lock() + b.BatchTx().LockInsideApply() b.BatchTx().UnsafePut(buckets.Alarm, v, nil) b.BatchTx().Unlock() @@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember { } b := a.bg.Backend() - b.BatchTx().Lock() + b.BatchTx().LockInsideApply() b.BatchTx().UnsafeDelete(buckets.Alarm, v) b.BatchTx().Unlock() @@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error { b := a.bg.Backend() tx := b.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Alarm) err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error { var m pb.AlarmMember diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 081be2b5259..2beef576346 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 4978124baa5..ac6ae63742f 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -34,9 +34,18 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // ConsistentApplyingIndex returns the consistent applying index of current executing entry. + ConsistentApplyingIndex() (uint64, uint64) + + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) + // SetConsistentApplyingIndex set the consistent applying index of current executing entry. + SetConsistentApplyingIndex(v uint64, term uint64) + // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. UnsafeSave(tx backend.BatchTx) @@ -56,6 +65,12 @@ type consistentIndex struct { // The value is being persisted in the backend since v3.5. term uint64 + // applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index + // and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be + // saved to consistentIndex and term above in the txPostLockInsideApplyHook. + applyingIndex uint64 + applyingTerm uint64 + // be is used for initial read consistentIndex be Backend // mutex is protecting be. @@ -75,7 +90,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := ReadConsistentIndex(ci.be.BatchTx()) + v, term := ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := unsafeReadConsistentIndex(ci.be.ReadTx()) ci.SetConsistentIndex(v, term) return v } @@ -99,6 +124,15 @@ func (ci *consistentIndex) SetBackend(be Backend) { ci.SetConsistentIndex(0, 0) } +func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm) +} + +func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) { + atomic.StoreUint64(&ci.applyingIndex, v) + atomic.StoreUint64(&ci.applyingTerm, term) +} + func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } @@ -108,12 +142,24 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} +func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term) +} +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) atomic.StoreUint64(&f.term, term) } +func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) { + atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) +} func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} @@ -125,7 +171,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(buckets.Meta) } @@ -174,7 +220,7 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) } func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() UnsafeUpdateConsistentIndex(tx, index, term) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 0b7e48c9819..bba431971cb 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -661,6 +661,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { }) } + // Set the hook after EtcdServer finishes the initialization to avoid + // the hook being called during the initialization process. + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()) + // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ Logger: cfg.Logger, @@ -1260,6 +1264,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook()) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -2128,7 +2133,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -2155,10 +2160,18 @@ func (s *EtcdServer) apply( // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly + applyV3Performed := false + defer func() { + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 { + s.consistIndex.SetConsistentIndex(e.Index, e.Term) + } + }() index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -2207,6 +2220,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } + applyV3Performed = true ar = s.applyV3.Apply(&raftReq, shouldApplyV3) } @@ -2258,6 +2272,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) + + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { + applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm) + } return false, err } @@ -2683,6 +2704,15 @@ func (s *EtcdServer) raftStatus() raft.Status { return s.r.Node.Status() } +func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { + return func() { + applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + if applyingIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm) + } + } +} + func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { size := be.Size() sizeInUse := be.SizeInUse() diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index c5a61da4736..5127e5c71b5 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -686,9 +686,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -697,9 +695,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx()) + rindex, _ := cindex.ReadConsistentIndex(be.ReadTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 409fa5b2a40..02ee77f5053 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Lease) lpbs := unsafeGetAllLeases(tx) @@ -852,7 +852,7 @@ func (l *Lease) persistTo(b backend.Backend) { panic("failed to marshal lease proto item") } - b.BatchTx().Lock() + b.BatchTx().LockInsideApply() b.BatchTx().UnsafePut(buckets.Lease, key, val) b.BatchTx().Unlock() } diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index 1e5f191c8d4..f156ae9948f 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -68,6 +68,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. + SetTxPostLockInsideApplyHook(func()) } type Snapshot interface { @@ -120,6 +123,9 @@ type backend struct { hooks Hooks + // txPostLockInsideApplyHook is called each time right after locking the tx. + txPostLockInsideApplyHook func() + lg *zap.Logger } @@ -231,6 +237,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockInsideApplyHook at the moment. + b.batchTx.lock() + defer b.batchTx.Unlock() + b.txPostLockInsideApplyHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -440,7 +454,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockOutsideApply() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index a8e64919916..c8fa55954f6 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -65,18 +65,32 @@ type batchTx struct { pending int } +// Lock is supposed to be called only by the unit test. func (t *batchTx) Lock() { + ValidateCalledInsideUnittest(t.backend.lg) + t.lock() +} + +func (t *batchTx) lock() { t.Mutex.Lock() } func (t *batchTx) LockInsideApply() { - ValidateCalledInsideApply(t.backend.lg) - t.Lock() + t.lock() + if t.backend.txPostLockInsideApplyHook != nil { + // The callers of some methods (i.e., (*RaftCluster).AddMember) + // can be coming from both InsideApply and OutsideApply, but the + // callers from OutsideApply will have a nil txPostLockInsideApplyHook. + // So we should check the txPostLockInsideApplyHook before validating + // the callstack. + ValidateCalledInsideApply(t.backend.lg) + t.backend.txPostLockInsideApplyHook() + } } func (t *batchTx) LockOutsideApply() { ValidateCalledOutSideApply(t.backend.lg) - t.Lock() + t.lock() } func (t *batchTx) Unlock() { @@ -226,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.lock() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.lock() t.commit(true) t.Unlock() } @@ -303,13 +317,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.lock() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.lock() t.commit(true) t.Unlock() } diff --git a/server/mvcc/backend/hooks_test.go b/server/mvcc/backend/hooks_test.go index 03a90dd3d83..69a1e7216c1 100644 --- a/server/mvcc/backend/hooks_test.go +++ b/server/mvcc/backend/hooks_test.go @@ -40,8 +40,6 @@ func TestBackendPreCommitHook(t *testing.T) { // Empty commit. tx.Commit() - write(tx, []byte("foo"), []byte("bar")) - assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits") tx.Commit() assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits") diff --git a/server/mvcc/backend/verify.go b/server/mvcc/backend/verify.go index 2f3dc0221c6..a6a0b8675ec 100644 --- a/server/mvcc/backend/verify.go +++ b/server/mvcc/backend/verify.go @@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) { } } +func ValidateCalledInsideUnittest(lg *zap.Logger) { + if !verifyLockEnabled() { + return + } + if !insideUnittest() { + lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace")) + } +} + func verifyLockEnabled() bool { return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK } @@ -54,3 +63,8 @@ func insideApply() bool { stackTraceStr := string(debug.Stack()) return strings.Contains(stackTraceStr, ".applyEntries") } + +func insideUnittest() bool { + stackTraceStr := string(debug.Stack()) + return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/") +} diff --git a/server/mvcc/backend/verify_test.go b/server/mvcc/backend/verify_test.go index f4f4db47527..b8f0a84c566 100644 --- a/server/mvcc/backend/verify_test.go +++ b/server/mvcc/backend/verify_test.go @@ -15,7 +15,6 @@ package backend_test import ( - "fmt" "os" "testing" "time" @@ -26,40 +25,60 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - insideApply bool - lock func(tx backend.BatchTx) - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockInsideApplyHook func() + expectPanic bool }{ { + name: "call lockInsideApply from inside apply", insideApply: true, lock: lockInsideApply, expectPanic: false, }, { + name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)", insideApply: false, lock: lockInsideApply, - expectPanic: true, + expectPanic: false, + }, + { + name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockInsideApplyHook: func() {}, + expectPanic: true, }, { + name: "call lockOutsideApply from outside apply", insideApply: false, lock: lockOutsideApply, expectPanic: false, }, { + name: "call lockOutsideApply from inside apply", insideApply: true, lock: lockOutsideApply, expectPanic: true, }, + { + name: "call Lock from unit test", + insideApply: false, + lock: lockFromUT, + expectPanic: false, + }, } env := os.Getenv("ETCD_VERIFY") os.Setenv("ETCD_VERIFY", "lock") defer func() { os.Setenv("ETCD_VERIFY", env) }() - for i, tc := range tcs { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook) hasPaniced := handlePanic(func() { if tc.insideApply { @@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) { func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() } func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() } +func lockFromUT(tx backend.BatchTx) { tx.Lock() } diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 54055ed0552..ae31d968c26 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -119,7 +119,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Key) tx.UnsafeCreateBucket(buckets.Meta) tx.Unlock() @@ -238,7 +238,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { revToBytes(revision{main: rev}, rbytes) tx := s.b.BatchTx() - tx.Lock() + tx.LockInsideApply() tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes) tx.Unlock() // ensure that desired compaction is persisted @@ -334,7 +334,7 @@ func (s *store) restore() error { keyToLease := make(map[string]lease.LeaseID) // restore index - tx := s.b.BatchTx() + tx := s.b.ReadTx() tx.Lock() _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index 71bd4b7369c..c63d3f4f4e8 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -39,7 +39,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index d2a9e55e89f..e6d437fb15a 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -871,6 +871,8 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockInsideApply() {} +func (b *fakeBatchTx) LockOutsideApply() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -894,10 +896,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) { func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error { return nil } -func (b *fakeBatchTx) Commit() {} -func (b *fakeBatchTx) CommitAndStop() {} -func (b *fakeBatchTx) LockInsideApply() {} -func (b *fakeBatchTx) LockOutsideApply() {} +func (b *fakeBatchTx) Commit() {} +func (b *fakeBatchTx) CommitAndStop() {} type fakeBackend struct { tx *fakeBatchTx @@ -914,6 +914,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index 93d7db20e06..9df7b79410f 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -78,7 +78,7 @@ type storeTxnWrite struct { func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() - tx.Lock() + tx.LockInsideApply() tw := &storeTxnWrite{ storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, diff --git a/server/mvcc/util.go b/server/mvcc/util.go index 83cbf44bf84..c4c0ff2f013 100644 --- a/server/mvcc/util.go +++ b/server/mvcc/util.go @@ -31,7 +31,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { panic(fmt.Errorf("cannot marshal event: %v", err)) } - be.BatchTx().Lock() + be.BatchTx().LockOutsideApply() be.BatchTx().UnsafePut(buckets.Key, ibytes, d) be.BatchTx().Unlock() } diff --git a/server/verify/verify.go b/server/verify/verify.go index f727201ce8a..ef613d7eb05 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) { } func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { - tx := be.BatchTx() - index, term := cindex.ReadConsistentIndex(tx) + index, term := cindex.ReadConsistentIndex(be.ReadTx()) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) }