diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go
index 6bebaf920a1..6cd243f3811 100644
--- a/etcdutl/etcdutl/backup_command.go
+++ b/etcdutl/etcdutl/backup_command.go
@@ -319,7 +319,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
 	if !v3 {
 		tx := be.BatchTx()
-		tx.Lock()
+		tx.LockOutsideApply()
 		defer tx.Unlock()
 		cindex.UnsafeCreateMetaBucket(tx)
 		cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go
index 7d77b16ea1a..54be7d25f07 100644
--- a/server/auth/range_perm_cache.go
+++ b/server/auth/range_perm_cache.go
@@ -22,7 +22,7 @@ import (
 	"go.uber.org/zap"
 )
 
-func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
+func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions {
 	user := getUser(lg, tx, userName)
 	if user == nil {
 		return nil
 	}
@@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
 	return false
 }
 
-func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
+func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
 	// assumption: tx is Lock()ed
 	_, ok := as.rangePermCache[userName]
 	if !ok {
diff --git a/server/auth/store.go b/server/auth/store.go
index 44c1d35fda0..09d9cdc671b 100644
--- a/server/auth/store.go
+++ b/server/auth/store.go
@@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error {
 	}
 	b := as.be
 	tx := b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer func() {
 		tx.Unlock()
 		b.ForceCommit()
@@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() {
 	}
 	b := as.be
 	tx := b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
 	as.commitRevision(tx)
 	tx.Unlock()
@@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, username)
@@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
 	// CompareHashAndPassword is very expensive, so we use closures
 	// to avoid putting it in the critical section of the tx lock.
 	revision, err := func() (uint64, error) {
-		tx := as.be.BatchTx()
+		tx := as.be.ReadTx()
 		tx.Lock()
 		defer tx.Unlock()
 
@@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
 func (as *authStore) Recover(be backend.Backend) {
 	enabled := false
 	as.be = be
-	tx := be.BatchTx()
+	tx := be.ReadTx()
 	tx.Lock()
 	_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
 	if len(vs) == 1 {
@@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
 
 func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
 
 func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.User)
@@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
 
 func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	user := getUser(as.lg, tx, r.Name)
 	tx.Unlock()
 
@@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,
 
 func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	users := getAllUsers(as.lg, tx)
 	tx.Unlock()
 
@@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
 
 func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	var resp pb.AuthRoleGetResponse
@@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,
 
 func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	roles := getAllRoles(as.lg, tx)
 	tx.Unlock()
 
@@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon
 
 func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Role)
@@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Role)
@@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Name)
@@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Name)
@@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
 		return ErrAuthOldRevision
 	}
 
-	tx := as.be.BatchTx()
+	tx := as.be.ReadTx()
 	tx.Lock()
 	defer tx.Unlock()
 
@@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
 		return ErrUserEmpty
 	}
 
-	tx := as.be.BatchTx()
+	tx := as.be.ReadTx()
 	tx.Lock()
 	u := getUser(as.lg, tx, authInfo.Username)
 	tx.Unlock()
@@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
 	return nil
 }
 
-func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
+func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User {
 	_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
 	if len(vs) == 0 {
 		return nil
@@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
 	return user
 }
 
-func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
+func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User {
 	_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
 	if len(vs) == 0 {
 		return nil
@@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) {
 	tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
 }
 
-func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
+func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role {
 	_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
 	if len(vs) == 0 {
 		return nil
@@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
 	return role
 }
 
-func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
+func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role {
 	_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
 	if len(vs) == 0 {
 		return nil
@@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
 	}
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 
 	tx.UnsafeCreateBucket(buckets.Auth)
 	tx.UnsafeCreateBucket(buckets.AuthUsers)
@@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
 	tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
 }
 
-func getRevision(tx backend.BatchTx) uint64 {
+func getRevision(tx backend.ReadTx) uint64 {
 	_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
 	if len(vs) != 1 {
 		// this can happen in the initialization phase
@@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {
 
 func (as *authStore) HasRole(user, role string) bool {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	u := getUser(as.lg, tx, user)
 	tx.Unlock()
 
diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go
index a0cdf370afd..fadc8182233 100644
--- a/server/etcdserver/api/membership/store.go
+++ b/server/etcdserver/api/membership/store.go
@@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
 	}
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	if unsafeMemberExists(tx, mkey) {
 		return errMemberAlreadyExist
@@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
 // from the v3 backend.
 func TrimClusterFromBackend(be backend.Backend) error {
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeDeleteBucket(buckets.Cluster)
 	return nil
@@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
 	mkey := backendMemberKey(id)
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
 	if !unsafeMemberExists(tx, mkey) {
@@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
 func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
 	lg.Info("Trimming membership information from the backend...")
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
 		tx.UnsafeDelete(buckets.Members, k)
@@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
 	ckey := backendClusterVersionKey()
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
 }
@@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
 		lg.Panic("failed to marshal downgrade information", zap.Error(err))
 	}
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(buckets.Cluster, dkey, dvalue)
 }
@@ -316,7 +316,7 @@ func backendDowngradeKey() []byte {
 
 func mustCreateBackendBuckets(be backend.Backend) {
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(buckets.Members)
 	tx.UnsafeCreateBucket(buckets.MembersRemoved)
diff --git a/server/etcdserver/api/v3alarm/alarms.go b/server/etcdserver/api/v3alarm/alarms.go
index 3038813cc84..5d4a641cea0 100644
--- a/server/etcdserver/api/v3alarm/alarms.go
+++ b/server/etcdserver/api/v3alarm/alarms.go
@@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
 	}
 
 	b := a.bg.Backend()
-	b.BatchTx().Lock()
+	b.BatchTx().LockInsideApply()
 	b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
 	b.BatchTx().Unlock()
 
@@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
 	}
 
 	b := a.bg.Backend()
-	b.BatchTx().Lock()
+	b.BatchTx().LockInsideApply()
 	b.BatchTx().UnsafeDelete(buckets.Alarm, v)
 	b.BatchTx().Unlock()
 
@@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error {
 	b := a.bg.Backend()
 
 	tx := b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	tx.UnsafeCreateBucket(buckets.Alarm)
 	err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
 		var m pb.AlarmMember
diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go
index 081be2b5259..2beef576346 100644
--- a/server/etcdserver/backend.go
+++ b/server/etcdserver/backend.go
@@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
 func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
 	consistentIndex := uint64(0)
 	if beExist {
-		consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
+		consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
 	}
 	if snapshot.Metadata.Index <= consistentIndex {
 		return oldbe, nil
diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go
index 4978124baa5..ac6ae63742f 100644
--- a/server/etcdserver/cindex/cindex.go
+++ b/server/etcdserver/cindex/cindex.go
@@ -34,9 +34,18 @@ type ConsistentIndexer interface {
 	// ConsistentIndex returns the consistent index of current executing entry.
 	ConsistentIndex() uint64
 
+	// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
+	ConsistentApplyingIndex() (uint64, uint64)
+
+	// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
+	UnsafeConsistentIndex() uint64
+
 	// SetConsistentIndex set the consistent index of current executing entry.
 	SetConsistentIndex(v uint64, term uint64)
 
+	// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
+	SetConsistentApplyingIndex(v uint64, term uint64)
+
 	// UnsafeSave must be called holding the lock on the tx.
 	// It saves consistentIndex to the underlying stable storage.
 	UnsafeSave(tx backend.BatchTx)
@@ -56,6 +65,12 @@ type consistentIndex struct {
 	// The value is being persisted in the backend since v3.5.
 	term uint64
 
+	// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
+	// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
+	// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
+	applyingIndex uint64
+	applyingTerm  uint64
+
 	// be is used for initial read consistentIndex
 	be Backend
 	// mutex is protecting be.
@@ -75,7 +90,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	v, term := ReadConsistentIndex(ci.be.BatchTx())
+	v, term := ReadConsistentIndex(ci.be.ReadTx())
+	ci.SetConsistentIndex(v, term)
+	return v
+}
+
+func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
+	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
+		return index
+	}
+
+	v, term := unsafeReadConsistentIndex(ci.be.ReadTx())
 	ci.SetConsistentIndex(v, term)
 	return v
 }
 
@@ -99,6 +124,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
 	ci.SetConsistentIndex(0, 0)
 }
 
+func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
+	return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
+}
+
+func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
+	atomic.StoreUint64(&ci.applyingIndex, v)
+	atomic.StoreUint64(&ci.applyingTerm, term)
+}
+
 func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
 	return &fakeConsistentIndex{index: index}
 }
@@ -108,12 +142,24 @@ type fakeConsistentIndex struct {
 	term  uint64
 }
 
-func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
+func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
+	return atomic.LoadUint64(&f.index)
+}
+func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
+	return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
+}
+func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
+	return atomic.LoadUint64(&f.index)
+}
 
 func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
 	atomic.StoreUint64(&f.index, index)
 	atomic.StoreUint64(&f.term, term)
 }
+func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
+	atomic.StoreUint64(&f.index, index)
+	atomic.StoreUint64(&f.term, term)
+}
 
 func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
 func (f *fakeConsistentIndex) SetBackend(_ Backend)         {}
@@ -125,7 +171,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {
 
 // CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
 func CreateMetaBucket(tx backend.BatchTx) {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(buckets.Meta)
 }
@@ -174,7 +220,7 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64)
 }
 
 func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	UnsafeUpdateConsistentIndex(tx, index, term)
 }
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 0b7e48c9819..bba431971cb 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -661,6 +661,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		})
 	}
 
+	// Set the hook after EtcdServer finishes the initialization to avoid
+	// the hook being called during the initialization process.
+	srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
+
 	// TODO: move transport initialization near the definition of remote
 	tr := &rafthttp.Transport{
 		Logger:      cfg.Logger,
@@ -1260,6 +1264,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 
 	s.consistIndex.SetBackend(newbe)
+	newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
 	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
 
 	// Closing old backend might block until all the txns
@@ -2128,7 +2133,7 @@ func (s *EtcdServer) apply(
 
 			// set the consistent index of current executing entry
 			if e.Index > s.consistIndex.ConsistentIndex() {
-				s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+				s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
 				shouldApplyV3 = membership.ApplyBoth
 			}
 
@@ -2155,10 +2160,18 @@ func (s *EtcdServer) apply(
 // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	shouldApplyV3 := membership.ApplyV2storeOnly
+	applyV3Performed := false
+	defer func() {
+		// The txPostLock callback will not get called in this case,
+		// so we should set the consistent index directly.
+		if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 {
+			s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		}
+	}()
 	index := s.consistIndex.ConsistentIndex()
 	if e.Index > index {
 		// set the consistent index of current executing entry
-		s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
 		shouldApplyV3 = membership.ApplyBoth
 	}
 	s.lg.Debug("apply entry normal",
@@ -2207,6 +2220,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		if !needResult && raftReq.Txn != nil {
 			removeNeedlessRangeReqs(raftReq.Txn)
 		}
+		applyV3Performed = true
 		ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
 	}
 
@@ -2258,6 +2272,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		s.r.ApplyConfChange(cc)
+
+		// The txPostLock callback will not get called in this case,
+		// so we should set the consistent index directly.
+		if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
+			applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+			s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
+		}
 		return false, err
 	}
 
@@ -2683,6 +2704,15 @@ func (s *EtcdServer) raftStatus() raft.Status {
 	return s.r.Node.Status()
 }
 
+func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
+	return func() {
+		applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+		if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
+			s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
+		}
+	}
+}
+
 func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 	size := be.Size()
 	sizeInUse := be.SizeInUse()
diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go
index c5a61da4736..5127e5c71b5 100644
--- a/server/etcdserver/server_test.go
+++ b/server/etcdserver/server_test.go
@@ -686,9 +686,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
 
 	_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
 	consistIndex := srv.consistIndex.ConsistentIndex()
-	if consistIndex != appliedi {
-		t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
-	}
+	assert.Equal(t, uint64(2), appliedi)
 
 	t.Run("verify-backend", func(t *testing.T) {
 		tx := be.BatchTx()
@@ -697,9 +695,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
 		srv.beHooks.OnPreCommitUnsafe(tx)
 		assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
 	})
-	rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
+	rindex, _ := cindex.ReadConsistentIndex(be.ReadTx())
 	assert.Equal(t, consistIndex, rindex)
-	assert.Equal(t, uint64(4), rterm)
 }
 
 func realisticRaftNode(lg *zap.Logger) *raftNode {
diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index 409fa5b2a40..02ee77f5053 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
 
 func (le *lessor) initAndRecover() {
 	tx := le.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 
 	tx.UnsafeCreateBucket(buckets.Lease)
 	lpbs := unsafeGetAllLeases(tx)
@@ -852,7 +852,7 @@ func (l *Lease) persistTo(b backend.Backend) {
 		panic("failed to marshal lease proto item")
 	}
 
-	b.BatchTx().Lock()
+	b.BatchTx().LockInsideApply()
 	b.BatchTx().UnsafePut(buckets.Lease, key, val)
 	b.BatchTx().Unlock()
 }
diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go
index 1e5f191c8d4..f156ae9948f 100644
--- a/server/mvcc/backend/backend.go
+++ b/server/mvcc/backend/backend.go
@@ -68,6 +68,9 @@ type Backend interface {
 	Defrag() error
 	ForceCommit()
 	Close() error
+
+	// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
+	SetTxPostLockInsideApplyHook(func())
 }
 
 type Snapshot interface {
@@ -120,6 +123,9 @@ type backend struct {
 
 	hooks Hooks
 
+	// txPostLockInsideApplyHook is called each time right after locking the tx.
+	txPostLockInsideApplyHook func()
+
 	lg *zap.Logger
 }
 
@@ -231,6 +237,14 @@ func (b *backend) BatchTx() BatchTx {
 	return b.batchTx
 }
 
+func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
+	// It needs to lock the batchTx, because the periodic commit
+	// may be accessing the txPostLockInsideApplyHook at the moment.
+	b.batchTx.lock()
+	defer b.batchTx.Unlock()
+	b.txPostLockInsideApplyHook = hook
+}
+
 func (b *backend) ReadTx() ReadTx { return b.readTx }
 
 // ConcurrentReadTx creates and returns a new ReadTx, which:
@@ -440,7 +454,7 @@ func (b *backend) defrag() error {
 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then
 	// close previous ongoing tx.
-	b.batchTx.Lock()
+	b.batchTx.LockOutsideApply()
 	defer b.batchTx.Unlock()
 
 	// lock database after lock tx to avoid deadlock.
diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go
index a8e64919916..c8fa55954f6 100644
--- a/server/mvcc/backend/batch_tx.go
+++ b/server/mvcc/backend/batch_tx.go
@@ -65,18 +65,32 @@ type batchTx struct {
 	pending int
 }
 
+// Lock is supposed to be called only by the unit test.
 func (t *batchTx) Lock() {
+	ValidateCalledInsideUnittest(t.backend.lg)
+	t.lock()
+}
+
+func (t *batchTx) lock() {
 	t.Mutex.Lock()
 }
 
 func (t *batchTx) LockInsideApply() {
-	ValidateCalledInsideApply(t.backend.lg)
-	t.Lock()
+	t.lock()
+	if t.backend.txPostLockInsideApplyHook != nil {
+		// The callers of some methods (i.e., (*RaftCluster).AddMember)
+		// can be coming from both InsideApply and OutsideApply, but the
+		// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
+		// So we should check the txPostLockInsideApplyHook before validating
+		// the callstack.
+		ValidateCalledInsideApply(t.backend.lg)
+		t.backend.txPostLockInsideApplyHook()
+	}
 }
 
 func (t *batchTx) LockOutsideApply() {
 	ValidateCalledOutSideApply(t.backend.lg)
-	t.Lock()
+	t.lock()
 }
 
 func (t *batchTx) Unlock() {
@@ -226,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)
 
 // Commit commits a previous tx and begins a new writable one.
 func (t *batchTx) Commit() {
-	t.Lock()
+	t.lock()
 	t.commit(false)
 	t.Unlock()
 }
 
 // CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
-	t.Lock()
+	t.lock()
 	t.commit(true)
 	t.Unlock()
 }
@@ -303,13 +317,13 @@ func (t *batchTxBuffered) Unlock() {
 }
 
 func (t *batchTxBuffered) Commit() {
-	t.Lock()
+	t.lock()
 	t.commit(false)
 	t.Unlock()
 }
 
 func (t *batchTxBuffered) CommitAndStop() {
-	t.Lock()
+	t.lock()
 	t.commit(true)
 	t.Unlock()
 }
diff --git a/server/mvcc/backend/hooks_test.go b/server/mvcc/backend/hooks_test.go
index 03a90dd3d83..69a1e7216c1 100644
--- a/server/mvcc/backend/hooks_test.go
+++ b/server/mvcc/backend/hooks_test.go
@@ -40,8 +40,6 @@ func TestBackendPreCommitHook(t *testing.T) {
 	// Empty commit.
 	tx.Commit()
 
-	write(tx, []byte("foo"), []byte("bar"))
-	assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits")
 	tx.Commit()
 	assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits")
 }
diff --git a/server/mvcc/backend/verify.go b/server/mvcc/backend/verify.go
index 2f3dc0221c6..a6a0b8675ec 100644
--- a/server/mvcc/backend/verify.go
+++ b/server/mvcc/backend/verify.go
@@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) {
 	}
 }
 
+func ValidateCalledInsideUnittest(lg *zap.Logger) {
+	if !verifyLockEnabled() {
+		return
+	}
+	if !insideUnittest() {
+		lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace"))
+	}
+}
+
 func verifyLockEnabled() bool {
 	return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
 }
@@ -54,3 +63,8 @@ func insideApply() bool {
 	stackTraceStr := string(debug.Stack())
 	return strings.Contains(stackTraceStr, ".applyEntries")
 }
+
+func insideUnittest() bool {
+	stackTraceStr := string(debug.Stack())
+	return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
+}
diff --git a/server/mvcc/backend/verify_test.go b/server/mvcc/backend/verify_test.go
index f4f4db47527..b8f0a84c566 100644
--- a/server/mvcc/backend/verify_test.go
+++ b/server/mvcc/backend/verify_test.go
@@ -15,7 +15,6 @@
 package backend_test
 
 import (
-	"fmt"
 	"os"
 	"testing"
 	"time"
@@ -26,40 +25,60 @@ import (
 
 func TestLockVerify(t *testing.T) {
 	tcs := []struct {
-		insideApply bool
-		lock        func(tx backend.BatchTx)
-		expectPanic bool
+		name                      string
+		insideApply               bool
+		lock                      func(tx backend.BatchTx)
+		txPostLockInsideApplyHook func()
+		expectPanic               bool
 	}{
 		{
+			name:        "call lockInsideApply from inside apply",
 			insideApply: true,
 			lock:        lockInsideApply,
 			expectPanic: false,
 		},
 		{
+			name:        "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)",
 			insideApply: false,
 			lock:        lockInsideApply,
-			expectPanic: true,
+			expectPanic: false,
+		},
+		{
+			name:                      "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)",
+			insideApply:               false,
+			lock:                      lockInsideApply,
+			txPostLockInsideApplyHook: func() {},
+			expectPanic:               true,
 		},
 		{
+			name:        "call lockOutsideApply from outside apply",
 			insideApply: false,
 			lock:        lockOutsideApply,
 			expectPanic: false,
 		},
 		{
+			name:        "call lockOutsideApply from inside apply",
 			insideApply: true,
 			lock:        lockOutsideApply,
 			expectPanic: true,
 		},
+		{
+			name:        "call Lock from unit test",
+			insideApply: false,
+			lock:        lockFromUT,
+			expectPanic: false,
+		},
 	}
 	env := os.Getenv("ETCD_VERIFY")
 	os.Setenv("ETCD_VERIFY", "lock")
 	defer func() {
 		os.Setenv("ETCD_VERIFY", env)
 	}()
 
-	for i, tc := range tcs {
-		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
 			be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
+			be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook)
 
 			hasPaniced := handlePanic(func() {
 				if tc.insideApply {
@@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) {
 
 func lockInsideApply(tx backend.BatchTx)  { tx.LockInsideApply() }
 func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() }
+func lockFromUT(tx backend.BatchTx)       { tx.Lock() }
diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go
index 54055ed0552..ae31d968c26 100644
--- a/server/mvcc/kvstore.go
+++ b/server/mvcc/kvstore.go
@@ -119,7 +119,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
 	}
 
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	tx.UnsafeCreateBucket(buckets.Key)
 	tx.UnsafeCreateBucket(buckets.Meta)
 	tx.Unlock()
@@ -238,7 +238,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
 	revToBytes(revision{main: rev}, rbytes)
 
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
 	tx.Unlock()
 	// ensure that desired compaction is persisted
@@ -334,7 +334,7 @@ func (s *store) restore() error {
 	keyToLease := make(map[string]lease.LeaseID)
 
 	// restore index
-	tx := s.b.BatchTx()
+	tx := s.b.ReadTx()
 	tx.Lock()
 
 	_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go
index 71bd4b7369c..c63d3f4f4e8 100644
--- a/server/mvcc/kvstore_compaction.go
+++ b/server/mvcc/kvstore_compaction.go
@@ -39,7 +39,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 		start := time.Now()
 
 		tx := s.b.BatchTx()
-		tx.Lock()
+		tx.LockOutsideApply()
 		keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
 		for _, key := range keys {
 			rev = bytesToRev(key)
diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go
index d2a9e55e89f..e6d437fb15a 100644
--- a/server/mvcc/kvstore_test.go
+++ b/server/mvcc/kvstore_test.go
@@ -871,6 +871,8 @@ type fakeBatchTx struct {
 	rangeRespc chan rangeResp
 }
 
+func (b *fakeBatchTx) LockInsideApply()  {}
+func (b *fakeBatchTx) LockOutsideApply() {}
 func (b *fakeBatchTx) Lock()             {}
 func (b *fakeBatchTx) Unlock()           {}
 func (b *fakeBatchTx) RLock()            {}
@@ -894,10 +896,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
 func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
 	return nil
 }
-func (b *fakeBatchTx) Commit()            {}
-func (b *fakeBatchTx) CommitAndStop()     {}
-func (b *fakeBatchTx) LockInsideApply()   {}
-func (b *fakeBatchTx) LockOutsideApply()  {}
+func (b *fakeBatchTx) Commit()        {}
+func (b *fakeBatchTx) CommitAndStop() {}
 
 type fakeBackend struct {
 	tx *fakeBatchTx
@@ -914,6 +914,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
 func (b *fakeBackend) ForceCommit()                         {}
 func (b *fakeBackend) Defrag() error                        { return nil }
 func (b *fakeBackend) Close() error                         { return nil }
+func (b *fakeBackend) SetTxPostLockInsideApplyHook(func())  {}
 
 type indexGetResp struct {
 	rev revision
diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go
index 93d7db20e06..9df7b79410f 100644
--- a/server/mvcc/kvstore_txn.go
+++ b/server/mvcc/kvstore_txn.go
@@ -78,7 +78,7 @@ type storeTxnWrite struct {
 func (s *store) Write(trace *traceutil.Trace) TxnWrite {
 	s.mu.RLock()
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	tw := &storeTxnWrite{
 		storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
 		tx:           tx,
diff --git a/server/mvcc/util.go b/server/mvcc/util.go
index 83cbf44bf84..c4c0ff2f013 100644
--- a/server/mvcc/util.go
+++ b/server/mvcc/util.go
@@ -31,7 +31,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
 		panic(fmt.Errorf("cannot marshal event: %v", err))
 	}
 
-	be.BatchTx().Lock()
+	be.BatchTx().LockOutsideApply()
 	be.BatchTx().UnsafePut(buckets.Key, ibytes, d)
 	be.BatchTx().Unlock()
 }
diff --git a/server/verify/verify.go b/server/verify/verify.go
index f727201ce8a..ef613d7eb05 100644
--- a/server/verify/verify.go
+++ b/server/verify/verify.go
@@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) {
 }
 
 func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
-	tx := be.BatchTx()
-	index, term := cindex.ReadConsistentIndex(tx)
+	index, term := cindex.ReadConsistentIndex(be.ReadTx())
 	if cfg.ExactIndex && index != hardstate.Commit {
 		return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
 	}
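A condensed, illustrative model of how the pieces in this diff are meant to interact: the apply loop records the entry being applied via SetConsistentApplyingIndex, and the hook registered through SetTxPostLockInsideApplyHook promotes that value to the persisted consistent index as soon as LockInsideApply acquires the batch-transaction mutex, so the index update and the apply's writes land in the same backend transaction. This sketch is not part of the patch; indexCache and batchTx below are simplified stand-ins for consistentIndex and the real backend.batchTx.

// Illustrative sketch only, not part of the patch.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// indexCache mimics consistentIndex: applyingIndex is a volatile cache of the
// entry currently being applied; consistentIndex is the value to be persisted.
type indexCache struct {
	consistentIndex uint64
	applyingIndex   uint64
}

// batchTx mimics the backend batch transaction with a post-lock hook.
type batchTx struct {
	sync.Mutex
	postLockInsideApplyHook func()
}

// LockInsideApply locks the tx and then runs the hook, mirroring the
// batchTx.LockInsideApply change in this diff.
func (t *batchTx) LockInsideApply() {
	t.Lock()
	if t.postLockInsideApplyHook != nil {
		t.postLockInsideApplyHook()
	}
}

func main() {
	ci := &indexCache{}
	tx := &batchTx{}

	// Rough equivalent of srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()).
	tx.postLockInsideApplyHook = func() {
		if idx := atomic.LoadUint64(&ci.applyingIndex); idx > atomic.LoadUint64(&ci.consistentIndex) {
			atomic.StoreUint64(&ci.consistentIndex, idx)
		}
	}

	// Apply loop: record the entry index first (SetConsistentApplyingIndex)...
	atomic.StoreUint64(&ci.applyingIndex, 8)

	// ...then any write that locks the tx inside apply promotes it under the same lock.
	tx.LockInsideApply()
	tx.Unlock()

	fmt.Println(atomic.LoadUint64(&ci.consistentIndex)) // prints 8
}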