Skip to content

Commit

Permalink
Record compact operation from failpoint injection
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Apr 26, 2024
1 parent cf32907 commit 7ae772b
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 82 deletions.
26 changes: 14 additions & 12 deletions tests/robustness/failpoint/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

var (
Expand All @@ -36,7 +38,7 @@ var (

type memberReplace struct{}

func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
memberID := uint64(rand.Int() % len(clus.Procs))
member := clus.Procs[memberID]
var endpoints []string
Expand All @@ -50,12 +52,12 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
DialKeepAliveTimeout: 100 * time.Millisecond,
})
if err != nil {
return err
return nil, err
}
defer cc.Close()
memberID, found, err := getID(ctx, cc, member.Config().Name)
if err != nil {
return err
return nil, err
}
if !found {
t.Fatal("Member not found")
Expand All @@ -65,11 +67,11 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
lg.Info("Removing member", zap.String("member", member.Config().Name))
_, err = cc.MemberRemove(ctx, memberID)
if err != nil {
return err
return nil, err
}
_, found, err = getID(ctx, cc, member.Config().Name)
if err != nil {
return err
return nil, err
}
if found {
t.Fatal("Expected member to be removed")
Expand All @@ -83,21 +85,21 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
err = member.Wait(ctx)
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
lg.Info("Failed to kill the process", zap.Error(err))
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
}
}
lg.Info("Removing member data", zap.String("member", member.Config().Name))
err = os.RemoveAll(member.Config().DataDirPath)
if err != nil {
return err
return nil, err
}

lg.Info("Adding member back", zap.String("member", member.Config().Name))
removedMemberPeerURL := member.Config().PeerURL.String()
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
reqCtx, cancel := context.WithTimeout(ctx, time.Second)
Expand All @@ -109,17 +111,17 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
}
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
if err != nil {
return err
return nil, err
}
lg.Info("Starting member", zap.String("member", member.Config().Name))
err = member.Start(ctx)
if err != nil {
return err
return nil, err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
_, found, err := getID(ctx, cc, member.Config().Name)
Expand All @@ -130,7 +132,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
break
}
}
return nil
return nil, nil
}

func (f memberReplace) Name() string {
Expand Down
32 changes: 21 additions & 11 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

const (
Expand All @@ -38,9 +40,9 @@ var (
DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
BackendAfterWritebackBufPanic,
//CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
//CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
//CompactAfterCommitBatchPanic,
CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
CompactAfterCommitBatchPanic,
RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
Expand Down Expand Up @@ -77,7 +79,7 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
return nil
}

func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) {
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
var err error
Expand All @@ -87,7 +89,7 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
}
lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
start := time.Since(baseTime)
err = failpoint.Inject(ctx, t, lg, clus)
clientReport, err := failpoint.Inject(ctx, t, lg, clus, baseTime, ids)
if err != nil {
lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
return nil, fmt.Errorf("failed triggering failpoint, err: %v", err)
Expand All @@ -98,14 +100,22 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
end := time.Since(baseTime)

return &InjectionReport{
Start: start,
End: end,
Name: failpoint.Name(),
return &FailpointReport{
Injection: Injection{
Start: start,
End: end,
Name: failpoint.Name(),
},
Client: clientReport,
}, nil
}

type InjectionReport struct {
type FailpointReport struct {
Injection
Client []report.ClientReport
}

type Injection struct {
Start, End time.Duration
Name string
}
Expand Down Expand Up @@ -139,7 +149,7 @@ func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProces
}

type Failpoint interface {
Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error
Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error)
Name() string
AvailabilityChecker
}
Expand Down
39 changes: 24 additions & 15 deletions tests/robustness/failpoint/gofail.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"go.uber.org/zap"

"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

var (
Expand Down Expand Up @@ -73,13 +75,13 @@ const (
Follower failpointTarget = "Follower"
)

func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) (reports []report.ClientReport, err error) {
member := f.pickMember(t, clus)

for member.IsRunning() {
select {
case <-ctx.Done():
return ctx.Err()
return reports, ctx.Err()
default:
}
lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name()))
Expand All @@ -94,16 +96,23 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg
}
if f.trigger != nil {
lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
err = f.trigger.Trigger(ctx, t, member, clus)
r, err := f.trigger.Trigger(ctx, t, member, clus, baseTime, ids)
if err != nil {
lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
}
if r != nil {
reports = append(reports, r...)
}
}
if !member.IsRunning() {
// TODO: Check member logs that etcd not running is caused panic caused by proper gofailpoint.
break
}
lg.Info("Waiting for member to exit", zap.String("member", member.Config().Name))
err = member.Wait(ctx)
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
lg.Info("Member didn't exit as expected", zap.String("member", member.Config().Name), zap.Error(err))
return fmt.Errorf("member didn't exit as expected: %v", err)
return reports, fmt.Errorf("member didn't exit as expected: %v", err)
}
lg.Info("Member exited as expected", zap.String("member", member.Config().Name))
}
Expand All @@ -112,11 +121,11 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg
lg.Info("Removing data that was not fsynced")
err := lazyfs.ClearCache(ctx)
if err != nil {
return err
return reports, err
}
}

return member.Start(ctx)
return reports, member.Start(ctx)
}

func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster) e2e.EtcdProcess {
Expand Down Expand Up @@ -155,7 +164,7 @@ type killAndGofailSleep struct {
time time.Duration
}

func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
member := clus.Procs[rand.Int()%len(clus.Procs)]
for member.IsRunning() {
err := member.Kill()
Expand All @@ -165,20 +174,20 @@ func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Lo
err = member.Wait(ctx)
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
lg.Info("Failed to kill the process", zap.Error(err))
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
}
}
lg.Info("Setting up goFailpoint", zap.String("failpoint", f.Name()))
err := member.Failpoints().SetupEnv(f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time))
if err != nil {
return err
return nil, err
}
err = member.Start(ctx)
if err != nil {
return err
return nil, err
}
// TODO: Check gofail status (https://github.com/etcd-io/gofail/pull/47) and wait for sleep to beis executed at least once.
return nil
return nil, nil
}

func (f killAndGofailSleep) Name() string {
Expand All @@ -201,22 +210,22 @@ type gofailSleepAndDeactivate struct {
time time.Duration
}

func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
member := clus.Procs[rand.Int()%len(clus.Procs)]
lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name()))
err := member.Failpoints().SetupHTTP(ctx, f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time))
if err != nil {
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err)
return nil, fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err)
}
time.Sleep(f.time)
lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name()))
err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint)
if err != nil {
lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err)
return nil, fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err)
}
return nil
return nil, nil
}

func (f gofailSleepAndDeactivate) Name() string {
Expand Down
13 changes: 8 additions & 5 deletions tests/robustness/failpoint/kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
"math/rand"
"strings"
"testing"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

var (
Expand All @@ -32,7 +35,7 @@ var (

type killFailpoint struct{}

func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
member := clus.Procs[rand.Int()%len(clus.Procs)]

for member.IsRunning() {
Expand All @@ -43,21 +46,21 @@ func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
err = member.Wait(ctx)
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
lg.Info("Failed to kill the process", zap.Error(err))
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
}
}
if lazyfs := member.LazyFS(); lazyfs != nil {
lg.Info("Removing data that was not fsynced")
err := lazyfs.ClearCache(ctx)
if err != nil {
return err
return nil, err
}
}
err := member.Start(ctx)
if err != nil {
return err
return nil, err
}
return nil
return nil, nil
}

func (f killFailpoint) Name() string {
Expand Down

0 comments on commit 7ae772b

Please sign in to comment.