Skip to content

Commit

Permalink
Draft reproduce issue etcd-io#17529
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Apr 7, 2024
1 parent a7f0ed2 commit 1a02e04
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 80 deletions.
10 changes: 5 additions & 5 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,11 @@ func (s *watchableStore) syncWatchers() int {
victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
continue
}
//if w.minRev < compactionRev {
// // Skip the watcher that failed to send compacted watch response due to w.ch is full.
// // Next retry of syncWatchers would try to resend the compacted watch response to w.ch
// continue
//}
w.minRev = curRev + 1

eb, ok := wb[w]
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string)
}
httpClient := http.Client{
// TODO: Decrease after deactivate is not blocked by sleep https://github.com/etcd-io/gofail/issues/64
Timeout: 2 * time.Second,
Timeout: 3 * time.Second,
}
if f.clientTimeout != 0 {
httpClient.Timeout = f.clientTimeout
Expand Down
29 changes: 15 additions & 14 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ const (

var (
allFailpoints = []Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
BeforeApplyOneConfChangeSleep,
MemberReplace,
DropPeerNetwork,
RaftBeforeSaveSleep,
RaftAfterSaveSleep,
ApplyBeforeOpenSnapshot,
//KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
//DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
//BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
//BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
//CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
//CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
//RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
//RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
//BeforeApplyOneConfChangeSleep,
//MemberReplace,
//DropPeerNetwork,
//RaftBeforeSaveSleep,
//RaftAfterSaveSleep,
//ApplyBeforeOpenSnapshot,
beforeSendWatchResponse,
}
)

Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/failpoint/gofail.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second}
RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second}
RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second}
beforeSendWatchResponse Failpoint = gofailSleepAndDeactivate{"beforeSendWatchResponse", time.Second}
)

type goPanicFailpoint struct {
Expand Down Expand Up @@ -209,13 +210,14 @@ func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err)
}
time.Sleep(f.time)
time.Sleep(2 * time.Second)
lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name()))
err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint)
if err != nil {
lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err)
}
time.Sleep(2 * time.Second)
return nil
}

Expand Down
21 changes: 10 additions & 11 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
}
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
//watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
//validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

Expand Down Expand Up @@ -125,17 +125,16 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
lg.Info("Finished injecting failures")
return nil
})
maxRevisionChan := make(chan int64, 1)
//maxRevisionChan := make(chan int64, 1)
//g.Go(func() error {
// watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
// return nil
//})
g.Go(func() error {
defer close(maxRevisionChan)
//defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
maxRevision := operationsMaxRevision(operationReport)
maxRevisionChan <- maxRevision
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
return nil
})
g.Go(func() error {
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
//maxRevisionChan <- operationsMaxRevision(operationReport)
time.Sleep(time.Second)
return nil
})
g.Wait()
Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision)
case Txn:
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
case LeaseGrant, LeaseRevoke, Defragment:
case LeaseGrant, LeaseRevoke, Defragment, Compact:
if response.Revision == 0 {
return "ok"
}
Expand Down Expand Up @@ -63,6 +63,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
22 changes: 18 additions & 4 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand Down Expand Up @@ -178,6 +179,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
case Compact:
s.CompactRevision = request.Compact.Revision
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -234,6 +238,7 @@ type RequestType string
const (
Range RequestType = "range"
Txn RequestType = "txn"
Compact RequestType = "compact"
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Expand All @@ -246,6 +251,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

type RangeRequest struct {
Expand All @@ -269,6 +275,13 @@ type DeleteOptions struct {
Key string
}

type CompactResponse struct {
}

type CompactRequest struct {
Revision int64
}

type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
Expand Down Expand Up @@ -322,6 +335,7 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
Revision int64
}

Expand Down
27 changes: 27 additions & 0 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
})
}

func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
request := compactRequest(rev)
if err != nil {
h.appendFailed(request, start.Nanoseconds(), err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: compactResponse(revision),
Return: end.Nanoseconds(),
})
}

func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
conds := []EtcdCondition{}
for _, cmp := range cmp {
Expand Down Expand Up @@ -405,6 +424,10 @@ func putResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{}}}, Revision: revision}}
}

func compactRequest(rev int64) EtcdRequest {
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
}

func deleteRequest(key string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}}}}
}
Expand All @@ -413,6 +436,10 @@ func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
}

func compactResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
}

func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil)
}
Expand Down
58 changes: 29 additions & 29 deletions tests/robustness/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ type TrafficProfile struct {
}

var trafficProfiles = []TrafficProfile{
{
Traffic: traffic.EtcdPut,
Profile: traffic.HighTrafficProfile,
},
{
Traffic: traffic.EtcdPutDeleteLease,
Profile: traffic.LowTraffic,
},
{
Traffic: traffic.Kubernetes,
Profile: traffic.HighTrafficProfile,
},
//{
// Traffic: traffic.EtcdPut,
// Profile: traffic.HighTrafficProfile,
//},
//{
// Traffic: traffic.EtcdPutDeleteLease,
// Profile: traffic.LowTraffic,
//},
//{
// Traffic: traffic.Kubernetes,
// Profile: traffic.HighTrafficProfile,
//},
{
Traffic: traffic.Kubernetes,
Profile: traffic.LowTraffic,
Expand All @@ -64,7 +64,7 @@ func exploratoryScenarios(t *testing.T) []testScenario {
if err != nil {
t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err)
}
enableLazyFS := e2e.BinPath.LazyFSAvailable()
//enableLazyFS := e2e.BinPath.LazyFSAvailable()
randomizableOptions := []e2e.EPClusterOption{
options.WithClusterOptionGroups(
options.ClusterOptions{options.WithTickMs(29), options.WithElectionMs(271)},
Expand All @@ -80,22 +80,22 @@ func exploratoryScenarios(t *testing.T) []testScenario {
e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
}
scenarios := []testScenario{}
for _, tp := range trafficProfiles {
name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1")
clusterOfSize1Options := baseOptions
clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
// Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS.
if enableLazyFS && tp.Profile.MinimalQPS <= 100 {
clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true))
name = filepath.Join(name, "LazyFS")
}
scenarios = append(scenarios, testScenario{
name: name,
traffic: tp.Traffic,
profile: tp.Profile,
cluster: *e2e.NewConfig(clusterOfSize1Options...),
})
}
//for _, tp := range trafficProfiles {
// name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1")
// clusterOfSize1Options := baseOptions
// clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
// // Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS.
// if enableLazyFS && tp.Profile.MinimalQPS <= 100 {
// clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true))
// name = filepath.Join(name, "LazyFS")
// }
// scenarios = append(scenarios, testScenario{
// name: name,
// traffic: tp.Traffic,
// profile: tp.Profile,
// cluster: *e2e.NewConfig(clusterOfSize1Options...),
// })
//}

for _, tp := range trafficProfiles {
name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize3")
Expand Down
14 changes: 14 additions & 0 deletions tests/robustness/traffic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package traffic
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -135,6 +136,19 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.Del
return resp, err
}

func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
fmt.Printf("Compact %d\n", rev)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
if err == nil || !strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
}
return resp, err
}

func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
txn := c.client.Txn(ctx).If(
conditions...,
Expand Down

0 comments on commit 1a02e04

Please sign in to comment.