From bfd5170f66812b43775b95e11ccb742d61cff466 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Wed, 30 Mar 2022 15:26:31 +0800 Subject: [PATCH 1/6] add a txPostLockHook into the backend Previously the SetConsistentIndex() is called during the apply workflow, but it's outside the db transaction. If a commit happens between SetConsistentIndex and the following apply workflow, and etcd crashes for whatever reason right after the commit, then etcd commits an incomplete transaction to db. Eventually etcd runs into the data inconsistency issue. In this commit, we move the SetConsistentIndex into a txPostLockHook, so it will be executed inside the transaction lock. --- etcdutl/etcdutl/migrate_command.go | 4 +-- server/etcdserver/adapters.go | 8 +++--- server/etcdserver/cindex/cindex.go | 23 ++++++++++++++--- server/etcdserver/server.go | 31 ++++++++++++++++++++--- server/etcdserver/server_test.go | 7 ++--- server/lease/lessor.go | 2 +- server/storage/backend.go | 2 +- server/storage/backend/backend.go | 16 +++++++++++- server/storage/backend/batch_tx.go | 16 +++++++++--- server/storage/mvcc/kvstore.go | 4 +-- server/storage/mvcc/kvstore_compaction.go | 2 +- server/storage/mvcc/kvstore_test.go | 2 ++ server/storage/schema/alarm.go | 2 +- server/storage/schema/auth.go | 2 +- server/storage/schema/cindex.go | 6 ++--- server/storage/schema/membership.go | 6 ++--- server/storage/schema/migration.go | 4 +-- server/storage/schema/schema.go | 8 +++--- server/verify/verify.go | 3 +-- 19 files changed, 104 insertions(+), 44 deletions(-) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 88f9e473c8a..195576e313a 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -118,7 +118,7 @@ type migrateConfig struct { func migrateCommandFunc(c *migrateConfig) error { defer c.be.Close() tx := c.be.BatchTx() - current, err := schema.DetectSchemaVersion(c.lg, tx) + current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx()) if err != nil { c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") return err @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 9511bc6a6ac..5f1bcfef119 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { s.bemu.RLock() defer s.bemu.RUnlock() - tx := s.be.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := s.be.ReadTx() + tx.RLock() + defer tx.RUnlock() v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx) if err != nil { return nil @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error defer s.bemu.RUnlock() tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 24dad66031d..7ec1b121283 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,6 +23,7 @@ import ( ) type Backend interface { + ReadTx() backend.ReadTx BatchTx() backend.BatchTx } @@ -32,6 +33,9 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) @@ -73,7 +77,19 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := schema.ReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +// UnsafeConsistentIndex is similar to ConsistentIndex, +// but it shouldn't lock the transaction. +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx()) ci.SetConsistentIndex(v, term) return v } @@ -106,7 +122,8 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) @@ -117,7 +134,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index f5632318eab..b22f680bb46 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,8 +207,10 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. + consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -341,6 +343,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be + srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat @@ -978,6 +981,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockHook(s.getTxPostLockHook()) + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -1547,6 +1552,15 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } +func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { + atomic.StoreUint64(&s.consistentIdx, cIdx) + atomic.StoreUint64(&s.consistentTerm, cTerm) +} + +func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { + return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) +} + func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1771,7 +1785,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1801,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -2296,3 +2310,12 @@ func (s *EtcdServer) raftStatus() raft.Status { func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } + +func (s *EtcdServer) getTxPostLockHook() func() { + return func() { + cIdx, term := s.getConsistentIndexAndTerm() + if cIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(cIdx, term) + } + } +} diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index cb4386976d9..706c7854952 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) + rindex, _ := schema.ReadConsistentIndex(be.ReadTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index cb6f0d6ec80..4af816c76bb 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() diff --git a/server/storage/backend.go b/server/storage/backend.go index b7b0eb2fc7f..b1101cfa6fc 100644 --- a/server/storage/backend.go +++ b/server/storage/backend.go @@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index a61f4ec3609..f949f282b70 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -67,6 +67,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockHook sets a txPostLockHook. + SetTxPostLockHook(func()) } type Snapshot interface { @@ -119,6 +122,9 @@ type backend struct { hooks Hooks + // txPostLockHook is called each time right after locking the tx. + txPostLockHook func() + lg *zap.Logger } @@ -227,6 +233,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockHook at the moment. + b.batchTx.LockWithoutHook() + defer b.batchTx.Unlock() + b.txPostLockHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -438,7 +452,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockWithoutHook() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index a8e64919916..a27266bb22a 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -55,6 +55,7 @@ type BatchTx interface { CommitAndStop() LockInsideApply() LockOutsideApply() + } type batchTx struct { @@ -66,6 +67,13 @@ type batchTx struct { } func (t *batchTx) Lock() { + t.LockWithoutHook() + if t.backend.txPostLockHook != nil { + t.backend.txPostLockHook() + } +} + +func (t *batchTx) LockWithoutHook() { t.Mutex.Lock() } @@ -226,14 +234,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } @@ -303,13 +311,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 846f83cde18..9b79c090afc 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index ba944008216..941f056a921 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 31cb3765541..4a51e9c406b 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -881,6 +881,7 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockWithoutHook() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -924,6 +925,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 605bb3a0bfd..09a49994da4 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,7 +34,7 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 93ef34c371e..fc334a8bcf9 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index d7b06b9cef7..38eea6f9179 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } @@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { // ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeReadConsistentIndex(tx) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 844b50a85a4..153699e694a 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -167,7 +167,7 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index 47734b4b851..e1e44dab5f9 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 850b55d5bd5..2b4c15f29c8 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -31,7 +31,7 @@ var ( // Validate checks provided backend to confirm that schema used is supported. func Validate(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return unsafeValidate(lg, tx) } @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } @@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot). // * v3.4 and older is not supported and will return error. func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeDetectSchemaVersion(lg, tx) } diff --git a/server/verify/verify.go b/server/verify/verify.go index 9402e5eb5ea..2b73fbc07b2 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) { } func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { - tx := be.BatchTx() - index, term := schema.ReadConsistentIndex(tx) + index, term := schema.ReadConsistentIndex(be.ReadTx()) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) } From a4c5da844d732e9308646c117adc45b654608427 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Thu, 31 Mar 2022 17:06:48 +0800 Subject: [PATCH 2/6] added detailed comment to explain the difference between Lock and LockWithoutHook --- server/storage/backend/batch_tx.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index a27266bb22a..8628d9aaa15 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -55,7 +55,6 @@ type BatchTx interface { CommitAndStop() LockInsideApply() LockOutsideApply() - } type batchTx struct { From 7ac995cdde6ae56beba93b040fe231dfca03c38d Mon Sep 17 00:00:00 2001 From: ahrtr Date: Sat, 2 Apr 2022 06:02:22 +0800 Subject: [PATCH 3/6] enhanced authBackend to support authReadTx --- server/auth/range_perm_cache.go | 4 +- server/auth/store.go | 12 ++++-- server/auth/store_mock_test.go | 4 ++ server/etcdserver/cindex/cindex.go | 2 +- server/etcdserver/server.go | 5 ++- server/storage/schema/auth.go | 38 +++++++++++++++--- server/storage/schema/auth_roles.go | 60 +++++++++++++++++------------ server/storage/schema/auth_users.go | 50 ++++++++++++++---------- 8 files changed, 118 insertions(+), 57 deletions(-) diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go index bae07ef5242..2ebe5439b58 100644 --- a/server/auth/range_perm_cache.go +++ b/server/auth/range_perm_cache.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" ) -func getMergedPerms(tx AuthBatchTx, userName string) *unifiedRangePermissions { +func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions { user := tx.UnsafeGetUser(userName) if user == nil { return nil @@ -103,7 +103,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b return false } -func (as *authStore) isRangeOpPermitted(tx AuthBatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { +func (as *authStore) isRangeOpPermitted(tx AuthReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { // assumption: tx is Lock()ed _, ok := as.rangePermCache[userName] if !ok { diff --git a/server/auth/store.go b/server/auth/store.go index 408b235babd..762caecd780 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -196,6 +196,7 @@ type TokenProvider interface { type AuthBackend interface { CreateAuthBuckets() ForceCommit() + ReadTx() AuthReadTx BatchTx() AuthBatchTx GetUser(string) *authpb.User @@ -345,7 +346,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { // CompareHashAndPassword is very expensive, so we use closures // to avoid putting it in the critical section of the tx lock. revision, err := func() (uint64, error) { - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -855,7 +856,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE return ErrAuthOldRevision } - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -897,7 +898,10 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { return ErrUserEmpty } - u := as.be.GetUser(authInfo.Username) + tx := as.be.ReadTx() + tx.Lock() + defer tx.Unlock() + u := tx.UnsafeGetUser(authInfo.Username) if u == nil { return ErrUserNotFound @@ -935,6 +939,8 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i be.CreateAuthBuckets() tx := be.BatchTx() + // We should call LockWithoutHook here, but the txPostLockHoos isn't set + // to EtcdServer yet, so it's OK. tx.Lock() enabled := tx.UnsafeReadAuthEnabled() as := &authStore{ diff --git a/server/auth/store_mock_test.go b/server/auth/store_mock_test.go index d49f8dd333f..39c3f6d139a 100644 --- a/server/auth/store_mock_test.go +++ b/server/auth/store_mock_test.go @@ -36,6 +36,10 @@ func (b *backendMock) CreateAuthBuckets() { func (b *backendMock) ForceCommit() { } +func (b *backendMock) ReadTx() AuthReadTx { + return &txMock{be: b} +} + func (b *backendMock) BatchTx() AuthBatchTx { return &txMock{be: b} } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 7ec1b121283..6367967f875 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -89,7 +89,7 @@ func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { return index } - v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.UnsafeReadConsistentIndex(ci.be.ReadTx()) ci.SetConsistentIndex(v, term) return v } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index b22f680bb46..6a89f4592f8 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -343,7 +343,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be - srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat @@ -404,6 +403,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { }) } + // Set the hook after EtcdServer finishes the initialization to avoid + // the hook being called during the initialization process. + srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) + // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ Logger: cfg.Logger, diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index fc334a8bcf9..3956ca782f9 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -60,15 +60,25 @@ func (abe *authBackend) ForceCommit() { abe.be.ForceCommit() } +func (abe *authBackend) ReadTx() auth.AuthReadTx { + return &authReadTx{tx: abe.be.ReadTx(), lg: abe.lg} +} + func (abe *authBackend) BatchTx() auth.AuthBatchTx { return &authBatchTx{tx: abe.be.BatchTx(), lg: abe.lg} } +type authReadTx struct { + tx backend.ReadTx + lg *zap.Logger +} + type authBatchTx struct { tx backend.BatchTx lg *zap.Logger } +var _ auth.AuthReadTx = (*authReadTx)(nil) var _ auth.AuthBatchTx = (*authBatchTx)(nil) func (atx *authBatchTx) UnsafeSaveAuthEnabled(enabled bool) { @@ -86,6 +96,24 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) { } func (atx *authBatchTx) UnsafeReadAuthEnabled() bool { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeReadAuthEnabled() +} + +func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeReadAuthRevision() +} + +func (atx *authBatchTx) Lock() { + atx.tx.Lock() +} + +func (atx *authBatchTx) Unlock() { + atx.tx.Unlock() +} + +func (atx *authReadTx) UnsafeReadAuthEnabled() bool { _, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0) if len(vs) == 1 { if bytes.Equal(vs[0], authEnabled) { @@ -95,7 +123,7 @@ func (atx *authBatchTx) UnsafeReadAuthEnabled() bool { return false } -func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { +func (atx *authReadTx) UnsafeReadAuthRevision() uint64 { _, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0) if len(vs) != 1 { // this can happen in the initialization phase @@ -104,10 +132,10 @@ func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { return binary.BigEndian.Uint64(vs[0]) } -func (atx *authBatchTx) Lock() { - atx.tx.Lock() +func (atx *authReadTx) Lock() { + atx.tx.RLock() } -func (atx *authBatchTx) Unlock() { - atx.tx.Unlock() +func (atx *authReadTx) Unlock() { + atx.tx.RUnlock() } diff --git a/server/storage/schema/auth_roles.go b/server/storage/schema/auth_roles.go index 541e37b7191..dfda7ce5b7b 100644 --- a/server/storage/schema/auth_roles.go +++ b/server/storage/schema/auth_roles.go @@ -32,6 +32,40 @@ func (abe *authBackend) GetRole(roleName string) *authpb.Role { } func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetRole(roleName) +} + +func (abe *authBackend) GetAllRoles() []*authpb.Role { + tx := abe.BatchTx() + tx.Lock() + defer tx.Unlock() + return tx.UnsafeGetAllRoles() +} + +func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetAllRoles() +} + +func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) { + b, err := role.Marshal() + if err != nil { + atx.lg.Panic( + "failed to marshal 'authpb.Role'", + zap.String("role-name", string(role.Name)), + zap.Error(err), + ) + } + + atx.tx.UnsafePut(AuthRoles, role.Name, b) +} + +func (atx *authBatchTx) UnsafeDeleteRole(rolename string) { + atx.tx.UnsafeDelete(AuthRoles, []byte(rolename)) +} + +func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role { _, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0) if len(vs) == 0 { return nil @@ -45,14 +79,7 @@ func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role { return role } -func (abe *authBackend) GetAllRoles() []*authpb.Role { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() - return tx.UnsafeGetAllRoles() -} - -func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { +func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role { _, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -69,20 +96,3 @@ func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { } return roles } - -func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) { - b, err := role.Marshal() - if err != nil { - atx.lg.Panic( - "failed to marshal 'authpb.Role'", - zap.String("role-name", string(role.Name)), - zap.Error(err), - ) - } - - atx.tx.UnsafePut(AuthRoles, role.Name, b) -} - -func (atx *authBatchTx) UnsafeDeleteRole(rolename string) { - atx.tx.UnsafeDelete(AuthRoles, []byte(rolename)) -} diff --git a/server/storage/schema/auth_users.go b/server/storage/schema/auth_users.go index f385afa5122..c3e7a92ff39 100644 --- a/server/storage/schema/auth_users.go +++ b/server/storage/schema/auth_users.go @@ -27,6 +27,35 @@ func (abe *authBackend) GetUser(username string) *authpb.User { } func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetUser(username) +} + +func (abe *authBackend) GetAllUsers() []*authpb.User { + tx := abe.BatchTx() + tx.Lock() + defer tx.Unlock() + return tx.UnsafeGetAllUsers() +} + +func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetAllUsers() +} + +func (atx *authBatchTx) UnsafePutUser(user *authpb.User) { + b, err := user.Marshal() + if err != nil { + atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) + } + atx.tx.UnsafePut(AuthUsers, user.Name, b) +} + +func (atx *authBatchTx) UnsafeDeleteUser(username string) { + atx.tx.UnsafeDelete(AuthUsers, []byte(username)) +} + +func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User { _, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0) if len(vs) == 0 { return nil @@ -44,14 +73,7 @@ func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User { return user } -func (abe *authBackend) GetAllUsers() []*authpb.User { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() - return tx.UnsafeGetAllUsers() -} - -func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { +func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User { _, vs := atx.tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -68,15 +90,3 @@ func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { } return users } - -func (atx *authBatchTx) UnsafePutUser(user *authpb.User) { - b, err := user.Marshal() - if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) - } - atx.tx.UnsafePut(AuthUsers, user.Name, b) -} - -func (atx *authBatchTx) UnsafeDeleteUser(username string) { - atx.tx.UnsafeDelete(AuthUsers, []byte(username)) -} From 47038593e9eba0993156a6ddef2e1500758685a1 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Mon, 4 Apr 2022 19:56:38 +0800 Subject: [PATCH 4/6] set the consistent_index directly when applyV3 isn't performed --- server/etcdserver/server.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 6a89f4592f8..612454227d6 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1815,6 +1815,14 @@ func (s *EtcdServer) apply( // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly + applyV3Performed := false + defer func() { + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 { + s.consistIndex.SetConsistentIndex(e.Index, e.Term) + } + }() index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry @@ -1870,6 +1878,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } + applyV3Performed = true ar = s.applyV3.Apply(&raftReq, shouldApplyV3) } @@ -1912,6 +1921,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) + + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { + s.consistIndex.SetConsistentIndex(s.consistentIdx, s.consistentTerm) + } return false, err } From e155e50886316b8ff6c0d3e6362406ca2fb11a7d Mon Sep 17 00:00:00 2001 From: ahrtr Date: Wed, 6 Apr 2022 05:07:07 +0800 Subject: [PATCH 5/6] rename LockWithoutHook to LockOutsideApply and add LockInsideApply --- etcdutl/etcdutl/backup_command.go | 2 +- etcdutl/etcdutl/migrate_command.go | 2 +- server/auth/store.go | 4 +-- server/etcdserver/adapters.go | 2 +- server/etcdserver/bootstrap.go | 2 +- server/etcdserver/cindex/cindex.go | 4 +-- server/etcdserver/server.go | 4 +-- server/lease/lessor.go | 4 +-- server/storage/backend/backend.go | 10 +++---- server/storage/backend/batch_tx.go | 30 ++++++++++++-------- server/storage/backend/hooks_test.go | 2 -- server/storage/backend/verify.go | 14 ++++++++++ server/storage/backend/verify_test.go | 34 ++++++++++++++++++----- server/storage/mvcc/kvstore.go | 4 +-- server/storage/mvcc/kvstore_compaction.go | 2 +- server/storage/mvcc/kvstore_test.go | 11 ++++---- server/storage/mvcc/kvstore_txn.go | 2 +- server/storage/mvcc/store.go | 4 +-- server/storage/schema/alarm.go | 6 ++-- server/storage/schema/auth.go | 4 +-- server/storage/schema/cindex.go | 2 +- server/storage/schema/membership.go | 14 +++++----- server/storage/schema/migration.go | 4 +-- server/storage/schema/schema.go | 8 +++--- server/storage/schema/schema_test.go | 2 +- 25 files changed, 106 insertions(+), 71 deletions(-) diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index 49b6a93101d..e0d53fc1b1f 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir if !v3 { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() schema.UnsafeCreateMetaBucket(tx) schema.UnsafeUpdateConsistentIndex(tx, idx, term, false) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 195576e313a..87b10664ff4 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/auth/store.go b/server/auth/store.go index 762caecd780..2d978a01141 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -374,7 +374,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { func (as *authStore) Recover(be AuthBackend) { as.be = be - tx := be.BatchTx() + tx := be.ReadTx() tx.Lock() enabled := tx.UnsafeReadAuthEnabled() @@ -939,7 +939,7 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i be.CreateAuthBuckets() tx := be.BatchTx() - // We should call LockWithoutHook here, but the txPostLockHoos isn't set + // We should call LockOutsideApply here, but the txPostLockHoos isn't set // to EtcdServer yet, so it's OK. tx.Lock() enabled := tx.UnsafeReadAuthEnabled() diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 5f1bcfef119..d875cf14efe 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error defer s.bemu.RUnlock() tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 857e7afa6ad..43605be5e62 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -231,7 +231,7 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s } } if beExist { - err = schema.Validate(cfg.Logger, be.BatchTx()) + err = schema.Validate(cfg.Logger, be.ReadTx()) if err != nil { cfg.Logger.Error("Failed to validate schema", zap.Error(err)) return nil, err diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 6367967f875..91046cd0370 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -82,8 +82,6 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { return v } -// UnsafeConsistentIndex is similar to ConsistentIndex, -// but it shouldn't lock the transaction. func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { return index @@ -134,7 +132,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 612454227d6..015bcaf6f0a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -405,7 +405,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // Set the hook after EtcdServer finishes the initialization to avoid // the hook being called during the initialization process. - srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockHook()) // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ @@ -984,7 +984,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) - newbe.SetTxPostLockHook(s.getTxPostLockHook()) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockHook()) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 4af816c76bb..931cb3d090d 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() @@ -845,7 +845,7 @@ func (l *Lease) expired() bool { func (l *Lease) persistTo(b backend.Backend) { lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} tx := b.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() schema.MustUnsafePutLease(tx, &lpb) } diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index f949f282b70..ebb99ee2c34 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -68,8 +68,8 @@ type Backend interface { ForceCommit() Close() error - // SetTxPostLockHook sets a txPostLockHook. - SetTxPostLockHook(func()) + // SetTxPostLockInsideApplyHook sets a txPostLockHook. + SetTxPostLockInsideApplyHook(func()) } type Snapshot interface { @@ -233,10 +233,10 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } -func (b *backend) SetTxPostLockHook(hook func()) { +func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { // It needs to lock the batchTx, because the periodic commit // may be accessing the txPostLockHook at the moment. - b.batchTx.LockWithoutHook() + b.batchTx.lock() defer b.batchTx.Unlock() b.txPostLockHook = hook } @@ -452,7 +452,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.LockWithoutHook() + b.batchTx.LockOutsideApply() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 8628d9aaa15..7eca835fd22 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -65,25 +65,31 @@ type batchTx struct { pending int } +// Lock is supposed to be called only by the unit test. func (t *batchTx) Lock() { - t.LockWithoutHook() - if t.backend.txPostLockHook != nil { - t.backend.txPostLockHook() - } + ValidateCalledInsideUnittest(t.backend.lg) + t.lock() } -func (t *batchTx) LockWithoutHook() { +func (t *batchTx) lock() { t.Mutex.Lock() } func (t *batchTx) LockInsideApply() { - ValidateCalledInsideApply(t.backend.lg) - t.Lock() + t.lock() + if t.backend.txPostLockHook != nil { + // The callers of some methods (i.e., (*RaftCluster).AddMember) + // can be coming from both InsideApply and OutsideApply, but the + // callers from OutsideApply will have a nil txPostLockHook. So we + // should check the txPostLockHook before validating the callstack. + ValidateCalledInsideApply(t.backend.lg) + t.backend.txPostLockHook() + } } func (t *batchTx) LockOutsideApply() { ValidateCalledOutSideApply(t.backend.lg) - t.Lock() + t.lock() } func (t *batchTx) Unlock() { @@ -233,14 +239,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.LockWithoutHook() + t.lock() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.LockWithoutHook() + t.lock() t.commit(true) t.Unlock() } @@ -310,13 +316,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.LockWithoutHook() + t.lock() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.LockWithoutHook() + t.lock() t.commit(true) t.Unlock() } diff --git a/server/storage/backend/hooks_test.go b/server/storage/backend/hooks_test.go index 76646448489..b77efbba492 100644 --- a/server/storage/backend/hooks_test.go +++ b/server/storage/backend/hooks_test.go @@ -41,8 +41,6 @@ func TestBackendPreCommitHook(t *testing.T) { // Empty commit. tx.Commit() - write(tx, []byte("foo"), []byte("bar")) - assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits") tx.Commit() assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits") diff --git a/server/storage/backend/verify.go b/server/storage/backend/verify.go index 2f3dc0221c6..a6a0b8675ec 100644 --- a/server/storage/backend/verify.go +++ b/server/storage/backend/verify.go @@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) { } } +func ValidateCalledInsideUnittest(lg *zap.Logger) { + if !verifyLockEnabled() { + return + } + if !insideUnittest() { + lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace")) + } +} + func verifyLockEnabled() bool { return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK } @@ -54,3 +63,8 @@ func insideApply() bool { stackTraceStr := string(debug.Stack()) return strings.Contains(stackTraceStr, ".applyEntries") } + +func insideUnittest() bool { + stackTraceStr := string(debug.Stack()) + return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/") +} diff --git a/server/storage/backend/verify_test.go b/server/storage/backend/verify_test.go index 08efb392104..2345f46b55f 100644 --- a/server/storage/backend/verify_test.go +++ b/server/storage/backend/verify_test.go @@ -15,7 +15,6 @@ package backend_test import ( - "fmt" "os" "testing" "time" @@ -26,40 +25,60 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - insideApply bool - lock func(tx backend.BatchTx) - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockHook func() + expectPanic bool }{ { + name: "call lockInsideApply from inside apply", insideApply: true, lock: lockInsideApply, expectPanic: false, }, { + name: "call lockInsideApply from outside apply (without txPostLockHook)", insideApply: false, lock: lockInsideApply, - expectPanic: true, + expectPanic: false, + }, + { + name: "call lockInsideApply from outside apply (with txPostLockHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockHook: func() {}, + expectPanic: true, }, { + name: "call lockOutsideApply from outside apply", insideApply: false, lock: lockOutsideApply, expectPanic: false, }, { + name: "call lockOutsideApply from inside apply", insideApply: true, lock: lockOutsideApply, expectPanic: true, }, + { + name: "call Lock from unit test", + insideApply: false, + lock: lockFromUT, + expectPanic: false, + }, } env := os.Getenv("ETCD_VERIFY") os.Setenv("ETCD_VERIFY", "lock") defer func() { os.Setenv("ETCD_VERIFY", env) }() - for i, tc := range tcs { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + be.SetTxPostLockInsideApplyHook(tc.txPostLockHook) hasPaniced := handlePanic(func() { if tc.insideApply { @@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) { func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() } func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() } +func lockFromUT(tx backend.BatchTx) { tx.Lock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 9b79c090afc..074f1bea61b 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index 941f056a921..849f73b9576 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 4a51e9c406b..2779f10b7bf 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -881,7 +881,8 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } -func (b *fakeBatchTx) LockWithoutHook() {} +func (b *fakeBatchTx) LockInsideApply() {} +func (b *fakeBatchTx) LockOutsideApply() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -905,10 +906,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) { func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error { return nil } -func (b *fakeBatchTx) Commit() {} -func (b *fakeBatchTx) CommitAndStop() {} -func (b *fakeBatchTx) LockInsideApply() {} -func (b *fakeBatchTx) LockOutsideApply() {} +func (b *fakeBatchTx) Commit() {} +func (b *fakeBatchTx) CommitAndStop() {} type fakeBackend struct { tx *fakeBatchTx @@ -925,7 +924,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } -func (b *fakeBackend) SetTxPostLockHook(func()) {} +func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index fb7a9ca1fa5..604fac78cb3 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -133,7 +133,7 @@ type storeTxnWrite struct { func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() - tx.Lock() + tx.LockInsideApply() tw := &storeTxnWrite{ storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, diff --git a/server/storage/mvcc/store.go b/server/storage/mvcc/store.go index e530c82f4ec..a002ada7177 100644 --- a/server/storage/mvcc/store.go +++ b/server/storage/mvcc/store.go @@ -36,7 +36,7 @@ func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found } func SetScheduledCompact(tx backend.BatchTx, value int64) { - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() UnsafeSetScheduledCompact(tx, value) } @@ -48,7 +48,7 @@ func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) { } func SetFinishedCompact(tx backend.BatchTx, value int64) { - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() UnsafeSetFinishedCompact(tx, value) } diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 09a49994da4..825a8dbe0bf 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,14 +34,14 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) { tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() s.mustUnsafePutAlarm(tx, alarm) } @@ -57,7 +57,7 @@ func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverp func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) { tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() s.mustUnsafeDeleteAlarm(tx, alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 3956ca782f9..aa695bb1d46 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) @@ -106,7 +106,7 @@ func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { } func (atx *authBatchTx) Lock() { - atx.tx.Lock() + atx.tx.LockInsideApply() } func (atx *authBatchTx) Unlock() { diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index 38eea6f9179..7d215bac654 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 153699e694a..a42353c4168 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -52,7 +52,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { } tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Members, mkey, mvalue) } @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -71,7 +71,7 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) { mkey := BackendMemberKey(id) tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafeDelete(Members, mkey) tx.UnsafePut(MembersRemoved, mkey, []byte("removed")) @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -146,7 +146,7 @@ func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version) ckey := ClusterClusterVersionKeyName tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Cluster, ckey, []byte(ver.String())) } @@ -160,14 +160,14 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr s.lg.Panic("failed to marshal downgrade information", zap.Error(err)) } tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Cluster, dkey, dvalue) } func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index e1e44dab5f9..61ea51bf273 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 2b4c15f29c8..68bb212d71e 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -30,13 +30,13 @@ var ( ) // Validate checks provided backend to confirm that schema used is supported. -func Validate(lg *zap.Logger, tx backend.BatchTx) error { - tx.LockWithoutHook() +func Validate(lg *zap.Logger, tx backend.ReadTx) error { + tx.Lock() defer tx.Unlock() return unsafeValidate(lg, tx) } -func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error { +func unsafeValidate(lg *zap.Logger, tx backend.ReadTx) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { // v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version. @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index f3c0c4a7fe8..8dbd337b2e5 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -88,7 +88,7 @@ func TestValidate(t *testing.T) { b := backend.NewDefaultBackend(lg, dataPath) defer b.Close() - err := Validate(lg, b.BatchTx()) + err := Validate(lg, b.ReadTx()) if (err != nil) != tc.expectError { t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError) } From 4033f5c2b9b57cd341d1b3522abb03bedbeb9a07 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Thu, 7 Apr 2022 06:11:34 +0800 Subject: [PATCH 6/6] move the consistentIdx and consistentTerm from Etcdserver to cindex package Removed the fields consistentIdx and consistentTerm from struct EtcdServer, and added applyingIndex and applyingTerm into struct consistentIndex in package cindex. We may remove the two fields completely if we decide to remove the OnPreCommitUnsafe, and it will depend on the performance test result. --- server/etcdserver/cindex/cindex.go | 43 +++++++++++++++++++++++++-- server/etcdserver/server.go | 34 ++++++++------------- server/storage/backend/backend.go | 10 +++---- server/storage/backend/batch_tx.go | 9 +++--- server/storage/backend/verify_test.go | 24 +++++++-------- 5 files changed, 75 insertions(+), 45 deletions(-) diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 91046cd0370..de64c1c1188 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -33,12 +33,18 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // ConsistentApplyingIndex returns the consistent applying index of current executing entry. + ConsistentApplyingIndex() (uint64, uint64) + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. UnsafeConsistentIndex() uint64 // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) + // SetConsistentApplyingIndex set the consistent applying index of current executing entry. + SetConsistentApplyingIndex(v uint64, term uint64) + // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. UnsafeSave(tx backend.BatchTx) @@ -58,6 +64,19 @@ type consistentIndex struct { // The value is being persisted in the backend since v3.5. term uint64 + // applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index + // and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be + // saved to consistentIndex and term above in the txPostLockInsideApplyHook. + // + // TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the + // performance difference. Afterwards we can make a decision on whether + // or not we should remove OnPreCommitUnsafe. If it is true, then we + // can remove applyingIndex and applyingTerm, and save the e.Index and + // e.Term to consistentIndex and term directly in applyEntries, and + // persist them into db in the txPostLockInsideApplyHook. + applyingIndex uint64 + applyingTerm uint64 + // be is used for initial read consistentIndex be Backend // mutex is protecting be. @@ -111,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) { ci.SetConsistentIndex(0, 0) } +func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm) +} + +func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) { + atomic.StoreUint64(&ci.applyingIndex, v) + atomic.StoreUint64(&ci.applyingTerm, term) +} + func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } @@ -120,13 +148,24 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } -func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} +func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term) +} +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) atomic.StoreUint64(&f.term, term) } +func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) { + atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) +} func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 015bcaf6f0a..a3d0c9376f1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,10 +207,8 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. - consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -405,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // Set the hook after EtcdServer finishes the initialization to avoid // the hook being called during the initialization process. - srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockHook()) + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()) // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ @@ -984,7 +982,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) - newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockHook()) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook()) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) @@ -1555,15 +1553,6 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } -func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { - atomic.StoreUint64(&s.consistentIdx, cIdx) - atomic.StoreUint64(&s.consistentTerm, cTerm) -} - -func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { - return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) -} - func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1788,7 +1777,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.setConsistentIndexAndTerm(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1826,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.setConsistentIndexAndTerm(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -1925,7 +1914,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con // The txPostLock callback will not get called in this case, // so we should set the consistent index directly. if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { - s.consistIndex.SetConsistentIndex(s.consistentIdx, s.consistentTerm) + applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm) } return false, err } @@ -2329,11 +2319,11 @@ func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } -func (s *EtcdServer) getTxPostLockHook() func() { +func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { return func() { - cIdx, term := s.getConsistentIndexAndTerm() - if cIdx > s.consistIndex.UnsafeConsistentIndex() { - s.consistIndex.SetConsistentIndex(cIdx, term) + applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + if applyingIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm) } } } diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index ebb99ee2c34..f30d79062c8 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -68,7 +68,7 @@ type Backend interface { ForceCommit() Close() error - // SetTxPostLockInsideApplyHook sets a txPostLockHook. + // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. SetTxPostLockInsideApplyHook(func()) } @@ -122,8 +122,8 @@ type backend struct { hooks Hooks - // txPostLockHook is called each time right after locking the tx. - txPostLockHook func() + // txPostLockInsideApplyHook is called each time right after locking the tx. + txPostLockInsideApplyHook func() lg *zap.Logger } @@ -235,10 +235,10 @@ func (b *backend) BatchTx() BatchTx { func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { // It needs to lock the batchTx, because the periodic commit - // may be accessing the txPostLockHook at the moment. + // may be accessing the txPostLockInsideApplyHook at the moment. b.batchTx.lock() defer b.batchTx.Unlock() - b.txPostLockHook = hook + b.txPostLockInsideApplyHook = hook } func (b *backend) ReadTx() ReadTx { return b.readTx } diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 7eca835fd22..c8fa55954f6 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -77,13 +77,14 @@ func (t *batchTx) lock() { func (t *batchTx) LockInsideApply() { t.lock() - if t.backend.txPostLockHook != nil { + if t.backend.txPostLockInsideApplyHook != nil { // The callers of some methods (i.e., (*RaftCluster).AddMember) // can be coming from both InsideApply and OutsideApply, but the - // callers from OutsideApply will have a nil txPostLockHook. So we - // should check the txPostLockHook before validating the callstack. + // callers from OutsideApply will have a nil txPostLockInsideApplyHook. + // So we should check the txPostLockInsideApplyHook before validating + // the callstack. ValidateCalledInsideApply(t.backend.lg) - t.backend.txPostLockHook() + t.backend.txPostLockInsideApplyHook() } } diff --git a/server/storage/backend/verify_test.go b/server/storage/backend/verify_test.go index 2345f46b55f..5cb38ee9da7 100644 --- a/server/storage/backend/verify_test.go +++ b/server/storage/backend/verify_test.go @@ -25,11 +25,11 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - name string - insideApply bool - lock func(tx backend.BatchTx) - txPostLockHook func() - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockInsideApplyHook func() + expectPanic bool }{ { name: "call lockInsideApply from inside apply", @@ -38,17 +38,17 @@ func TestLockVerify(t *testing.T) { expectPanic: false, }, { - name: "call lockInsideApply from outside apply (without txPostLockHook)", + name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)", insideApply: false, lock: lockInsideApply, expectPanic: false, }, { - name: "call lockInsideApply from outside apply (with txPostLockHook)", - insideApply: false, - lock: lockInsideApply, - txPostLockHook: func() {}, - expectPanic: true, + name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockInsideApplyHook: func() {}, + expectPanic: true, }, { name: "call lockOutsideApply from outside apply", @@ -78,7 +78,7 @@ func TestLockVerify(t *testing.T) { t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) - be.SetTxPostLockInsideApplyHook(tc.txPostLockHook) + be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook) hasPaniced := handlePanic(func() { if tc.insideApply {