diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index 49b6a93101d..e0d53fc1b1f 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir if !v3 { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() schema.UnsafeCreateMetaBucket(tx) schema.UnsafeUpdateConsistentIndex(tx, idx, term, false) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 88f9e473c8a..87b10664ff4 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -118,7 +118,7 @@ type migrateConfig struct { func migrateCommandFunc(c *migrateConfig) error { defer c.be.Close() tx := c.be.BatchTx() - current, err := schema.DetectSchemaVersion(c.lg, tx) + current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx()) if err != nil { c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") return err @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go index bae07ef5242..2ebe5439b58 100644 --- a/server/auth/range_perm_cache.go +++ b/server/auth/range_perm_cache.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" ) -func getMergedPerms(tx AuthBatchTx, userName string) *unifiedRangePermissions { +func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions { user := tx.UnsafeGetUser(userName) if user == nil { return nil @@ -103,7 +103,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b return false } -func (as *authStore) isRangeOpPermitted(tx AuthBatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { +func (as *authStore) isRangeOpPermitted(tx AuthReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { // assumption: tx is Lock()ed _, ok := as.rangePermCache[userName] if !ok { diff --git a/server/auth/store.go b/server/auth/store.go index 408b235babd..2d978a01141 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -196,6 +196,7 @@ type TokenProvider interface { type AuthBackend interface { CreateAuthBuckets() ForceCommit() + ReadTx() AuthReadTx BatchTx() AuthBatchTx GetUser(string) *authpb.User @@ -345,7 +346,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { // CompareHashAndPassword is very expensive, so we use closures // to avoid putting it in the critical section of the tx lock. revision, err := func() (uint64, error) { - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -373,7 +374,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { func (as *authStore) Recover(be AuthBackend) { as.be = be - tx := be.BatchTx() + tx := be.ReadTx() tx.Lock() enabled := tx.UnsafeReadAuthEnabled() @@ -855,7 +856,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE return ErrAuthOldRevision } - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -897,7 +898,10 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { return ErrUserEmpty } - u := as.be.GetUser(authInfo.Username) + tx := as.be.ReadTx() + tx.Lock() + defer tx.Unlock() + u := tx.UnsafeGetUser(authInfo.Username) if u == nil { return ErrUserNotFound @@ -935,6 +939,8 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i be.CreateAuthBuckets() tx := be.BatchTx() + // We should call LockOutsideApply here, but the txPostLockHoos isn't set + // to EtcdServer yet, so it's OK. tx.Lock() enabled := tx.UnsafeReadAuthEnabled() as := &authStore{ diff --git a/server/auth/store_mock_test.go b/server/auth/store_mock_test.go index d49f8dd333f..39c3f6d139a 100644 --- a/server/auth/store_mock_test.go +++ b/server/auth/store_mock_test.go @@ -36,6 +36,10 @@ func (b *backendMock) CreateAuthBuckets() { func (b *backendMock) ForceCommit() { } +func (b *backendMock) ReadTx() AuthReadTx { + return &txMock{be: b} +} + func (b *backendMock) BatchTx() AuthBatchTx { return &txMock{be: b} } diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 9511bc6a6ac..d875cf14efe 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { s.bemu.RLock() defer s.bemu.RUnlock() - tx := s.be.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := s.be.ReadTx() + tx.RLock() + defer tx.RUnlock() v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx) if err != nil { return nil @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error defer s.bemu.RUnlock() tx := s.be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 857e7afa6ad..43605be5e62 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -231,7 +231,7 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s } } if beExist { - err = schema.Validate(cfg.Logger, be.BatchTx()) + err = schema.Validate(cfg.Logger, be.ReadTx()) if err != nil { cfg.Logger.Error("Failed to validate schema", zap.Error(err)) return nil, err diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 24dad66031d..de64c1c1188 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,6 +23,7 @@ import ( ) type Backend interface { + ReadTx() backend.ReadTx BatchTx() backend.BatchTx } @@ -32,9 +33,18 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // ConsistentApplyingIndex returns the consistent applying index of current executing entry. + ConsistentApplyingIndex() (uint64, uint64) + + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) + // SetConsistentApplyingIndex set the consistent applying index of current executing entry. + SetConsistentApplyingIndex(v uint64, term uint64) + // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. UnsafeSave(tx backend.BatchTx) @@ -54,6 +64,19 @@ type consistentIndex struct { // The value is being persisted in the backend since v3.5. term uint64 + // applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index + // and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be + // saved to consistentIndex and term above in the txPostLockInsideApplyHook. + // + // TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the + // performance difference. Afterwards we can make a decision on whether + // or not we should remove OnPreCommitUnsafe. If it is true, then we + // can remove applyingIndex and applyingTerm, and save the e.Index and + // e.Term to consistentIndex and term directly in applyEntries, and + // persist them into db in the txPostLockInsideApplyHook. + applyingIndex uint64 + applyingTerm uint64 + // be is used for initial read consistentIndex be Backend // mutex is protecting be. @@ -73,7 +96,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := schema.ReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := schema.UnsafeReadConsistentIndex(ci.be.ReadTx()) ci.SetConsistentIndex(v, term) return v } @@ -97,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) { ci.SetConsistentIndex(0, 0) } +func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm) +} + +func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) { + atomic.StoreUint64(&ci.applyingIndex, v) + atomic.StoreUint64(&ci.applyingTerm, term) +} + func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } @@ -106,18 +148,30 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} +func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term) +} +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) atomic.StoreUint64(&f.term, term) } +func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) { + atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) +} func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index f5632318eab..a3d0c9376f1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -401,6 +401,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { }) } + // Set the hook after EtcdServer finishes the initialization to avoid + // the hook being called during the initialization process. + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()) + // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ Logger: cfg.Logger, @@ -978,6 +982,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook()) + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -1771,7 +1777,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1798,10 +1804,18 @@ func (s *EtcdServer) apply( // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly + applyV3Performed := false + defer func() { + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 { + s.consistIndex.SetConsistentIndex(e.Index, e.Term) + } + }() index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -1853,6 +1867,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } + applyV3Performed = true ar = s.applyV3.Apply(&raftReq, shouldApplyV3) } @@ -1895,6 +1910,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) + + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { + applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm) + } return false, err } @@ -2296,3 +2318,12 @@ func (s *EtcdServer) raftStatus() raft.Status { func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } + +func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { + return func() { + applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + if applyingIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm) + } + } +} diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index cb4386976d9..706c7854952 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) + rindex, _ := schema.ReadConsistentIndex(be.ReadTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index cb6f0d6ec80..931cb3d090d 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() @@ -845,7 +845,7 @@ func (l *Lease) expired() bool { func (l *Lease) persistTo(b backend.Backend) { lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} tx := b.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() schema.MustUnsafePutLease(tx, &lpb) } diff --git a/server/storage/backend.go b/server/storage/backend.go index b7b0eb2fc7f..b1101cfa6fc 100644 --- a/server/storage/backend.go +++ b/server/storage/backend.go @@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index a61f4ec3609..f30d79062c8 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -67,6 +67,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. + SetTxPostLockInsideApplyHook(func()) } type Snapshot interface { @@ -119,6 +122,9 @@ type backend struct { hooks Hooks + // txPostLockInsideApplyHook is called each time right after locking the tx. + txPostLockInsideApplyHook func() + lg *zap.Logger } @@ -227,6 +233,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockInsideApplyHook at the moment. + b.batchTx.lock() + defer b.batchTx.Unlock() + b.txPostLockInsideApplyHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -438,7 +452,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockOutsideApply() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index a8e64919916..c8fa55954f6 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -65,18 +65,32 @@ type batchTx struct { pending int } +// Lock is supposed to be called only by the unit test. func (t *batchTx) Lock() { + ValidateCalledInsideUnittest(t.backend.lg) + t.lock() +} + +func (t *batchTx) lock() { t.Mutex.Lock() } func (t *batchTx) LockInsideApply() { - ValidateCalledInsideApply(t.backend.lg) - t.Lock() + t.lock() + if t.backend.txPostLockInsideApplyHook != nil { + // The callers of some methods (i.e., (*RaftCluster).AddMember) + // can be coming from both InsideApply and OutsideApply, but the + // callers from OutsideApply will have a nil txPostLockInsideApplyHook. + // So we should check the txPostLockInsideApplyHook before validating + // the callstack. + ValidateCalledInsideApply(t.backend.lg) + t.backend.txPostLockInsideApplyHook() + } } func (t *batchTx) LockOutsideApply() { ValidateCalledOutSideApply(t.backend.lg) - t.Lock() + t.lock() } func (t *batchTx) Unlock() { @@ -226,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.lock() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.lock() t.commit(true) t.Unlock() } @@ -303,13 +317,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.lock() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.lock() t.commit(true) t.Unlock() } diff --git a/server/storage/backend/hooks_test.go b/server/storage/backend/hooks_test.go index 76646448489..b77efbba492 100644 --- a/server/storage/backend/hooks_test.go +++ b/server/storage/backend/hooks_test.go @@ -41,8 +41,6 @@ func TestBackendPreCommitHook(t *testing.T) { // Empty commit. tx.Commit() - write(tx, []byte("foo"), []byte("bar")) - assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits") tx.Commit() assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits") diff --git a/server/storage/backend/verify.go b/server/storage/backend/verify.go index 2f3dc0221c6..a6a0b8675ec 100644 --- a/server/storage/backend/verify.go +++ b/server/storage/backend/verify.go @@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) { } } +func ValidateCalledInsideUnittest(lg *zap.Logger) { + if !verifyLockEnabled() { + return + } + if !insideUnittest() { + lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace")) + } +} + func verifyLockEnabled() bool { return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK } @@ -54,3 +63,8 @@ func insideApply() bool { stackTraceStr := string(debug.Stack()) return strings.Contains(stackTraceStr, ".applyEntries") } + +func insideUnittest() bool { + stackTraceStr := string(debug.Stack()) + return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/") +} diff --git a/server/storage/backend/verify_test.go b/server/storage/backend/verify_test.go index 08efb392104..5cb38ee9da7 100644 --- a/server/storage/backend/verify_test.go +++ b/server/storage/backend/verify_test.go @@ -15,7 +15,6 @@ package backend_test import ( - "fmt" "os" "testing" "time" @@ -26,40 +25,60 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - insideApply bool - lock func(tx backend.BatchTx) - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockInsideApplyHook func() + expectPanic bool }{ { + name: "call lockInsideApply from inside apply", insideApply: true, lock: lockInsideApply, expectPanic: false, }, { + name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)", insideApply: false, lock: lockInsideApply, - expectPanic: true, + expectPanic: false, + }, + { + name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockInsideApplyHook: func() {}, + expectPanic: true, }, { + name: "call lockOutsideApply from outside apply", insideApply: false, lock: lockOutsideApply, expectPanic: false, }, { + name: "call lockOutsideApply from inside apply", insideApply: true, lock: lockOutsideApply, expectPanic: true, }, + { + name: "call Lock from unit test", + insideApply: false, + lock: lockFromUT, + expectPanic: false, + }, } env := os.Getenv("ETCD_VERIFY") os.Setenv("ETCD_VERIFY", "lock") defer func() { os.Setenv("ETCD_VERIFY", env) }() - for i, tc := range tcs { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook) hasPaniced := handlePanic(func() { if tc.insideApply { @@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) { func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() } func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() } +func lockFromUT(tx backend.BatchTx) { tx.Lock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 846f83cde18..074f1bea61b 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index ba944008216..849f73b9576 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 31cb3765541..2779f10b7bf 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -881,6 +881,8 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockInsideApply() {} +func (b *fakeBatchTx) LockOutsideApply() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -904,10 +906,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) { func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error { return nil } -func (b *fakeBatchTx) Commit() {} -func (b *fakeBatchTx) CommitAndStop() {} -func (b *fakeBatchTx) LockInsideApply() {} -func (b *fakeBatchTx) LockOutsideApply() {} +func (b *fakeBatchTx) Commit() {} +func (b *fakeBatchTx) CommitAndStop() {} type fakeBackend struct { tx *fakeBatchTx @@ -924,6 +924,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index fb7a9ca1fa5..604fac78cb3 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -133,7 +133,7 @@ type storeTxnWrite struct { func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() - tx.Lock() + tx.LockInsideApply() tw := &storeTxnWrite{ storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, diff --git a/server/storage/mvcc/store.go b/server/storage/mvcc/store.go index e530c82f4ec..a002ada7177 100644 --- a/server/storage/mvcc/store.go +++ b/server/storage/mvcc/store.go @@ -36,7 +36,7 @@ func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found } func SetScheduledCompact(tx backend.BatchTx, value int64) { - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() UnsafeSetScheduledCompact(tx, value) } @@ -48,7 +48,7 @@ func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) { } func SetFinishedCompact(tx backend.BatchTx, value int64) { - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() UnsafeSetFinishedCompact(tx, value) } diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 605bb3a0bfd..825a8dbe0bf 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,14 +34,14 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) { tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() s.mustUnsafePutAlarm(tx, alarm) } @@ -57,7 +57,7 @@ func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverp func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) { tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() s.mustUnsafeDeleteAlarm(tx, alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 93ef34c371e..aa695bb1d46 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) @@ -60,15 +60,25 @@ func (abe *authBackend) ForceCommit() { abe.be.ForceCommit() } +func (abe *authBackend) ReadTx() auth.AuthReadTx { + return &authReadTx{tx: abe.be.ReadTx(), lg: abe.lg} +} + func (abe *authBackend) BatchTx() auth.AuthBatchTx { return &authBatchTx{tx: abe.be.BatchTx(), lg: abe.lg} } +type authReadTx struct { + tx backend.ReadTx + lg *zap.Logger +} + type authBatchTx struct { tx backend.BatchTx lg *zap.Logger } +var _ auth.AuthReadTx = (*authReadTx)(nil) var _ auth.AuthBatchTx = (*authBatchTx)(nil) func (atx *authBatchTx) UnsafeSaveAuthEnabled(enabled bool) { @@ -86,6 +96,24 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) { } func (atx *authBatchTx) UnsafeReadAuthEnabled() bool { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeReadAuthEnabled() +} + +func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeReadAuthRevision() +} + +func (atx *authBatchTx) Lock() { + atx.tx.LockInsideApply() +} + +func (atx *authBatchTx) Unlock() { + atx.tx.Unlock() +} + +func (atx *authReadTx) UnsafeReadAuthEnabled() bool { _, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0) if len(vs) == 1 { if bytes.Equal(vs[0], authEnabled) { @@ -95,7 +123,7 @@ func (atx *authBatchTx) UnsafeReadAuthEnabled() bool { return false } -func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { +func (atx *authReadTx) UnsafeReadAuthRevision() uint64 { _, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0) if len(vs) != 1 { // this can happen in the initialization phase @@ -104,10 +132,10 @@ func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { return binary.BigEndian.Uint64(vs[0]) } -func (atx *authBatchTx) Lock() { - atx.tx.Lock() +func (atx *authReadTx) Lock() { + atx.tx.RLock() } -func (atx *authBatchTx) Unlock() { - atx.tx.Unlock() +func (atx *authReadTx) Unlock() { + atx.tx.RUnlock() } diff --git a/server/storage/schema/auth_roles.go b/server/storage/schema/auth_roles.go index 541e37b7191..dfda7ce5b7b 100644 --- a/server/storage/schema/auth_roles.go +++ b/server/storage/schema/auth_roles.go @@ -32,6 +32,40 @@ func (abe *authBackend) GetRole(roleName string) *authpb.Role { } func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetRole(roleName) +} + +func (abe *authBackend) GetAllRoles() []*authpb.Role { + tx := abe.BatchTx() + tx.Lock() + defer tx.Unlock() + return tx.UnsafeGetAllRoles() +} + +func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetAllRoles() +} + +func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) { + b, err := role.Marshal() + if err != nil { + atx.lg.Panic( + "failed to marshal 'authpb.Role'", + zap.String("role-name", string(role.Name)), + zap.Error(err), + ) + } + + atx.tx.UnsafePut(AuthRoles, role.Name, b) +} + +func (atx *authBatchTx) UnsafeDeleteRole(rolename string) { + atx.tx.UnsafeDelete(AuthRoles, []byte(rolename)) +} + +func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role { _, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0) if len(vs) == 0 { return nil @@ -45,14 +79,7 @@ func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role { return role } -func (abe *authBackend) GetAllRoles() []*authpb.Role { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() - return tx.UnsafeGetAllRoles() -} - -func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { +func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role { _, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -69,20 +96,3 @@ func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { } return roles } - -func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) { - b, err := role.Marshal() - if err != nil { - atx.lg.Panic( - "failed to marshal 'authpb.Role'", - zap.String("role-name", string(role.Name)), - zap.Error(err), - ) - } - - atx.tx.UnsafePut(AuthRoles, role.Name, b) -} - -func (atx *authBatchTx) UnsafeDeleteRole(rolename string) { - atx.tx.UnsafeDelete(AuthRoles, []byte(rolename)) -} diff --git a/server/storage/schema/auth_users.go b/server/storage/schema/auth_users.go index f385afa5122..c3e7a92ff39 100644 --- a/server/storage/schema/auth_users.go +++ b/server/storage/schema/auth_users.go @@ -27,6 +27,35 @@ func (abe *authBackend) GetUser(username string) *authpb.User { } func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetUser(username) +} + +func (abe *authBackend) GetAllUsers() []*authpb.User { + tx := abe.BatchTx() + tx.Lock() + defer tx.Unlock() + return tx.UnsafeGetAllUsers() +} + +func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetAllUsers() +} + +func (atx *authBatchTx) UnsafePutUser(user *authpb.User) { + b, err := user.Marshal() + if err != nil { + atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) + } + atx.tx.UnsafePut(AuthUsers, user.Name, b) +} + +func (atx *authBatchTx) UnsafeDeleteUser(username string) { + atx.tx.UnsafeDelete(AuthUsers, []byte(username)) +} + +func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User { _, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0) if len(vs) == 0 { return nil @@ -44,14 +73,7 @@ func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User { return user } -func (abe *authBackend) GetAllUsers() []*authpb.User { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() - return tx.UnsafeGetAllUsers() -} - -func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { +func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User { _, vs := atx.tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -68,15 +90,3 @@ func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { } return users } - -func (atx *authBatchTx) UnsafePutUser(user *authpb.User) { - b, err := user.Marshal() - if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) - } - atx.tx.UnsafePut(AuthUsers, user.Name, b) -} - -func (atx *authBatchTx) UnsafeDeleteUser(username string) { - atx.tx.UnsafeDelete(AuthUsers, []byte(username)) -} diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index d7b06b9cef7..7d215bac654 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } @@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { // ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeReadConsistentIndex(tx) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 844b50a85a4..a42353c4168 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -52,7 +52,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { } tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Members, mkey, mvalue) } @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -71,7 +71,7 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) { mkey := BackendMemberKey(id) tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafeDelete(Members, mkey) tx.UnsafePut(MembersRemoved, mkey, []byte("removed")) @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -146,7 +146,7 @@ func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version) ckey := ClusterClusterVersionKeyName tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Cluster, ckey, []byte(ver.String())) } @@ -160,14 +160,14 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr s.lg.Panic("failed to marshal downgrade information", zap.Error(err)) } tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Cluster, dkey, dvalue) } func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index 47734b4b851..61ea51bf273 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 850b55d5bd5..68bb212d71e 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -30,13 +30,13 @@ var ( ) // Validate checks provided backend to confirm that schema used is supported. -func Validate(lg *zap.Logger, tx backend.BatchTx) error { +func Validate(lg *zap.Logger, tx backend.ReadTx) error { tx.Lock() defer tx.Unlock() return unsafeValidate(lg, tx) } -func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error { +func unsafeValidate(lg *zap.Logger, tx backend.ReadTx) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { // v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version. @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } @@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot). // * v3.4 and older is not supported and will return error. func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeDetectSchemaVersion(lg, tx) } diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index f3c0c4a7fe8..8dbd337b2e5 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -88,7 +88,7 @@ func TestValidate(t *testing.T) { b := backend.NewDefaultBackend(lg, dataPath) defer b.Close() - err := Validate(lg, b.BatchTx()) + err := Validate(lg, b.ReadTx()) if (err != nil) != tc.expectError { t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError) } diff --git a/server/verify/verify.go b/server/verify/verify.go index 9402e5eb5ea..2b73fbc07b2 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) { } func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { - tx := be.BatchTx() - index, term := schema.ReadConsistentIndex(tx) + index, term := schema.ReadConsistentIndex(be.ReadTx()) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) }