Skip to content

Commit

Permalink
Merge pull request #14282 from serathius/fix-checks-v3.5
Browse files Browse the repository at this point in the history
Fix corruption checks v3.5
  • Loading branch information
serathius committed Sep 7, 2022
2 parents 204c031 + 2ddb9e0 commit ba52d5a
Show file tree
Hide file tree
Showing 26 changed files with 1,875 additions and 337 deletions.
6 changes: 4 additions & 2 deletions server/config/config.go
Expand Up @@ -137,8 +137,10 @@ type ServerConfig struct {

// InitialCorruptCheck is true to check data corruption on boot
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
InitialCorruptCheck bool
CorruptCheckTime time.Duration
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration

// PreVote is true to enable Raft Pre-Vote.
PreVote bool
Expand Down
13 changes: 11 additions & 2 deletions server/embed/config.go
Expand Up @@ -314,8 +314,10 @@ type Config struct {
// AuthTokenTTL specifies the TTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"`
ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"`
// ExperimentalEnableV2V3 configures URLs that expose deprecated V2 API working on V3 store.
// Deprecated in v3.5.
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
Expand Down Expand Up @@ -501,6 +503,9 @@ func NewConfig() *Config {
ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true,

ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,

V2Deprecation: config.V2_DEPR_DEFAULT,
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
Expand Down Expand Up @@ -698,6 +703,10 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}

if cfg.ExperimentalCompactHashCheckTime <= 0 {
return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime)
}

return nil
}

Expand Down
8 changes: 6 additions & 2 deletions server/embed/etcd.go
Expand Up @@ -203,6 +203,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
Expand Down Expand Up @@ -247,8 +249,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized {
if err = e.Server.CheckInitialHashKV(); err != nil {
if memberInitialized && srvcfg.InitialCorruptCheck {
if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)

Expand Down Expand Up @@ -339,6 +341,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
zap.String("auto-compaction-mode", sc.AutoCompactionMode),
zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/config.go
Expand Up @@ -281,6 +281,8 @@ func newConfig() *config {
// experimental
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.")
fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hashes.")

fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
Expand Down
27 changes: 14 additions & 13 deletions server/etcdserver/api/v3rpc/maintenance.go
Expand Up @@ -66,19 +66,20 @@ type ClusterStatusGetter interface {
}

type maintenanceServer struct {
lg *zap.Logger
rg etcdserver.RaftStatusGetter
kg KVGetter
bg BackendGetter
a Alarmer
lt LeaderTransferrer
hdr header
cs ClusterStatusGetter
d Downgrader
lg *zap.Logger
rg etcdserver.RaftStatusGetter
hasher mvcc.HashStorage
kg KVGetter
bg BackendGetter
a Alarmer
lt LeaderTransferrer
hdr header
cs ClusterStatusGetter
d Downgrader
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s}
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
Expand Down Expand Up @@ -180,7 +181,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
}

func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, rev, err := ms.kg.KV().Hash()
h, rev, err := ms.hasher.Hash()
if err != nil {
return nil, togRPCError(err)
}
Expand All @@ -190,12 +191,12 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H
}

func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
h, rev, err := ms.hasher.HashByRev(r.Revision)
if err != nil {
return nil, togRPCError(err)
}

resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: h.CompactRevision}
ms.hdr.fill(resp.Header)
return resp, nil
}
Expand Down

0 comments on commit ba52d5a

Please sign in to comment.