diff --git a/server/config/config.go b/server/config/config.go index d02e1f4b0b7..0dac25c41f1 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -137,8 +137,10 @@ type ServerConfig struct { // InitialCorruptCheck is true to check data corruption on boot // before serving any peer/client traffic. - InitialCorruptCheck bool - CorruptCheckTime time.Duration + InitialCorruptCheck bool + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration // PreVote is true to enable Raft Pre-Vote. PreVote bool diff --git a/server/embed/config.go b/server/embed/config.go index c449f4919b3..75b6d5d5cbf 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -314,8 +314,10 @@ type Config struct { // AuthTokenTTL specifies the TTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` - ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` - ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` + ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"` + ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"` // ExperimentalEnableV2V3 configures URLs that expose deprecated V2 API working on V3 store. // Deprecated in v3.5. // TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913) @@ -501,6 +503,9 @@ func NewConfig() *Config { ExperimentalMemoryMlock: false, ExperimentalTxnModeWriteWithSharedBuffer: true, + ExperimentalCompactHashCheckEnabled: false, + ExperimentalCompactHashCheckTime: time.Minute, + V2Deprecation: config.V2_DEPR_DEFAULT, } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) @@ -698,6 +703,10 @@ func (cfg *Config) Validate() error { return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") } + if cfg.ExperimentalCompactHashCheckTime <= 0 { + return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime) + } + return nil } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 28b0ff92cf6..f612e96ef3f 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -203,6 +203,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { HostWhitelist: cfg.HostWhitelist, InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled, + CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime, PreVote: cfg.PreVote, Logger: cfg.logger, ForceNewCluster: cfg.ForceNewCluster, @@ -247,8 +249,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { // newly started member ("memberInitialized==false") // does not need corruption check - if memberInitialized { - if err = e.Server.CheckInitialHashKV(); err != nil { + if memberInitialized && srvcfg.InitialCorruptCheck { + if err = e.Server.CorruptionChecker().InitialCheck(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) @@ -339,6 +341,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), 
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), + zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled), + zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime), zap.String("auto-compaction-mode", sc.AutoCompactionMode), zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention), zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 14b58c47d47..59d3a973fdc 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -281,6 +281,8 @@ func newConfig() *config { // experimental fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") + fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.") + fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hashes.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") // TODO: delete in v3.7 diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 38cc9137163..42f8b0da2ee 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -66,19 +66,20 @@ type ClusterStatusGetter interface { } type maintenanceServer struct { - lg *zap.Logger - rg etcdserver.RaftStatusGetter - kg KVGetter - bg BackendGetter - a Alarmer - lt LeaderTransferrer - hdr header - cs ClusterStatusGetter - d Downgrader + lg *zap.Logger + rg etcdserver.RaftStatusGetter + hasher mvcc.HashStorage + kg KVGetter + bg BackendGetter + a Alarmer + lt LeaderTransferrer + hdr header + cs ClusterStatusGetter + d Downgrader } func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { - srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s} + srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s} if srv.lg == nil { srv.lg = zap.NewNop() } @@ -180,7 +181,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance } func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { - h, rev, err := ms.kg.KV().Hash() + h, rev, err := ms.hasher.Hash() if err != nil { return nil, togRPCError(err) } @@ -190,12 +191,12 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H } func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) { - h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision) + h, rev, err := ms.hasher.HashByRev(r.Revision) if err != nil { return nil, togRPCError(err) } - resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, 
CompactRevision: compactRev} + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: h.CompactRevision} ms.hdr.fill(resp.Header) return resp, nil } diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 3a4bab6d5c4..f721b3034fd 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -21,7 +21,9 @@ import ( "fmt" "io/ioutil" "net/http" + "sort" "strings" + "sync" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -33,36 +35,83 @@ import ( "go.uber.org/zap" ) -// CheckInitialHashKV compares initial hash values with its peers -// before serving any peer/client traffic. Only mismatch when hashes -// are different at requested revision, with same compact revision. -func (s *EtcdServer) CheckInitialHashKV() error { - if !s.Cfg.InitialCorruptCheck { - return nil +type CorruptionChecker interface { + InitialCheck() error + PeriodicCheck() error + CompactHashCheck() +} + +type corruptionChecker struct { + lg *zap.Logger + + hasher Hasher + + mux sync.RWMutex + latestRevisionChecked int64 +} + +type Hasher interface { + mvcc.HashStorage + ReqTimeout() time.Duration + MemberId() types.ID + PeerHashByRev(int64) []*peerHashKVResp + LinearizableReadNotify(context.Context) error + TriggerCorruptAlarm(types.ID) +} + +func newCorruptionChecker(lg *zap.Logger, s *EtcdServer, storage mvcc.HashStorage) *corruptionChecker { + return &corruptionChecker{ + lg: lg, + hasher: hasherAdapter{s, storage}, } +} - lg := s.Logger() +type hasherAdapter struct { + *EtcdServer + mvcc.HashStorage +} + +func (h hasherAdapter) MemberId() types.ID { + return h.EtcdServer.ID() +} + +func (h hasherAdapter) ReqTimeout() time.Duration { + return h.EtcdServer.Cfg.ReqTimeout() +} + +func (h hasherAdapter) PeerHashByRev(rev int64) []*peerHashKVResp { + return h.EtcdServer.getPeerHashKVs(rev) +} + +func (h hasherAdapter) TriggerCorruptAlarm(memberID types.ID) { + h.EtcdServer.triggerCorruptAlarm(memberID) +} - lg.Info( +// InitialCheck compares initial hash values with its peers +// before serving any peer/client traffic. Only mismatch when hashes +// are different at requested revision, with same compact revision. 
+func (cm *corruptionChecker) InitialCheck() error { + + cm.lg.Info( "starting initial corruption check", - zap.String("local-member-id", s.ID().String()), - zap.Duration("timeout", s.Cfg.ReqTimeout()), + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Duration("timeout", cm.hasher.ReqTimeout()), ) - h, rev, crev, err := s.kv.HashByRev(0) + h, rev, err := cm.hasher.HashByRev(0) if err != nil { - return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err) + return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err) } - peers := s.getPeerHashKVs(rev) + peers := cm.hasher.PeerHashByRev(rev) mismatch := 0 for _, p := range peers { if p.resp != nil { peerID := types.ID(p.resp.Header.MemberId) fields := []zap.Field{ - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), + zap.Int64("local-member-compact-revision", h.CompactRevision), + zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", peerID.String()), zap.Strings("remote-peer-endpoints", p.eps), zap.Int64("remote-peer-revision", p.resp.Header.Revision), @@ -70,12 +119,12 @@ func (s *EtcdServer) CheckInitialHashKV() error { zap.Uint32("remote-peer-hash", p.resp.Hash), } - if h != p.resp.Hash { - if crev == p.resp.CompactRevision { - lg.Warn("found different hash values from remote peer", fields...) + if h.Hash != p.resp.Hash { + if h.CompactRevision == p.resp.CompactRevision { + cm.lg.Warn("found different hash values from remote peer", fields...) mismatch++ } else { - lg.Warn("found different compact revision values from remote peer", fields...) + cm.lg.Warn("found different compact revision values from remote peer", fields...) 
} } @@ -85,23 +134,23 @@ func (s *EtcdServer) CheckInitialHashKV() error { if p.err != nil { switch p.err { case rpctypes.ErrFutureRev: - lg.Warn( + cm.lg.Warn( "cannot fetch hash from slow remote peer", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), + zap.Int64("local-member-compact-revision", h.CompactRevision), + zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", p.id.String()), zap.Strings("remote-peer-endpoints", p.eps), zap.Error(err), ) case rpctypes.ErrCompacted: - lg.Warn( + cm.lg.Warn( "cannot fetch hash from remote peer; local member is behind", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), - zap.Int64("local-member-compact-revision", crev), - zap.Uint32("local-member-hash", h), + zap.Int64("local-member-compact-revision", h.CompactRevision), + zap.Uint32("local-member-hash", h.Hash), zap.String("remote-peer-id", p.id.String()), zap.Strings("remote-peer-endpoints", p.eps), zap.Error(err), @@ -110,92 +159,55 @@ func (s *EtcdServer) CheckInitialHashKV() error { } } if mismatch > 0 { - return fmt.Errorf("%s found data inconsistency with peers", s.ID()) + return fmt.Errorf("%s found data inconsistency with peers", cm.hasher.MemberId()) } - lg.Info( + cm.lg.Info( "initial corruption checking passed; no corruption", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), ) return nil } -func (s *EtcdServer) monitorKVHash() { - t := s.Cfg.CorruptCheckTime - if t == 0 { - return - } - - lg := s.Logger() - lg.Info( - "enabled corruption checking", - zap.String("local-member-id", s.ID().String()), - zap.Duration("interval", t), - ) - - for { - select { - case <-s.stopping: - return - case <-time.After(t): - } - if !s.isLeader() { - continue - } - if err := s.checkHashKV(); err != nil { - lg.Warn("failed to check hash KV", zap.Error(err)) - } - } -} - -func (s *EtcdServer) checkHashKV() error { - lg := s.Logger() - - h, rev, crev, err := s.kv.HashByRev(0) +func (cm *corruptionChecker) PeriodicCheck() error { + h, rev, err := cm.hasher.HashByRev(0) if err != nil { return err } - peers := s.getPeerHashKVs(rev) + peers := cm.hasher.PeerHashByRev(rev) - ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - err = s.linearizableReadNotify(ctx) + ctx, cancel := context.WithTimeout(context.Background(), cm.hasher.ReqTimeout()) + err = cm.hasher.LinearizableReadNotify(ctx) cancel() if err != nil { return err } - h2, rev2, crev2, err := s.kv.HashByRev(0) + h2, rev2, err := cm.hasher.HashByRev(0) if err != nil { return err } alarmed := false - mismatch := func(id uint64) { + mismatch := func(id types.ID) { if alarmed { return } alarmed = true - a := &pb.AlarmRequest{ - MemberID: id, - Action: pb.AlarmRequest_ACTIVATE, - Alarm: pb.AlarmType_CORRUPT, - } - s.GoAttach(func() { - s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) - }) + cm.hasher.TriggerCorruptAlarm(id) } - if h2 != h && rev2 == rev && crev == crev2 { - lg.Warn( + if h2.Hash != h.Hash && rev2 == rev && h.CompactRevision == h2.CompactRevision { + cm.lg.Warn( "found hash mismatch", zap.Int64("revision-1", rev), - zap.Int64("compact-revision-1", crev), - zap.Uint32("hash-1", h), + zap.Int64("compact-revision-1", h.CompactRevision), + 
zap.Uint32("hash-1", h.Hash), zap.Int64("revision-2", rev2), - zap.Int64("compact-revision-2", crev2), - zap.Uint32("hash-2", h2), + zap.Int64("compact-revision-2", h2.CompactRevision), + zap.Uint32("hash-2", h2.Hash), ) - mismatch(uint64(s.ID())) + mismatch(cm.hasher.MemberId()) } checkedCount := 0 @@ -204,47 +216,135 @@ func (s *EtcdServer) checkHashKV() error { continue } checkedCount++ - id := p.resp.Header.MemberId // leader expects follower's latest revision less than or equal to leader's if p.resp.Header.Revision > rev2 { - lg.Warn( + cm.lg.Warn( "revision from follower must be less than or equal to leader's", zap.Int64("leader-revision", rev2), zap.Int64("follower-revision", p.resp.Header.Revision), - zap.String("follower-peer-id", types.ID(id).String()), + zap.String("follower-peer-id", p.id.String()), ) - mismatch(id) + mismatch(p.id) } // leader expects follower's latest compact revision less than or equal to leader's - if p.resp.CompactRevision > crev2 { - lg.Warn( + if p.resp.CompactRevision > h2.CompactRevision { + cm.lg.Warn( "compact revision from follower must be less than or equal to leader's", - zap.Int64("leader-compact-revision", crev2), + zap.Int64("leader-compact-revision", h2.CompactRevision), zap.Int64("follower-compact-revision", p.resp.CompactRevision), - zap.String("follower-peer-id", types.ID(id).String()), + zap.String("follower-peer-id", p.id.String()), ) - mismatch(id) + mismatch(p.id) } // follower's compact revision is leader's old one, then hashes must match - if p.resp.CompactRevision == crev && p.resp.Hash != h { - lg.Warn( + if p.resp.CompactRevision == h.CompactRevision && p.resp.Hash != h.Hash { + cm.lg.Warn( "same compact revision then hashes must match", - zap.Int64("leader-compact-revision", crev2), - zap.Uint32("leader-hash", h), + zap.Int64("leader-compact-revision", h2.CompactRevision), + zap.Uint32("leader-hash", h.Hash), zap.Int64("follower-compact-revision", p.resp.CompactRevision), zap.Uint32("follower-hash", p.resp.Hash), - zap.String("follower-peer-id", types.ID(id).String()), + zap.String("follower-peer-id", p.id.String()), ) - mismatch(id) + mismatch(p.id) } } - lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) + cm.lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) return nil } +func (cm *corruptionChecker) CompactHashCheck() { + cm.lg.Info("starting compact hash check", + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Duration("timeout", cm.hasher.ReqTimeout()), + ) + hashes := cm.uncheckedRevisions() + // Assume that revisions are ordered from largest to smallest + for i, hash := range hashes { + peers := cm.hasher.PeerHashByRev(hash.Revision) + if len(peers) == 0 { + continue + } + peersChecked := 0 + for _, p := range peers { + if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision { + continue + } + + // follower's compact revision is leader's old one, then hashes must match + if p.resp.Hash != hash.Hash { + cm.hasher.TriggerCorruptAlarm(p.id) + cm.lg.Error("failed compaction hash check", + zap.Int64("revision", hash.Revision), + zap.Int64("leader-compact-revision", hash.CompactRevision), + zap.Uint32("leader-hash", hash.Hash), + zap.Int64("follower-compact-revision", p.resp.CompactRevision), + zap.Uint32("follower-hash", p.resp.Hash), + zap.String("follower-peer-id", p.id.String()), + ) + return + } + peersChecked++ + cm.lg.Info("successfully checked hash on follower", + zap.Int64("revision", hash.Revision), + 
zap.String("peer-id", p.id.String()), + ) + } + if len(peers) == peersChecked { + cm.lg.Info("successfully checked hash on whole cluster", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int64("revision", hash.Revision), + ) + cm.mux.Lock() + if hash.Revision > cm.latestRevisionChecked { + cm.latestRevisionChecked = hash.Revision + } + cm.mux.Unlock() + cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1)) + return + } + cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int("number-of-peers", len(peers)), + zap.Int64("revision", hash.Revision), + ) + } + cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes))) + return +} + +func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash { + cm.mux.RLock() + lastRevisionChecked := cm.latestRevisionChecked + cm.mux.RUnlock() + + hashes := cm.hasher.Hashes() + // Sort in descending order + sort.Slice(hashes, func(i, j int) bool { + return hashes[i].Revision > hashes[j].Revision + }) + for i, hash := range hashes { + if hash.Revision <= lastRevisionChecked { + return hashes[:i] + } + } + return hashes +} + +func (s *EtcdServer) triggerCorruptAlarm(id types.ID) { + a := &pb.AlarmRequest{ + MemberID: uint64(id), + Action: pb.AlarmRequest_ACTIVATE, + Alarm: pb.AlarmType_CORRUPT, + } + s.GoAttach(func() { + s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) + }) +} + type peerInfo struct { id types.ID eps []string @@ -270,6 +370,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { lg := s.Logger() + cc := &http.Client{Transport: s.peerRt} var resps []*peerHashKVResp for _, p := range peers { if len(p.eps) == 0 { @@ -280,7 +381,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { var lastErr error for _, ep := range p.eps { ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - resp, lastErr := s.getPeerHashKVHTTP(ctx, ep, rev) + resp, lastErr := HashByRev(ctx, cc, ep, rev) cancel() if lastErr == nil { resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil}) @@ -372,7 +473,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error unmarshalling request", http.StatusBadRequest) return } - hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision) + hash, rev, err := h.server.KV().HashStorage().HashByRev(req.Revision) if err != nil { h.lg.Warn( "failed to get hashKV", @@ -382,7 +483,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev} + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: hash.CompactRevision} respBytes, err := json.Marshal(resp) if err != nil { h.lg.Warn("failed to marshal hashKV response", zap.Error(err)) @@ -395,9 +496,8 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write(respBytes) } -// getPeerHashKVHTTP fetch hash of kv store at the given rev via http call to the given url -func (s *EtcdServer) getPeerHashKVHTTP(ctx context.Context, url string, rev int64) (*pb.HashKVResponse, error) { - cc := &http.Client{Transport: s.peerRt} +// HashByRev fetch hash of kv store at the given rev via http call to the given url +func HashByRev(ctx 
context.Context, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) { hashReq := &pb.HashKVRequest{Revision: rev} hashReqBytes, err := json.Marshal(hashReq) if err != nil { diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go new file mode 100644 index 00000000000..9e8156f2a9c --- /dev/null +++ b/server/etcdserver/corrupt_test.go @@ -0,0 +1,377 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/server/v3/mvcc" + "go.uber.org/zap/zaptest" +) + +func TestInitialCheck(t *testing.T) { + tcs := []struct { + name string + hasher fakeHasher + expectError bool + expectCorrupt bool + expectActions []string + }{ + { + name: "No peers", + hasher: fakeHasher{ + hashByRevResponses: []hashByRev{{revision: 10}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(10)", "MemberId()"}, + }, + { + name: "Error getting hash", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{err: fmt.Errorf("error getting hash")}}}, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "MemberId()"}, + expectError: true, + }, + { + name: "Peer with empty response", + hasher: fakeHasher{peerHashes: []*peerHashKVResp{{}}}, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"}, + }, + { + name: "Peer returned ErrFutureRev", + hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrFutureRev}}}, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + }, + { + name: "Peer returned ErrCompacted", + hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCompacted}}}, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + }, + { + name: "Peer returned other error", + hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCorrupt}}}, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"}, + }, + { + name: "Peer returned same hash", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}}, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + }, + { + name: "Peer returned different hash with same compaction rev", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}}, + expectActions: 
[]string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + expectError: true, + }, + { + name: "Peer returned different hash and compaction rev", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + monitor := corruptionChecker{ + lg: zaptest.NewLogger(t), + hasher: &tc.hasher, + } + err := monitor.InitialCheck() + if gotError := err != nil; gotError != tc.expectError { + t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError) + } + if tc.hasher.alarmTriggered != tc.expectCorrupt { + t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt) + } + assert.Equal(t, tc.expectActions, tc.hasher.actions) + }) + } +} + +func TestPeriodicCheck(t *testing.T) { + tcs := []struct { + name string + hasher fakeHasher + expectError bool + expectCorrupt bool + expectActions []string + }{ + { + name: "Same local hash and no peers", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{revision: 10}, {revision: 10}}}, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(10)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, + { + name: "Error getting hash first time", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{err: fmt.Errorf("error getting hash")}}}, + expectActions: []string{"HashByRev(0)"}, + expectError: true, + }, + { + name: "Error getting hash second time", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{revision: 11}, {err: fmt.Errorf("error getting hash")}}}, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(11)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + expectError: true, + }, + { + name: "Error linearizableReadNotify", + hasher: fakeHasher{linearizableReadNotify: fmt.Errorf("error getting linearizableReadNotify")}, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()"}, + expectError: true, + }, + { + name: "Different local hash and revision", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2}}}, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, + { + name: "Different local hash and compaction revision", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}}}}, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, + { + name: "Different local hash and same revisions", + hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 1}, revision: 1}}}, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"}, + expectCorrupt: true, + }, + { + name: "Peer with nil response", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{}}, + }, + 
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, + { + name: "Peer with newer revision", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}}}}, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(42)"}, + expectCorrupt: true, + }, + { + name: "Peer with newer compact revision", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 88}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10}, CompactRevision: 2}}}, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(88)"}, + expectCorrupt: true, + }, + { + name: "Peer with same hash and compact revision", + hasher: fakeHasher{ + hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}}, + peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}}, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, + { + name: "Peer with different hash and same compact revision as first local", + hasher: fakeHasher{ + hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}}, + peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 666}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 2}}}, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"}, + expectCorrupt: true, + }, + { + name: "Multiple corrupted peers trigger one alarm", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{ + {peerInfo: peerInfo{id: 88}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10}, CompactRevision: 2}}, + {peerInfo: peerInfo{id: 89}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10}, CompactRevision: 2}}, + }, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(88)"}, + expectCorrupt: true, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + monitor := corruptionChecker{ + lg: zaptest.NewLogger(t), + hasher: &tc.hasher, + } + err := monitor.PeriodicCheck() + if gotError := err != nil; gotError != tc.expectError { + t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError) + } + if tc.hasher.alarmTriggered != tc.expectCorrupt { + t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt) + } + assert.Equal(t, tc.expectActions, tc.hasher.actions) + }) + } +} + +func TestCompactHashCheck(t *testing.T) { + tcs := []struct { + name string + hasher fakeHasher + lastRevisionChecked int64 + + expectError bool + expectCorrupt bool + expectActions []string + expectLastRevisionChecked int64 + }{ + { + name: "No hashes", + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"}, + }, + { + name: "No peers, check new checked from largest to smallest", + hasher: 
fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}}, + }, + lastRevisionChecked: 2, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"}, + expectLastRevisionChecked: 2, + }, + { + name: "Peer error", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}}, + peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"}, + }, + { + name: "Peer returned different compaction revision is skipped", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}}, + peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"}, + }, + { + name: "Peer returned same compaction revision but different hash triggers alarm", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}}, + peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"}, + expectCorrupt: true, + }, + { + name: "Peer returned same hash bumps last revision checked", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"}, + expectLastRevisionChecked: 2, + }, + { + name: "Only one peer succeeded check", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{ + {resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}, + {err: fmt.Errorf("failed getting hash")}, + }, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + monitor := corruptionChecker{ + latestRevisionChecked: tc.lastRevisionChecked, + lg: zaptest.NewLogger(t), + hasher: &tc.hasher, + } + monitor.CompactHashCheck() + if tc.hasher.alarmTriggered != tc.expectCorrupt { + t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt) + } + if tc.expectLastRevisionChecked != monitor.latestRevisionChecked { + t.Errorf("Unexpected last revision checked, got: %v, expected?: %v", monitor.latestRevisionChecked, tc.expectLastRevisionChecked) + } + assert.Equal(t, tc.expectActions, tc.hasher.actions) + }) + } +} + +type fakeHasher struct { + peerHashes []*peerHashKVResp + hashByRevIndex int + hashByRevResponses []hashByRev + linearizableReadNotify error + hashes []mvcc.KeyValueHash + + alarmTriggered bool + actions []string +} + +type hashByRev struct { + hash mvcc.KeyValueHash + revision int64 + err error +} + +func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) { + panic("not implemented") +} + +func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, err error) { + 
f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev)) + if len(f.hashByRevResponses) == 0 { + return mvcc.KeyValueHash{}, 0, nil + } + hashByRev := f.hashByRevResponses[f.hashByRevIndex] + f.hashByRevIndex++ + return hashByRev.hash, hashByRev.revision, hashByRev.err +} + +func (f *fakeHasher) Store(hash mvcc.KeyValueHash) { + f.actions = append(f.actions, fmt.Sprintf("Store(%v)", hash)) + f.hashes = append(f.hashes, hash) +} + +func (f *fakeHasher) Hashes() []mvcc.KeyValueHash { + f.actions = append(f.actions, "Hashes()") + return f.hashes +} + +func (f *fakeHasher) ReqTimeout() time.Duration { + f.actions = append(f.actions, "ReqTimeout()") + return time.Second +} + +func (f *fakeHasher) MemberId() types.ID { + f.actions = append(f.actions, "MemberId()") + return 1 +} + +func (f *fakeHasher) PeerHashByRev(rev int64) []*peerHashKVResp { + f.actions = append(f.actions, fmt.Sprintf("PeerHashByRev(%d)", rev)) + return f.peerHashes +} + +func (f *fakeHasher) LinearizableReadNotify(ctx context.Context) error { + f.actions = append(f.actions, "LinearizableReadNotify()") + return f.linearizableReadNotify +} + +func (f *fakeHasher) TriggerCorruptAlarm(memberId types.ID) { + f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId)) + f.alarmTriggered = true +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 87148eab414..cbf11738a7f 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -293,6 +293,7 @@ type EtcdServer struct { firstCommitInTermC chan struct{} *AccessController + corruptionChecker CorruptionChecker } type backendHooks struct { @@ -629,6 +630,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { ) } } + srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage()) srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) @@ -803,6 +805,7 @@ func (s *EtcdServer) Start() { s.GoAttach(s.monitorVersions) s.GoAttach(s.linearizableReadLoop) s.GoAttach(s.monitorKVHash) + s.GoAttach(s.monitorCompactHash) s.GoAttach(s.monitorDowngrade) } @@ -2508,6 +2511,51 @@ func (s *EtcdServer) monitorVersions() { } } +func (s *EtcdServer) monitorKVHash() { + t := s.Cfg.CorruptCheckTime + if t == 0 { + return + } + + lg := s.Logger() + lg.Info( + "enabled corruption checking", + zap.String("local-member-id", s.ID().String()), + zap.Duration("interval", t), + ) + for { + select { + case <-s.stopping: + return + case <-time.After(t): + } + if !s.isLeader() { + continue + } + if err := s.corruptionChecker.PeriodicCheck(); err != nil { + lg.Warn("failed to check hash KV", zap.Error(err)) + } + } +} + +func (s *EtcdServer) monitorCompactHash() { + if !s.Cfg.CompactHashCheckEnabled { + return + } + t := s.Cfg.CompactHashCheckTime + for { + select { + case <-time.After(t): + case <-s.stopping: + return + } + if !s.isLeader() { + continue + } + s.corruptionChecker.CompactHashCheck() + } +} + func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() @@ -2745,3 +2793,7 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { } return be.Defrag() } + +func (s *EtcdServer) CorruptionChecker() CorruptionChecker { + return s.corruptionChecker +} diff --git a/server/mvcc/hash.go b/server/mvcc/hash.go new file mode 100644 index 00000000000..11232bf5677 --- /dev/null +++ b/server/mvcc/hash.go @@ -0,0 +1,162 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not 
use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "hash" + "hash/crc32" + "sort" + "sync" + + "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" + "go.uber.org/zap" +) + +const ( + hashStorageMaxSize = 10 +) + +func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { + h := newKVHasher(compactRevision, revision, keep) + err := tx.UnsafeForEach(buckets.Key, func(k, v []byte) error { + h.WriteKeyValue(k, v) + return nil + }) + return h.Hash(), err +} + +type kvHasher struct { + hash hash.Hash32 + compactRevision int64 + revision int64 + keep map[revision]struct{} +} + +func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher { + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + h.Write(buckets.Key.Name()) + return kvHasher{ + hash: h, + compactRevision: compactRev, + revision: rev, + keep: keep, + } +} + +func (h *kvHasher) WriteKeyValue(k, v []byte) { + kr := bytesToRev(k) + upper := revision{main: h.revision + 1} + if !upper.GreaterThan(kr) { + return + } + lower := revision{main: h.compactRevision + 1} + // skip revisions that are scheduled for deletion + // due to compacting; don't skip if there isn't one. + if lower.GreaterThan(kr) && len(h.keep) > 0 { + if _, ok := h.keep[kr]; !ok { + return + } + } + h.hash.Write(k) + h.hash.Write(v) +} + +func (h *kvHasher) Hash() KeyValueHash { + return KeyValueHash{Hash: h.hash.Sum32(), CompactRevision: h.compactRevision, Revision: h.revision} +} + +type KeyValueHash struct { + Hash uint32 + CompactRevision int64 + Revision int64 +} + +type HashStorage interface { + // Hash computes the hash of the KV's backend. + Hash() (hash uint32, revision int64, err error) + + // HashByRev computes the hash of all MVCC revisions up to a given revision. + HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) + + // Store adds hash value in local cache, allowing it can be returned by HashByRev. + Store(valueHash KeyValueHash) + + // Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes. 
+ Hashes() []KeyValueHash +} + +type hashStorage struct { + store *store + hashMu sync.RWMutex + hashes []KeyValueHash + lg *zap.Logger +} + +func newHashStorage(lg *zap.Logger, s *store) *hashStorage { + return &hashStorage{ + store: s, + lg: lg, + } +} + +func (s *hashStorage) Hash() (hash uint32, revision int64, err error) { + return s.store.hash() +} + +func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) { + s.hashMu.RLock() + for _, h := range s.hashes { + if rev == h.Revision { + s.hashMu.RUnlock() + + s.store.revMu.RLock() + currentRev := s.store.currentRev + s.store.revMu.RUnlock() + return h, currentRev, nil + } + } + s.hashMu.RUnlock() + + return s.store.hashByRev(rev) +} + +func (s *hashStorage) Store(hash KeyValueHash) { + s.lg.Info("storing new hash", + zap.Uint32("hash", hash.Hash), + zap.Int64("revision", hash.Revision), + zap.Int64("compact-revision", hash.CompactRevision), + ) + s.hashMu.Lock() + defer s.hashMu.Unlock() + s.hashes = append(s.hashes, hash) + sort.Slice(s.hashes, func(i, j int) bool { + return s.hashes[i].Revision < s.hashes[j].Revision + }) + if len(s.hashes) > hashStorageMaxSize { + s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:] + } +} + +func (s *hashStorage) Hashes() []KeyValueHash { + s.hashMu.RLock() + // Copy out hashes under lock just to be safe + hashes := make([]KeyValueHash, 0, len(s.hashes)) + for _, hash := range s.hashes { + hashes = append(hashes, hash) + } + s.hashMu.RUnlock() + return hashes +} diff --git a/server/mvcc/hash_test.go b/server/mvcc/hash_test.go new file mode 100644 index 00000000000..f7173509a1f --- /dev/null +++ b/server/mvcc/hash_test.go @@ -0,0 +1,222 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/pkg/v3/traceutil" + "go.etcd.io/etcd/server/v3/lease" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" + "go.uber.org/zap/zaptest" +) + +// Test HashByRevValue values to ensure we don't change the output which would +// have catastrophic consequences. Expected output is just hardcoded, so please +// regenerate it every time you change input parameters. 
+func TestHashByRevValue(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + + var totalRevisions int64 = 1210 + assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) + assert.Less(t, int64(testutil.CompactionCycle*10), totalRevisions) + var rev int64 + var got []KeyValueHash + for ; rev < totalRevisions; rev += testutil.CompactionCycle { + putKVs(s, rev, testutil.CompactionCycle) + hash := testHashByRev(t, s, rev+testutil.CompactionCycle/2) + got = append(got, hash) + } + putKVs(s, rev, totalRevisions) + hash := testHashByRev(t, s, rev+totalRevisions/2) + got = append(got, hash) + assert.Equal(t, []KeyValueHash{ + {4082599214, -1, 35}, + {2279933401, 35, 106}, + {3284231217, 106, 177}, + {126286495, 177, 248}, + {900108730, 248, 319}, + {2475485232, 319, 390}, + {1226296507, 390, 461}, + {2503661030, 461, 532}, + {4155130747, 532, 603}, + {106915399, 603, 674}, + {406914006, 674, 745}, + {1882211381, 745, 816}, + {806177088, 816, 887}, + {664311366, 887, 958}, + {1496914449, 958, 1029}, + {2434525091, 1029, 1100}, + {3988652253, 1100, 1171}, + {1122462288, 1171, 1242}, + {724436716, 1242, 1883}, + }, got) +} + +func TestHashByRevValueLastRevision(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + + var totalRevisions int64 = 1210 + assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) + assert.Less(t, int64(testutil.CompactionCycle*10), totalRevisions) + var rev int64 + var got []KeyValueHash + for ; rev < totalRevisions; rev += testutil.CompactionCycle { + putKVs(s, rev, testutil.CompactionCycle) + hash := testHashByRev(t, s, 0) + got = append(got, hash) + } + putKVs(s, rev, totalRevisions) + hash := testHashByRev(t, s, 0) + got = append(got, hash) + assert.Equal(t, []KeyValueHash{ + {1913897190, -1, 73}, + {224860069, 73, 145}, + {1565167519, 145, 217}, + {1566261620, 217, 289}, + {2037173024, 289, 361}, + {691659396, 361, 433}, + {2713730748, 433, 505}, + {3919322507, 505, 577}, + {769967540, 577, 649}, + {2909194793, 649, 721}, + {1576921157, 721, 793}, + {4067701532, 793, 865}, + {2226384237, 865, 937}, + {2923408134, 937, 1009}, + {2680329256, 1009, 1081}, + {1546717673, 1081, 1153}, + {2713657846, 1153, 1225}, + {1046575299, 1225, 1297}, + {2017735779, 1297, 2508}, + }, got) +} + +func putKVs(s *store, rev, count int64) { + for i := rev; i <= rev+count; i++ { + s.Put([]byte(testutil.PickKey(i)), []byte(fmt.Sprint(i)), 0) + } +} + +func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { + if rev == 0 { + rev = s.Rev() + } + hash, _, err := s.hashByRev(rev) + assert.NoError(t, err, "error on rev %v", rev) + _, err = s.Compact(traceutil.TODO(), rev) + assert.NoError(t, err, "error on compact %v", rev) + return hash +} + +// TODO: Change this to fuzz test +func TestCompactionHash(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + + testutil.TestCompactionHash(context.Background(), t, hashTestCase{s}, s.cfg.CompactionBatchLimit) +} + +type hashTestCase struct { + *store +} + +func (tc hashTestCase) Put(ctx context.Context, key, value string) error { + tc.store.Put([]byte(key), []byte(value), 0) + return nil +} + +func (tc hashTestCase) Delete(ctx context.Context, key string) error { + tc.store.DeleteRange([]byte(key), nil) + return nil +} + +func (tc hashTestCase) HashByRev(ctx 
context.Context, rev int64) (testutil.KeyValueHash, error) { + hash, _, err := tc.store.HashStorage().HashByRev(rev) + return testutil.KeyValueHash{Hash: hash.Hash, CompactRevision: hash.CompactRevision, Revision: hash.Revision}, err +} + +func (tc hashTestCase) Defrag(ctx context.Context) error { + return tc.store.b.Defrag() +} + +func (tc hashTestCase) Compact(ctx context.Context, rev int64) error { + done, err := tc.store.Compact(traceutil.TODO(), rev) + if err != nil { + return err + } + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +func TestHasherStore(t *testing.T) { + lg := zaptest.NewLogger(t) + s := newHashStorage(lg, newFakeStore()) + var hashes []KeyValueHash + for i := 0; i < hashStorageMaxSize; i++ { + hash := KeyValueHash{Hash: uint32(i), Revision: int64(i) + 10, CompactRevision: int64(i) + 100} + hashes = append(hashes, hash) + s.Store(hash) + } + + for _, want := range hashes { + got, _, err := s.HashByRev(want.Revision) + if err != nil { + t.Fatal(err) + } + if want.Hash != got.Hash { + t.Errorf("Expected stored hash to match, got: %d, expected: %d", want.Hash, got.Hash) + } + if want.Revision != got.Revision { + t.Errorf("Expected stored revision to match, got: %d, expected: %d", want.Revision, got.Revision) + } + if want.CompactRevision != got.CompactRevision { + t.Errorf("Expected stored compact revision to match, got: %d, expected: %d", want.CompactRevision, got.CompactRevision) + } + } +} + +func TestHasherStoreFull(t *testing.T) { + lg := zaptest.NewLogger(t) + s := newHashStorage(lg, newFakeStore()) + var minRevision int64 = 100 + var maxRevision = minRevision + hashStorageMaxSize + for i := 0; i < hashStorageMaxSize; i++ { + s.Store(KeyValueHash{Revision: int64(i) + minRevision}) + } + + // Hash for old revision should be discarded as storage is already full + s.Store(KeyValueHash{Revision: minRevision - 1}) + hash, _, err := s.HashByRev(minRevision - 1) + if err == nil { + t.Errorf("Expected an error as old revision should be discarded, got: %v", hash) + } + // Hash for new revision should be stored even when storage is full + s.Store(KeyValueHash{Revision: maxRevision + 1}) + _, _, err = s.HashByRev(maxRevision + 1) + if err != nil { + t.Errorf("Didn't expect error for new revision, err: %v", err) + } +} diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index 79c2e687005..109b0d7ccb2 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -119,11 +119,8 @@ type KV interface { // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite - // Hash computes the hash of the KV's backend. - Hash() (hash uint32, revision int64, err error) - - // HashByRev computes the hash of all MVCC revisions up to a given revision. - HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) + // HashStorage returns HashStorage interface for KV storage. + HashStorage() HashStorage // Compact frees all superseded keys with revisions less than rev. 
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) diff --git a/server/mvcc/kv_test.go b/server/mvcc/kv_test.go index ad33b4041fc..0f84a4d11fd 100644 --- a/server/mvcc/kv_test.go +++ b/server/mvcc/kv_test.go @@ -605,7 +605,7 @@ func TestKVHash(t *testing.T) { kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease) - hashes[i], _, err = kv.Hash() + hashes[i], _, err = kv.hash() if err != nil { t.Fatalf("failed to get hash: %v", err) } diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index ae31d968c26..c952d3379b7 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "hash/crc32" "math" "sync" "time" @@ -84,7 +83,8 @@ type store struct { stopc chan struct{} - lg *zap.Logger + lg *zap.Logger + hashes HashStorage } // NewStore returns a new store. It is useful to create a store inside @@ -112,6 +112,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi lg: lg, } + s.hashes = newHashStorage(lg, s) s.ReadView = &readView{s} s.WriteView = &writeView{s} if s.le != nil { @@ -154,7 +155,7 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { close(ch) } -func (s *store) Hash() (hash uint32, revision int64, err error) { +func (s *store) hash() (hash uint32, revision int64, err error) { // TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly start := time.Now() @@ -165,7 +166,8 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { return h, s.currentRev, err } -func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { +func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) { + var compactRev int64 start := time.Now() s.mu.RLock() @@ -175,12 +177,11 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev if rev > 0 && rev <= compactRev { s.mu.RUnlock() - return 0, 0, compactRev, ErrCompacted + return KeyValueHash{}, 0, ErrCompacted } else if rev > 0 && rev > currentRev { s.mu.RUnlock() - return 0, currentRev, 0, ErrFutureRev + return KeyValueHash{}, currentRev, ErrFutureRev } - if rev == 0 { rev = currentRev } @@ -190,48 +191,25 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev tx.RLock() defer tx.RUnlock() s.mu.RUnlock() - - upper := revision{main: rev + 1} - lower := revision{main: compactRev + 1} - h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - - h.Write(buckets.Key.Name()) - err = tx.UnsafeForEach(buckets.Key, func(k, v []byte) error { - kr := bytesToRev(k) - if !upper.GreaterThan(kr) { - return nil - } - // skip revisions that are scheduled for deletion - // due to compacting; don't skip if there isn't one. 
- if lower.GreaterThan(kr) && len(keep) > 0 { - if _, ok := keep[kr]; !ok { - return nil - } - } - h.Write(k) - h.Write(v) - return nil - }) - hash = h.Sum32() - + hash, err = unsafeHashByRev(tx, compactRev, rev, keep) hashRevSec.Observe(time.Since(start).Seconds()) - return hash, currentRev, compactRev, err + return hash, currentRev, err } -func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { +func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { s.revMu.Lock() if rev <= s.compactMainRev { ch := make(chan struct{}) f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } s.fifoSched.Schedule(f) s.revMu.Unlock() - return ch, ErrCompacted + return ch, 0, ErrCompacted } if rev > s.currentRev { s.revMu.Unlock() - return nil, ErrFutureRev + return nil, 0, ErrFutureRev } - + compactMainRev := s.compactMainRev s.compactMainRev = rev rbytes := newRevBytes() @@ -246,23 +224,23 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { s.revMu.Unlock() - return nil, nil + return nil, compactMainRev, nil } -func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { +func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) { ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { s.compactBarrier(ctx, ch) return } - start := time.Now() - keep := s.kvindex.Compact(rev) - indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) - if !s.scheduleCompaction(rev, keep) { + hash, err := s.scheduleCompaction(rev, prevCompactRev) + if err != nil { + s.lg.Warn("Failed compaction", zap.Error(err)) s.compactBarrier(context.TODO(), ch) return } + s.hashes.Store(hash) close(ch) } @@ -272,18 +250,18 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { - ch, err := s.updateCompactRev(rev) + ch, prevCompactRev, err := s.updateCompactRev(rev) if err != nil { return ch, err } - return s.compact(traceutil.TODO(), rev) + return s.compact(traceutil.TODO(), rev, prevCompactRev) } func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() - ch, err := s.updateCompactRev(rev) + ch, prevCompactRev, err := s.updateCompactRev(rev) trace.Step("check and update compact revision") if err != nil { s.mu.Unlock() @@ -291,7 +269,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } s.mu.Unlock() - return s.compact(trace, rev) + return s.compact(trace, rev, prevCompactRev) } func (s *store) Commit() { @@ -553,3 +531,7 @@ func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { func isTombstone(b []byte) bool { return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone } + +func (s *store) HashStorage() HashStorage { + return s.hashes +} diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index c63d3f4f4e8..a1028e122fa 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -16,14 +16,19 @@ package mvcc import ( "encoding/binary" + "fmt" "time" "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) -func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool { +func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) { totalStart := time.Now() + keep := s.kvindex.Compact(compactMainRev) + 
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) + + totalStart = time.Now() defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }() keyCompactions := 0 defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }() @@ -32,6 +37,8 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) + batchNum := s.cfg.CompactionBatchLimit + h := newKVHasher(prevCompactRev, compactMainRev, keep) last := make([]byte, 8+1+8) for { var rev revision @@ -40,13 +47,14 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc tx := s.b.BatchTx() tx.LockOutsideApply() - keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit)) - for _, key := range keys { - rev = bytesToRev(key) + keys, values := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum)) + for i := range keys { + rev = bytesToRev(keys[i]) if _, ok := keep[rev]; !ok { - tx.UnsafeDelete(buckets.Key, key) + tx.UnsafeDelete(buckets.Key, keys[i]) keyCompactions++ } + h.WriteKeyValue(keys[i], values[i]) } if len(keys) < s.cfg.CompactionBatchLimit { @@ -54,12 +62,14 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes) tx.Unlock() + hash := h.Hash() s.lg.Info( "finished scheduled compaction", zap.Int64("compact-revision", compactMainRev), zap.Duration("took", time.Since(totalStart)), + zap.Uint32("hash", hash.Hash), ) - return true + return hash, nil } // update last @@ -72,7 +82,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc select { case <-time.After(10 * time.Millisecond): case <-s.stopc: - return false + return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal") } } } diff --git a/server/mvcc/kvstore_compaction_test.go b/server/mvcc/kvstore_compaction_test.go index 062050ed163..8ff36c1f846 100644 --- a/server/mvcc/kvstore_compaction_test.go +++ b/server/mvcc/kvstore_compaction_test.go @@ -26,6 +26,7 @@ import ( betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) func TestScheduleCompaction(t *testing.T) { @@ -68,7 +69,11 @@ func TestScheduleCompaction(t *testing.T) { } for i, tt := range tests { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + fi := newFakeIndex() + fi.indexCompactRespc <- tt.keep + s.kvindex = fi + tx := s.b.BatchTx() tx.Lock() @@ -79,7 +84,10 @@ func TestScheduleCompaction(t *testing.T) { } tx.Unlock() - s.scheduleCompaction(tt.rev, tt.keep) + _, err := s.scheduleCompaction(tt.rev, 0) + if err != nil { + t.Error(err) + } tx.Lock() for _, rev := range tt.wrevs { diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index e6d437fb15a..81dc27b606f 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -332,7 +332,7 @@ func TestStoreCompact(t *testing.T) { fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} key1 := newTestKeyBytes(revision{1, 0}, false) key2 := newTestKeyBytes(revision{2, 0}, false) - b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil} + b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, 
[][]byte{[]byte("alice"), []byte("bob")}} s.Compact(traceutil.TODO(), 3) s.fifoSched.WaitFinish(1) @@ -552,14 +552,14 @@ func TestHashKVWhenCompacting(t *testing.T) { go func() { defer wg.Done() for { - hash, _, compactRev, err := s.HashByRev(int64(rev)) + hash, _, err := s.HashStorage().HashByRev(int64(rev)) if err != nil { t.Error(err) } select { case <-donec: return - case hashCompactc <- hashKVResult{hash, compactRev}: + case hashCompactc <- hashKVResult{hash.Hash, hash.CompactRevision}: } } }() @@ -614,12 +614,12 @@ func TestHashKVZeroRevision(t *testing.T) { t.Fatal(err) } - hash1, _, _, err := s.HashByRev(int64(rev)) + hash1, _, err := s.HashStorage().HashByRev(int64(rev)) if err != nil { t.Fatal(err) } - var hash2 uint32 - hash2, _, _, err = s.HashByRev(0) + var hash2 KeyValueHash + hash2, _, err = s.HashStorage().HashByRev(0) if err != nil { t.Fatal(err) } @@ -839,18 +839,11 @@ func newFakeStore() *store { b := &fakeBackend{&fakeBatchTx{ Recorder: &testutil.RecorderBuffered{}, rangeRespc: make(chan rangeResp, 5)}} - fi := &fakeIndex{ - Recorder: &testutil.RecorderBuffered{}, - indexGetRespc: make(chan indexGetResp, 1), - indexRangeRespc: make(chan indexRangeResp, 1), - indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), - indexCompactRespc: make(chan map[revision]struct{}, 1), - } s := &store{ cfg: StoreConfig{CompactionBatchLimit: 10000}, b: b, le: &lease.FakeLessor{}, - kvindex: fi, + kvindex: newFakeIndex(), currentRev: 0, compactMainRev: -1, fifoSched: schedule.NewFIFOScheduler(), @@ -858,9 +851,20 @@ func newFakeStore() *store { lg: zap.NewExample(), } s.ReadView, s.WriteView = &readView{s}, &writeView{s} + s.hashes = newHashStorage(zap.NewExample(), s) return s } +func newFakeIndex() *fakeIndex { + return &fakeIndex{ + Recorder: &testutil.RecorderBuffered{}, + indexGetRespc: make(chan indexGetResp, 1), + indexRangeRespc: make(chan indexRangeResp, 1), + indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), + indexCompactRespc: make(chan map[revision]struct{}, 1), + } +} + type rangeResp struct { keys [][]byte vals [][]byte diff --git a/server/storage/mvcc/testutil/hash.go b/server/storage/mvcc/testutil/hash.go new file mode 100644 index 00000000000..7d6d1354b1c --- /dev/null +++ b/server/storage/mvcc/testutil/hash.go @@ -0,0 +1,146 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutil + +import ( + "context" + "errors" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "go.etcd.io/bbolt" + "go.etcd.io/etcd/api/v3/mvccpb" +) + +const ( + // CompactionCycle is high prime used to test hash calculation between compactions. 
+ CompactionCycle = 71 +) + +func TestCompactionHash(ctx context.Context, t *testing.T, h CompactionHashTestCase, compactionBatchLimit int) { + var totalRevisions int64 = 1210 + assert.Less(t, int64(compactionBatchLimit), totalRevisions) + assert.Less(t, int64(CompactionCycle*10), totalRevisions) + var rev int64 + for ; rev < totalRevisions; rev += CompactionCycle { + testCompactionHash(ctx, t, h, rev, rev+CompactionCycle) + } + testCompactionHash(ctx, t, h, rev, rev+totalRevisions) +} + +type CompactionHashTestCase interface { + Put(ctx context.Context, key, value string) error + Delete(ctx context.Context, key string) error + HashByRev(ctx context.Context, rev int64) (KeyValueHash, error) + Defrag(ctx context.Context) error + Compact(ctx context.Context, rev int64) error +} + +type KeyValueHash struct { + Hash uint32 + CompactRevision int64 + Revision int64 +} + +func testCompactionHash(ctx context.Context, t *testing.T, h CompactionHashTestCase, start, stop int64) { + for i := start; i <= stop; i++ { + if i%67 == 0 { + err := h.Delete(ctx, PickKey(i+83)) + assert.NoError(t, err, "error on delete") + } else { + err := h.Put(ctx, PickKey(i), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + } + hash1, err := h.HashByRev(ctx, stop) + assert.NoError(t, err, "error on hash (rev %v)", stop) + + err = h.Compact(ctx, stop) + assert.NoError(t, err, "error on compact (rev %v)", stop) + + err = h.Defrag(ctx) + assert.NoError(t, err, "error on defrag") + + hash2, err := h.HashByRev(ctx, stop) + assert.NoError(t, err, "error on hash (rev %v)", stop) + assert.Equal(t, hash1, hash2, "hashes do not match on rev %v", stop) +} + +func PickKey(i int64) string { + if i%(CompactionCycle*2) == 30 { + return "zenek" + } + if i%CompactionCycle == 30 { + return "xavery" + } + // Use low prime number to ensure repeats without alignment + switch i % 7 { + case 0: + return "alice" + case 1: + return "bob" + case 2: + return "celine" + case 3: + return "dominik" + case 4: + return "eve" + case 5: + return "frederica" + case 6: + return "gorge" + default: + panic("Can't count") + } +} + +func CorruptBBolt(fpath string) error { + db, derr := bbolt.Open(fpath, os.ModePerm, &bbolt.Options{}) + if derr != nil { + return derr + } + defer db.Close() + + return db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte("key")) + if b == nil { + return errors.New("got nil bucket for 'key'") + } + keys, vals := [][]byte{}, [][]byte{} + c := b.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + keys = append(keys, k) + var kv mvccpb.KeyValue + if uerr := kv.Unmarshal(v); uerr != nil { + return uerr + } + kv.Key[0]++ + kv.Value[0]++ + v2, v2err := kv.Marshal() + if v2err != nil { + return v2err + } + vals = append(vals, v2) + } + for i := range keys { + if perr := b.Put(keys[i], vals[i]); perr != nil { + return perr + } + } + return nil + }) +} diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 9115f9dc1dc..0f8359ed4ff 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -177,7 +177,10 @@ type etcdProcessClusterConfig struct { rollingStart bool logLevel string - MaxConcurrentStreams uint32 // default is math.MaxUint32 + MaxConcurrentStreams uint32 // default is math.MaxUint32 + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -326,6 +329,16 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* args 
= append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) } + if cfg.CorruptCheckTime != 0 { + args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime)) + } + if cfg.CompactHashCheckEnabled { + args = append(args, "--experimental-compact-hash-check-enabled") + } + if cfg.CompactHashCheckTime != 0 { + args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) + } + etcdCfgs[i] = &etcdServerProcessConfig{ lg: lg, execPath: cfg.execPath, diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go new file mode 100644 index 00000000000..f21eed59050 --- /dev/null +++ b/tests/e2e/corrupt_test.go @@ -0,0 +1,188 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/datadir" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" +) + +func TestEtcdCorruptHash(t *testing.T) { + // oldenv := os.Getenv("EXPECT_DEBUG") + // defer os.Setenv("EXPECT_DEBUG", oldenv) + // os.Setenv("EXPECT_DEBUG", "1") + + cfg := newConfigNoTLS() + + // trigger snapshot so that restart member can load peers from disk + cfg.snapshotCount = 3 + + testCtl(t, corruptTest, withQuorum(), + withCfg(*cfg), + withInitialCorruptCheck(), + withCorruptFunc(testutil.CorruptBBolt), + ) +} + +func corruptTest(cx ctlCtx) { + cx.t.Log("putting 10 keys...") + for i := 0; i < 10; i++ { + if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil { + if cx.dialTimeout > 0 && !isGRPCTimedout(err) { + cx.t.Fatalf("putTest ctlV3Put error (%v)", err) + } + } + } + // enough time for all nodes sync on the same data + cx.t.Log("sleeping 3sec to let nodes sync...") + time.Sleep(3 * time.Second) + + cx.t.Log("connecting clientv3...") + eps := cx.epc.EndpointsV3() + cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second}) + if err != nil { + cx.t.Fatal(err) + } + defer cli1.Close() + + sresp, err := cli1.Status(context.TODO(), eps[0]) + cx.t.Logf("checked status sresp:%v err:%v", sresp, err) + if err != nil { + cx.t.Fatal(err) + } + id0 := sresp.Header.GetMemberId() + + cx.t.Log("stopping etcd[0]...") + cx.epc.procs[0].Stop() + + // corrupting first member by modifying backend offline. 
+ fp := datadir.ToBackendFileName(cx.epc.procs[0].Config().dataDirPath) + cx.t.Logf("corrupting backend: %v", fp) + if err = cx.corruptFunc(fp); err != nil { + cx.t.Fatal(err) + } + + cx.t.Log("restarting etcd[0]") + ep := cx.epc.procs[0] + proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...), cx.envMap) + if err != nil { + cx.t.Fatal(err) + } + defer proc.Stop() + + cx.t.Log("waiting for etcd[0] failure...") + // restarting corrupted member should fail + waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) +} + +func TestPeriodicCheckDetectsCorruption(t *testing.T) { + checkTime := time.Second + BeforeTest(t) + epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{ + clusterSize: 3, + keepDataDir: true, + CorruptCheckTime: time.Second, + }) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + cc := NewEtcdctl(epc.EndpointsV3()) + + for i := 0; i < 10; i++ { + err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + + members, err := cc.MemberList() + assert.NoError(t, err, "error on member list") + var memberID uint64 + for _, m := range members.Members { + if m.Name == epc.procs[0].Config().name { + memberID = m.ID + } + } + assert.NotZero(t, memberID, "member not found") + epc.procs[0].Stop() + err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.procs[0].Config().dataDirPath)) + assert.NoError(t, err) + + err = epc.procs[0].Restart() + assert.NoError(t, err) + time.Sleep(checkTime * 11 / 10) + alarmResponse, err := cc.AlarmList() + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms) +} + +func TestCompactHashCheckDetectCorruption(t *testing.T) { + checkTime := time.Second + BeforeTest(t) + epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{ + clusterSize: 3, + keepDataDir: true, + CompactHashCheckEnabled: true, + CompactHashCheckTime: checkTime, + }) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + cc := NewEtcdctl(epc.EndpointsV3()) + + for i := 0; i < 10; i++ { + err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + members, err := cc.MemberList() + assert.NoError(t, err, "error on member list") + var memberID uint64 + for _, m := range members.Members { + if m.Name == epc.procs[0].Config().name { + memberID = m.ID + } + } + + epc.procs[0].Stop() + err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.procs[0].Config().dataDirPath)) + assert.NoError(t, err) + + err = epc.procs[0].Restart() + assert.NoError(t, err) + _, err = cc.Compact(5) + assert.NoError(t, err) + time.Sleep(checkTime * 11 / 10) + alarmResponse, err := cc.AlarmList() + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms) +} diff --git a/tests/e2e/etcd_corrupt_test.go b/tests/e2e/etcd_corrupt_test.go deleted file mode 100644 index 87431b6f604..00000000000 --- a/tests/e2e/etcd_corrupt_test.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2017 The etcd 
Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package e2e - -import ( - "context" - "errors" - "fmt" - "os" - "testing" - "time" - - bolt "go.etcd.io/bbolt" - "go.etcd.io/etcd/api/v3/mvccpb" - "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/datadir" -) - -// TODO: test with embedded etcd in integration package - -func TestEtcdCorruptHash(t *testing.T) { - // oldenv := os.Getenv("EXPECT_DEBUG") - // defer os.Setenv("EXPECT_DEBUG", oldenv) - // os.Setenv("EXPECT_DEBUG", "1") - - cfg := newConfigNoTLS() - - // trigger snapshot so that restart member can load peers from disk - cfg.snapshotCount = 3 - - testCtl(t, corruptTest, withQuorum(), - withCfg(*cfg), - withInitialCorruptCheck(), - withCorruptFunc(corruptHash), - ) -} - -func corruptTest(cx ctlCtx) { - cx.t.Log("putting 10 keys...") - for i := 0; i < 10; i++ { - if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil { - if cx.dialTimeout > 0 && !isGRPCTimedout(err) { - cx.t.Fatalf("putTest ctlV3Put error (%v)", err) - } - } - } - // enough time for all nodes sync on the same data - cx.t.Log("sleeping 3sec to let nodes sync...") - time.Sleep(3 * time.Second) - - cx.t.Log("connecting clientv3...") - eps := cx.epc.EndpointsV3() - cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second}) - if err != nil { - cx.t.Fatal(err) - } - defer cli1.Close() - - sresp, err := cli1.Status(context.TODO(), eps[0]) - cx.t.Logf("checked status sresp:%v err:%v", sresp, err) - if err != nil { - cx.t.Fatal(err) - } - id0 := sresp.Header.GetMemberId() - - cx.t.Log("stopping etcd[0]...") - cx.epc.procs[0].Stop() - - // corrupting first member by modifying backend offline. 
- fp := datadir.ToBackendFileName(cx.epc.procs[0].Config().dataDirPath) - cx.t.Logf("corrupting backend: %v", fp) - if err = cx.corruptFunc(fp); err != nil { - cx.t.Fatal(err) - } - - cx.t.Log("restarting etcd[0]") - ep := cx.epc.procs[0] - proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...), cx.envMap) - if err != nil { - cx.t.Fatal(err) - } - defer proc.Stop() - - cx.t.Log("waiting for etcd[0] failure...") - // restarting corrupted member should fail - waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) -} - -func corruptHash(fpath string) error { - db, derr := bolt.Open(fpath, os.ModePerm, &bolt.Options{}) - if derr != nil { - return derr - } - defer db.Close() - - return db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("key")) - if b == nil { - return errors.New("got nil bucket for 'key'") - } - keys, vals := [][]byte{}, [][]byte{} - c := b.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - keys = append(keys, k) - var kv mvccpb.KeyValue - if uerr := kv.Unmarshal(v); uerr != nil { - return uerr - } - kv.Key[0]++ - kv.Value[0]++ - v2, v2err := kv.Marshal() - if v2err != nil { - return v2err - } - vals = append(vals, v2) - } - for i := range keys { - if perr := b.Put(keys[i], vals[i]); perr != nil { - return perr - } - } - return nil - }) -} diff --git a/tests/e2e/etcdctl.go b/tests/e2e/etcdctl.go new file mode 100644 index 00000000000..5366d9649b5 --- /dev/null +++ b/tests/e2e/etcdctl.go @@ -0,0 +1,83 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package e2e + +import ( + "encoding/json" + "fmt" + "strings" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +type EtcdctlV3 struct { + endpoints []string +} + +func NewEtcdctl(endpoints []string) *EtcdctlV3 { + return &EtcdctlV3{ + endpoints: endpoints, + } +} + +func (ctl *EtcdctlV3) Put(key, value string) error { + args := ctl.cmdArgs() + args = append(args, "put", key, value) + return spawnWithExpect(args, "OK") +} + +func (ctl *EtcdctlV3) AlarmList() (*clientv3.AlarmResponse, error) { + var resp clientv3.AlarmResponse + err := ctl.spawnJsonCmd(&resp, "alarm", "list") + return &resp, err +} + +func (ctl *EtcdctlV3) MemberList() (*clientv3.MemberListResponse, error) { + var resp clientv3.MemberListResponse + err := ctl.spawnJsonCmd(&resp, "member", "list") + return &resp, err +} + +func (ctl *EtcdctlV3) Compact(rev int64) (*clientv3.CompactResponse, error) { + args := ctl.cmdArgs("compact", fmt.Sprint(rev)) + return nil, spawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev)) +} + +func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error { + args = append(args, "-w", "json") + cmd, err := spawnCmd(append(ctl.cmdArgs(), args...), nil) + if err != nil { + return err + } + line, err := cmd.Expect("header") + if err != nil { + return err + } + return json.Unmarshal([]byte(line), output) +} + +func (ctl *EtcdctlV3) cmdArgs(args ...string) []string { + cmdArgs := []string{ctlBinPath + "3"} + for k, v := range ctl.flags() { + cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v)) + } + return append(cmdArgs, args...) +} + +func (ctl *EtcdctlV3) flags() map[string]string { + fmap := make(map[string]string) + fmap["endpoints"] = strings.Join(ctl.endpoints, ",") + return fmap +} diff --git a/tests/go.mod b/tests/go.mod index 658ba17ffc6..60a58b8ef72 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -27,7 +27,6 @@ require ( github.com/spf13/cobra v1.1.3 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 - go.etcd.io/bbolt v1.3.6 go.etcd.io/etcd/api/v3 v3.5.4 go.etcd.io/etcd/client/pkg/v3 v3.5.4 go.etcd.io/etcd/client/v2 v2.305.4 diff --git a/tests/integration/clientv3/maintenance_test.go b/tests/integration/clientv3/maintenance_test.go index 53c3c9c907f..7b90352d606 100644 --- a/tests/integration/clientv3/maintenance_test.go +++ b/tests/integration/clientv3/maintenance_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" "go.uber.org/zap/zaptest" "google.golang.org/grpc" @@ -69,6 +70,53 @@ func TestMaintenanceHashKV(t *testing.T) { } } +// TODO: Change this to fuzz test +func TestCompactionHash(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + if err != nil { + t.Fatal(err) + } + + testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL()}, 1000) +} + +type hashTestCase struct { + *clientv3.Client + url string +} + +func (tc hashTestCase) Put(ctx context.Context, key, value string) error { + _, err := tc.Client.Put(ctx, key, value) + return err +} + +func (tc hashTestCase) Delete(ctx context.Context, key string) error { + _, err := tc.Client.Delete(ctx, key) + return err +} + +func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) { + resp, err := tc.Client.HashKV(ctx, tc.url, rev) + return testutil.KeyValueHash{Hash: resp.Hash, CompactRevision: resp.CompactRevision, Revision: 
resp.Header.Revision}, err +} + +func (tc hashTestCase) Defrag(ctx context.Context) error { + _, err := tc.Client.Defragment(ctx, tc.url) + return err +} + +func (tc hashTestCase) Compact(ctx context.Context, rev int64) error { + _, err := tc.Client.Compact(ctx, rev) + // Wait for compaction to be compacted + time.Sleep(50 * time.Millisecond) + return err +} + func TestMaintenanceMoveLeader(t *testing.T) { integration.BeforeTest(t) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index fedad797ae9..0d7b47ce462 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -613,6 +613,10 @@ type member struct { func (m *member) GRPCURL() string { return m.grpcURL } +func (m *member) CorruptionChecker() etcdserver.CorruptionChecker { + return m.s.CorruptionChecker() +} + type memberConfig struct { name string uniqNumber int64 diff --git a/tests/integration/corrupt_test.go b/tests/integration/corrupt_test.go new file mode 100644 index 00000000000..b04a77e6f13 --- /dev/null +++ b/tests/integration/corrupt_test.go @@ -0,0 +1,173 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/etcdserverpb" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" +) + +func TestPeriodicCheck(t *testing.T) { + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + var totalRevisions int64 = 1210 + var rev int64 + for ; rev < totalRevisions; rev += testutil.CompactionCycle { + testPeriodicCheck(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle) + } + testPeriodicCheck(ctx, t, cc, clus, rev, rev+totalRevisions) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember(nil), alarmResponse.Alarms) +} + +func testPeriodicCheck(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *ClusterV3, start, stop int64) { + for i := start; i <= stop; i++ { + if i%67 == 0 { + _, err := cc.Delete(ctx, testutil.PickKey(i+83)) + assert.NoError(t, err, "error on delete") + } else { + _, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + } + err := clus.Members[0].CorruptionChecker().PeriodicCheck() + assert.NoError(t, err, "error on periodic check (rev %v)", stop) +} + +func TestPeriodicCheckDetectsCorruption(t *testing.T) { + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + for i := 0; i < 10; i++ { + _, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + + err = 
clus.Members[0].CorruptionChecker().PeriodicCheck() + assert.NoError(t, err, "error on periodic check") + clus.Members[0].Stop(t) + clus.WaitLeader(t) + + err = testutil.CorruptBBolt(clus.Members[0].BackendPath()) + assert.NoError(t, err) + + err = clus.Members[0].Restart(t) + assert.NoError(t, err) + time.Sleep(50 * time.Millisecond) + leader := clus.WaitLeader(t) + err = clus.Members[leader].CorruptionChecker().PeriodicCheck() + assert.NoError(t, err, "error on periodic check") + time.Sleep(50 * time.Millisecond) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms) +} + +func TestCompactHashCheck(t *testing.T) { + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + var totalRevisions int64 = 1210 + var rev int64 + for ; rev < totalRevisions; rev += testutil.CompactionCycle { + testCompactionHash(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle) + } + testCompactionHash(ctx, t, cc, clus, rev, rev+totalRevisions) +} + +func testCompactionHash(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *ClusterV3, start, stop int64) { + for i := start; i <= stop; i++ { + if i%67 == 0 { + _, err := cc.Delete(ctx, testutil.PickKey(i+83)) + assert.NoError(t, err, "error on delete") + } else { + _, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + } + _, err := cc.Compact(ctx, stop) + assert.NoError(t, err, "error on compact (rev %v)", stop) + // Wait for compaction to be compacted + time.Sleep(50 * time.Millisecond) + + clus.Members[0].CorruptionChecker().CompactHashCheck() +} + +func TestCompactHashCheckDetectCorruption(t *testing.T) { + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + for i := 0; i < 10; i++ { + _, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + + clus.Members[0].CorruptionChecker().CompactHashCheck() + clus.Members[0].Stop(t) + clus.WaitLeader(t) + + err = testutil.CorruptBBolt(clus.Members[0].BackendPath()) + assert.NoError(t, err) + + err = clus.Members[0].Restart(t) + assert.NoError(t, err) + _, err = cc.Compact(ctx, 5) + assert.NoError(t, err) + time.Sleep(50 * time.Millisecond) + leader := clus.WaitLeader(t) + clus.Members[leader].CorruptionChecker().CompactHashCheck() + time.Sleep(50 * time.Millisecond) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms) +} diff --git a/tests/integration/hashkv_test.go b/tests/integration/hashkv_test.go new file mode 100644 index 00000000000..b1b4e014f62 --- /dev/null +++ b/tests/integration/hashkv_test.go @@ -0,0 +1,88 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "net" + "net/http" + "testing" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" +) + +const ( + // Use high prime + compactionCycle = 71 +) + +// TODO: Change this to fuzz test +func TestCompactionHash(t *testing.T) { + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + if err != nil { + t.Fatal(err) + } + + client := &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", clus.Members[0].PeerURLs[0].Host) + }, + }, + } + + testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client}, 1000) +} + +type hashTestCase struct { + *clientv3.Client + url string + http *http.Client +} + +func (tc hashTestCase) Put(ctx context.Context, key, value string) error { + _, err := tc.Client.Put(ctx, key, value) + return err +} + +func (tc hashTestCase) Delete(ctx context.Context, key string) error { + _, err := tc.Client.Delete(ctx, key) + return err +} + +func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) { + resp, err := etcdserver.HashByRev(ctx, tc.http, "http://unix", rev) + return testutil.KeyValueHash{Hash: resp.Hash, CompactRevision: resp.CompactRevision, Revision: resp.Header.Revision}, err +} + +func (tc hashTestCase) Defrag(ctx context.Context) error { + _, err := tc.Client.Defragment(ctx, tc.url) + return err +} + +func (tc hashTestCase) Compact(ctx context.Context, rev int64) error { + _, err := tc.Client.Compact(ctx, rev) + // Wait for compaction to be compacted + time.Sleep(50 * time.Millisecond) + return err +}
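
The kvstore changes above move hash computation into the compaction path: scheduleCompaction now feeds every key-value pair it scans into a CRC-32C hasher (newKVHasher / WriteKeyValue / Hash) and stores the resulting KeyValueHash in the new HashStorage, so the compact-hash check can later compare hashes across members without rescanning the keyspace. Below is a minimal, self-contained sketch of that filtering, mirroring the upper/lower bound and keep-set logic of the removed HashByRev body; the revision decoding follows the 8+1+8 key layout implied by the diff's `last := make([]byte, 8+1+8)`, and all names here (kvHasherSketch, newKVHasherSketch, bytesToRevSketch) are illustrative stand-ins, not the PR's actual hash.go implementation.

package main

import (
	"encoding/binary"
	"fmt"
	"hash"
	"hash/crc32"
)

// revision is a simplified stand-in for the store's (main, sub) revision pair.
type revision struct{ main, sub int64 }

// bytesToRevSketch decodes the 8-byte main, 1-byte separator, 8-byte sub key
// layout used for keys in the "key" bucket.
func bytesToRevSketch(b []byte) revision {
	return revision{
		main: int64(binary.BigEndian.Uint64(b[0:8])),
		sub:  int64(binary.BigEndian.Uint64(b[9:17])),
	}
}

// kvHasherSketch accumulates a CRC-32C over key-value pairs whose main revision
// lies in (prevCompactRev, compactRev], keeping only revisions at or below
// prevCompactRev that the compaction index marked as kept -- the same filtering
// the removed HashByRev body performed with its upper/lower bounds.
type kvHasherSketch struct {
	h              hash.Hash32
	prevCompactRev int64
	compactRev     int64
	keep           map[revision]struct{}
}

func newKVHasherSketch(prevCompactRev, compactRev int64, keep map[revision]struct{}) *kvHasherSketch {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
	h.Write([]byte("key")) // seed with the bucket name, as the removed code did
	return &kvHasherSketch{h: h, prevCompactRev: prevCompactRev, compactRev: compactRev, keep: keep}
}

// WriteKeyValue folds one key-value pair into the hash, skipping revisions
// outside the hashed window or scheduled for deletion by the compaction.
func (s *kvHasherSketch) WriteKeyValue(k, v []byte) {
	kr := bytesToRevSketch(k)
	if kr.main > s.compactRev {
		return // newer than the revision being hashed
	}
	if kr.main <= s.prevCompactRev && len(s.keep) > 0 {
		if _, ok := s.keep[kr]; !ok {
			return // compacted away; does not contribute to the hash
		}
	}
	s.h.Write(k)
	s.h.Write(v)
}

func (s *kvHasherSketch) Hash() uint32 { return s.h.Sum32() }

func main() {
	revBytes := func(m, sub int64) []byte {
		b := make([]byte, 17)
		binary.BigEndian.PutUint64(b[0:8], uint64(m))
		b[8] = '_'
		binary.BigEndian.PutUint64(b[9:17], uint64(sub))
		return b
	}

	// Hash the window (2, 4]; revision 2 survives the previous compaction.
	keep := map[revision]struct{}{{main: 2}: {}}
	h := newKVHasherSketch(2, 4, keep)
	h.WriteKeyValue(revBytes(2, 0), []byte("bar0")) // kept, contributes
	h.WriteKeyValue(revBytes(3, 0), []byte("bar1")) // in window, contributes
	h.WriteKeyValue(revBytes(5, 0), []byte("bar2")) // above compactRev, ignored
	fmt.Printf("compact hash: %d\n", h.Hash())
}

As the e2e cluster config shows, the new check is switched on per process with --experimental-compact-hash-check-enabled and tuned with --experimental-compact-hash-check-time, while the pre-existing periodic corruption check continues to use --experimental-corrupt-check-time.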