Skip to content

Commit

Permalink
Draft reproduce issue etcd-io#17529
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Apr 1, 2024
1 parent 0fa800a commit a0b9888
Show file tree
Hide file tree
Showing 17 changed files with 196 additions and 60 deletions.
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func (sws *serverWatchStream) sendLoop() {
sws.mu.RUnlock()

var serr error
// gofail: var beforeSendWatchResponse struct{}
if !fragmented && !ok {
serr = sws.gRPCStream.Send(wr)
} else {
Expand Down
24 changes: 23 additions & 1 deletion tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,30 @@ func (f *BinaryFailpoints) SetupHTTP(ctx context.Context, failpoint, payload str
return nil
}

// Disable turns off a gofail failpoint on this member by sending an HTTP
// DELETE to http://127.0.0.1:<GoFailPort>/<failpoint>.
//
// The request is bound to ctx. An error is returned when the request cannot be
// built or sent, or when the endpoint replies with any status other than
// 204 No Content.
func (f *BinaryFailpoints) Disable(ctx context.Context, failpoint string) error {
	failpointURL := url.URL{
		Scheme: "http",
		Host:   fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort),
		Path:   failpoint,
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, failpointURL.String(), nil)
	if err != nil {
		return fmt.Errorf("building request to disable failpoint %q: %w", failpoint, err)
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("disabling failpoint %q: %w", failpoint, err)
	}
	defer resp.Body.Close()
	// Anything other than 204 No Content is treated as a failed disable.
	if resp.StatusCode != http.StatusNoContent {
		return fmt.Errorf("bad status code: %d", resp.StatusCode)
	}
	return nil
}

// httpClient is the package-level HTTP client through which failpoint
// requests are sent (Disable issues its DELETE request with it).
var httpClient = http.Client{
Timeout: 10 * time.Millisecond,
//Timeout: 10 * time.Millisecond,
}

func (f *BinaryFailpoints) Available(failpoint string) bool {
Expand Down
22 changes: 12 additions & 10 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ const (

var (
// allFailpoints is the package-level list of Failpoint values. Entries
// that are commented out below are not part of the list.
// NOTE(review): how this list is consumed is not visible in this view.
allFailpoints = []Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
beforeApplyOneConfChangeSleep,
MemberReplace,
//KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
//DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
//BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
//BackendAfterWritebackBufPanic,
////CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
////CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
//CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
//RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
//RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
//beforeApplyOneConfChangeSleep,
//MemberReplace,
beforeSendWatchResponse,
}
)

Expand Down
44 changes: 39 additions & 5 deletions tests/robustness/failpoint/gofail.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ var (
RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
beforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second}
beforeApplyOneConfChangeSleep Failpoint = restartAndSleepFailpoint{"beforeApplyOneConfChange", time.Second}
beforeSendWatchResponse Failpoint = sleepFailpoint{"beforeSendWatchResponse", time.Second}
)

type goPanicFailpoint struct {
Expand Down Expand Up @@ -147,12 +148,12 @@ func (f goPanicFailpoint) Name() string {
return fmt.Sprintf("%s=panic()", f.failpoint)
}

type killAndGofailSleep struct {
// restartAndSleepFailpoint pairs a gofail failpoint name with a sleep
// duration. Its Name method renders the pair as "kill&<failpoint>=sleep(<time>)".
type restartAndSleepFailpoint struct {
// failpoint is the gofail failpoint name.
failpoint string
// time is the duration shown in the sleep(...) term of Name.
time time.Duration
}

func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f restartAndSleepFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
for member.IsRunning() {
err := member.Kill()
Expand All @@ -178,11 +179,44 @@ func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Lo
return nil
}

func (f killAndGofailSleep) Name() string {
// Name returns the display name of this failpoint in the form
// "kill&<failpoint>=sleep(<time>)".
func (f restartAndSleepFailpoint) Name() string {
	name, delay := f.failpoint, f.time
	return fmt.Sprintf("kill&%s=sleep(%s)", name, delay)
}

// Available reports whether the given member exposes this failpoint. It is
// false when the member has no failpoints handle; otherwise the decision is
// delegated to that handle's Available method.
func (f restartAndSleepFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
	if fps := member.Failpoints(); fps != nil {
		return fps.Available(f.failpoint)
	}
	return false
}

// sleepFailpoint pairs a gofail failpoint name with a sleep duration. Its Name
// method renders the pair as "<failpoint>=sleep(<time>)".
type sleepFailpoint struct {
// failpoint is the gofail failpoint name passed to SetupHTTP and Disable.
failpoint string
// time is the duration shown in the sleep(...) term of Name.
time time.Duration
}

// Inject enables the gofail sleep failpoint on one randomly chosen cluster
// member, keeps it active for two sleep periods, disables it again, and then
// waits a further 110% of one sleep period before returning.
//
// The sleep term sent to gofail and both waits are derived from f.time, so the
// injected delay matches what Name reports. For f.time == time.Second the
// timings are 1s / 2s / 1.1s.
//
// It returns the error from SetupHTTP or Disable, if any. The waits are plain
// time.Sleep calls and do not observe ctx cancellation.
func (f sleepFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
	member := clus.Procs[rand.Int()%len(clus.Procs)]
	lg.Info("Setting up goFailpoint", zap.String("failpoint", f.Name()))
	// %q on a time.Duration quotes its String() form, e.g. sleep("1s").
	if err := member.Failpoints().SetupHTTP(ctx, f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time)); err != nil {
		return err
	}
	// Leave the failpoint enabled for two sleep periods.
	time.Sleep(2 * f.time)
	if err := member.Failpoints().Disable(ctx, f.failpoint); err != nil {
		return err
	}
	// Wait slightly longer than one sleep period after disabling; presumably
	// this lets a sleep that was already in progress finish — confirm.
	time.Sleep(f.time / 10 * 11)
	return nil
}

// Name returns the display name of this failpoint in the form
// "<failpoint>=sleep(<time>)".
func (f sleepFailpoint) Name() string {
	name, delay := f.failpoint, f.time
	return fmt.Sprintf("%s=sleep(%s)", name, delay)
}

func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
func (f sleepFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
memberFailpoints := member.Failpoints()
if memberFailpoints == nil {
return false
Expand Down
19 changes: 10 additions & 9 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
}
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
//watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
//validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

Expand All @@ -106,15 +106,16 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
time.Sleep(time.Second)
return nil
})
maxRevisionChan := make(chan int64, 1)
//maxRevisionChan := make(chan int64, 1)
//g.Go(func() error {
// watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
// return nil
//})
g.Go(func() error {
defer close(maxRevisionChan)
//defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
maxRevisionChan <- operationsMaxRevision(operationReport)
return nil
})
g.Go(func() error {
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
//maxRevisionChan <- operationsMaxRevision(operationReport)
time.Sleep(time.Second)
return nil
})
g.Wait()
Expand Down
4 changes: 2 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g

.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./tests && go get go.etcd.io/gofail@${GOFAIL_VERSION}

.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy
Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision)
case Txn:
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
case LeaseGrant, LeaseRevoke, Defragment:
case LeaseGrant, LeaseRevoke, Defragment, Compact:
if response.Revision == 0 {
return "ok"
}
Expand Down Expand Up @@ -63,6 +63,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
22 changes: 18 additions & 4 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand Down Expand Up @@ -178,6 +179,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
case Compact:
s.CompactRevision = request.Compact.Revision
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -234,6 +238,7 @@ type RequestType string
const (
Range RequestType = "range"
Txn RequestType = "txn"
Compact RequestType = "compact"
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Expand All @@ -246,6 +251,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

type RangeRequest struct {
Expand All @@ -269,6 +275,13 @@ type DeleteOptions struct {
Key string
}

// CompactResponse is the response payload of a Compact request. It carries no
// fields; the revision travels on the enclosing EtcdResponse.
type CompactResponse struct {
}

// CompactRequest is the payload of a Compact request.
type CompactRequest struct {
// Revision is the revision the compaction was requested at.
Revision int64
}

type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
Expand Down Expand Up @@ -322,6 +335,7 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
Revision int64
}

Expand Down
27 changes: 27 additions & 0 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
})
}

// AppendCompact records a compact call for revision rev in the history.
//
// When err is non-nil the call is recorded as failed with its start time and
// nothing else is appended. Otherwise a successful operation is appended whose
// output carries the revision from resp.Header, or 0 when resp or its header
// is nil.
func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
	req := compactRequest(rev)
	if err != nil {
		h.appendFailed(req, start.Nanoseconds(), err)
		return
	}

	// A missing response or header leaves the revision at zero.
	headerRev := int64(0)
	if resp != nil && resp.Header != nil {
		headerRev = resp.Header.Revision
	}

	op := porcupine.Operation{
		ClientId: h.streamId,
		Input:    req,
		Call:     start.Nanoseconds(),
		Output:   compactResponse(headerRev),
		Return:   end.Nanoseconds(),
	}
	h.appendSuccessful(op)
}

func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
conds := []EtcdCondition{}
for _, cmp := range cmp {
Expand Down Expand Up @@ -405,6 +424,10 @@ func putResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{}}}, Revision: revision}}
}

// compactRequest builds an EtcdRequest of type Compact for revision rev.
func compactRequest(rev int64) EtcdRequest {
	req := EtcdRequest{Type: Compact}
	req.Compact = &CompactRequest{Revision: rev}
	return req
}

// deleteRequest builds a Txn request whose only on-success operation is a
// delete of key.
func deleteRequest(key string) EtcdRequest {
	del := EtcdOperation{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}
	txn := &TxnRequest{OperationsOnSuccess: []EtcdOperation{del}}
	return EtcdRequest{Type: Txn, Txn: txn}
}
Expand All @@ -413,6 +436,10 @@ func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
}

// compactResponse builds the response for a compact call: an empty
// CompactResponse payload together with the given revision.
func compactResponse(revision int64) MaybeEtcdResponse {
	resp := EtcdResponse{
		Compact:  &CompactResponse{},
		Revision: revision,
	}
	return MaybeEtcdResponse{EtcdResponse: resp}
}

// compareRevisionAndPutRequest combines compareRevision(key, expectedRevision)
// and putOperation(key, value) into one request via txnRequestSingleOperation.
// The third argument to that helper is nil.
// NOTE(review): presumably nil means "no on-failure operation" — confirm
// against txnRequestSingleOperation.
func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
	cond := compareRevision(key, expectedRevision)
	onSuccess := putOperation(key, value)
	return txnRequestSingleOperation(cond, onSuccess, nil)
}
Expand Down
8 changes: 7 additions & 1 deletion tests/robustness/report/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package report
import (
"errors"
"fmt"
"go.etcd.io/etcd/pkg/v3/pbutil"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -134,7 +135,12 @@ func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raf
func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
var raftReq pb.InternalRaftRequest
if err := raftReq.Unmarshal(ent.Data); err != nil {
return nil, err
var r pb.Request
isV2Entry := pbutil.MaybeUnmarshal(&r, ent.Data)
if !isV2Entry {
return nil, err
}
return nil, nil
}
switch {
case raftReq.Put != nil:
Expand Down
24 changes: 12 additions & 12 deletions tests/robustness/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ type TrafficProfile struct {
}

var trafficProfiles = []TrafficProfile{
{
Traffic: traffic.EtcdPut,
Profile: traffic.HighTrafficProfile,
},
{
Traffic: traffic.EtcdPutDeleteLease,
Profile: traffic.LowTraffic,
},
{
Traffic: traffic.Kubernetes,
Profile: traffic.HighTrafficProfile,
},
//{
// Traffic: traffic.EtcdPut,
// Profile: traffic.HighTrafficProfile,
//},
//{
// Traffic: traffic.EtcdPutDeleteLease,
// Profile: traffic.LowTraffic,
//},
//{
// Traffic: traffic.Kubernetes,
// Profile: traffic.HighTrafficProfile,
//},
{
Traffic: traffic.Kubernetes,
Profile: traffic.LowTraffic,
Expand Down

0 comments on commit a0b9888

Please sign in to comment.