Skip to content

Commit

Permalink
Merge pull request #13854 from ahrtr/data_corruption
Browse files Browse the repository at this point in the history
Fix the data inconsistency issue by moving the SetConsistentIndex into the transaction lock
  • Loading branch information
ptabor committed Apr 7, 2022
2 parents 706cde8 + 4033f5c commit c83b1ad
Show file tree
Hide file tree
Showing 32 changed files with 328 additions and 128 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/backup_command.go
Expand Up @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir

if !v3 {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)
Expand Down
4 changes: 2 additions & 2 deletions etcdutl/etcdutl/migrate_command.go
Expand Up @@ -118,7 +118,7 @@ type migrateConfig struct {
func migrateCommandFunc(c *migrateConfig) error {
defer c.be.Close()
tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, tx)
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
return err
Expand All @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error {
}

func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
// Storage version is only supported since v3.6
if target.LessThan(schema.V3_6) {
Expand Down
4 changes: 2 additions & 2 deletions server/auth/range_perm_cache.go
Expand Up @@ -20,7 +20,7 @@ import (
"go.uber.org/zap"
)

func getMergedPerms(tx AuthBatchTx, userName string) *unifiedRangePermissions {
func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions {
user := tx.UnsafeGetUser(userName)
if user == nil {
return nil
Expand Down Expand Up @@ -103,7 +103,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
return false
}

func (as *authStore) isRangeOpPermitted(tx AuthBatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
func (as *authStore) isRangeOpPermitted(tx AuthReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
if !ok {
Expand Down
14 changes: 10 additions & 4 deletions server/auth/store.go
Expand Up @@ -196,6 +196,7 @@ type TokenProvider interface {
type AuthBackend interface {
CreateAuthBuckets()
ForceCommit()
ReadTx() AuthReadTx
BatchTx() AuthBatchTx

GetUser(string) *authpb.User
Expand Down Expand Up @@ -345,7 +346,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
// CompareHashAndPassword is very expensive, so we use closures
// to avoid putting it in the critical section of the tx lock.
revision, err := func() (uint64, error) {
tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -373,7 +374,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {

func (as *authStore) Recover(be AuthBackend) {
as.be = be
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()

enabled := tx.UnsafeReadAuthEnabled()
Expand Down Expand Up @@ -855,7 +856,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
return ErrAuthOldRevision
}

tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down Expand Up @@ -897,7 +898,10 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return ErrUserEmpty
}

u := as.be.GetUser(authInfo.Username)
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()
u := tx.UnsafeGetUser(authInfo.Username)

if u == nil {
return ErrUserNotFound
Expand Down Expand Up @@ -935,6 +939,8 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i

be.CreateAuthBuckets()
tx := be.BatchTx()
// We should call LockOutsideApply here, but the txPostLockHoos isn't set
// to EtcdServer yet, so it's OK.
tx.Lock()
enabled := tx.UnsafeReadAuthEnabled()
as := &authStore{
Expand Down
4 changes: 4 additions & 0 deletions server/auth/store_mock_test.go
Expand Up @@ -36,6 +36,10 @@ func (b *backendMock) CreateAuthBuckets() {
func (b *backendMock) ForceCommit() {
}

func (b *backendMock) ReadTx() AuthReadTx {
return &txMock{be: b}
}

func (b *backendMock) BatchTx() AuthBatchTx {
return &txMock{be: b}
}
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/adapters.go
Expand Up @@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
s.bemu.RLock()
defer s.bemu.RUnlock()

tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx := s.be.ReadTx()
tx.RLock()
defer tx.RUnlock()
v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
if err != nil {
return nil
Expand All @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error
defer s.bemu.RUnlock()

tx := s.be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
}
2 changes: 1 addition & 1 deletion server/etcdserver/bootstrap.go
Expand Up @@ -231,7 +231,7 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s
}
}
if beExist {
err = schema.Validate(cfg.Logger, be.BatchTx())
err = schema.Validate(cfg.Logger, be.ReadTx())
if err != nil {
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
return nil, err
Expand Down
60 changes: 57 additions & 3 deletions server/etcdserver/cindex/cindex.go
Expand Up @@ -23,6 +23,7 @@ import (
)

type Backend interface {
ReadTx() backend.ReadTx
BatchTx() backend.BatchTx
}

Expand All @@ -32,9 +33,18 @@ type ConsistentIndexer interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64

// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
ConsistentApplyingIndex() (uint64, uint64)

// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
UnsafeConsistentIndex() uint64

// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64, term uint64)

// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
SetConsistentApplyingIndex(v uint64, term uint64)

// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)
Expand All @@ -54,6 +64,19 @@ type consistentIndex struct {
// The value is being persisted in the backend since v3.5.
term uint64

// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
//
// TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the
// performance difference. Afterwards we can make a decision on whether
// or not we should remove OnPreCommitUnsafe. If it is true, then we
// can remove applyingIndex and applyingTerm, and save the e.Index and
// e.Term to consistentIndex and term directly in applyEntries, and
// persist them into db in the txPostLockInsideApplyHook.
applyingIndex uint64
applyingTerm uint64

// be is used for initial read consistentIndex
be Backend
// mutex is protecting be.
Expand All @@ -73,7 +96,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()

v, term := schema.ReadConsistentIndex(ci.be.BatchTx())
v, term := schema.ReadConsistentIndex(ci.be.ReadTx())
ci.SetConsistentIndex(v, term)
return v
}

func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}

v, term := schema.UnsafeReadConsistentIndex(ci.be.ReadTx())
ci.SetConsistentIndex(v, term)
return v
}
Expand All @@ -97,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
ci.SetConsistentIndex(0, 0)
}

func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
}

func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.applyingIndex, v)
atomic.StoreUint64(&ci.applyingTerm, term)
}

func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}
Expand All @@ -106,18 +148,30 @@ type fakeConsistentIndex struct {
term uint64
}

func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
return atomic.LoadUint64(&f.index)
}
func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
}
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
return atomic.LoadUint64(&f.index)
}

func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}
func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}

func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}
35 changes: 33 additions & 2 deletions server/etcdserver/server.go
Expand Up @@ -401,6 +401,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
})
}

// Set the hook after EtcdServer finishes the initialization to avoid
// the hook being called during the initialization process.
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())

// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
Logger: cfg.Logger,
Expand Down Expand Up @@ -978,6 +982,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}

s.consistIndex.SetBackend(newbe)
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())

lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

// Closing old backend might block until all the txns
Expand Down Expand Up @@ -1771,7 +1777,7 @@ func (s *EtcdServer) apply(

// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}

Expand All @@ -1798,10 +1804,18 @@ func (s *EtcdServer) apply(
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false
defer func() {
// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",
Expand Down Expand Up @@ -1853,6 +1867,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
applyV3Performed = true
ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
}

Expand Down Expand Up @@ -1895,6 +1910,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)

// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
}
return false, err
}

Expand Down Expand Up @@ -2296,3 +2318,12 @@ func (s *EtcdServer) raftStatus() raft.Status {
func (s *EtcdServer) Version() *serverversion.Manager {
return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
}

func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
return func() {
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
}
}
}
7 changes: 2 additions & 5 deletions server/etcdserver/server_test.go
Expand Up @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {

_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
consistIndex := srv.consistIndex.ConsistentIndex()
if consistIndex != appliedi {
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
}
assert.Equal(t, uint64(2), appliedi)

t.Run("verify-backend", func(t *testing.T) {
tx := be.BatchTx()
Expand All @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
rindex, _ := schema.ReadConsistentIndex(be.ReadTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}

func realisticRaftNode(lg *zap.Logger) *raftNode {
Expand Down
4 changes: 2 additions & 2 deletions server/lease/lessor.go
Expand Up @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()

tx.Lock()
tx.LockOutsideApply()
schema.UnsafeCreateLeaseBucket(tx)
lpbs := schema.MustUnsafeGetAllLeases(tx)
tx.Unlock()
Expand Down Expand Up @@ -845,7 +845,7 @@ func (l *Lease) expired() bool {
func (l *Lease) persistTo(b backend.Backend) {
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
schema.MustUnsafePutLease(tx, &lpb)
}
Expand Down
2 changes: 1 addition & 1 deletion server/storage/backend.go
Expand Up @@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil
Expand Down

0 comments on commit c83b1ad

Please sign in to comment.