Skip to content

Commit

Permalink
Record operation from failpoint injection
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Apr 27, 2024
1 parent 0bd7008 commit d884401
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 120 deletions.
69 changes: 64 additions & 5 deletions tests/robustness/traffic/client.go → tests/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package traffic
package client

import (
"context"
Expand All @@ -33,7 +33,7 @@ import (
// clientv3.Client) that records all the requests and responses made. Doesn't
// allow for concurrent requests to conform to model.AppendableHistory requirements.
type RecordingClient struct {
id int
ID int
client clientv3.Client
// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
Expand All @@ -51,7 +51,7 @@ type TimedWatchEvent struct {
Time time.Duration
}

func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
Expand All @@ -62,7 +62,7 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*
return nil, err
}
return &RecordingClient{
id: ids.NewClientID(),
ID: ids.NewClientID(),
client: *cc,
kvOperations: model.NewAppendableHistory(ids),
baseTime: baseTime,
Expand All @@ -75,7 +75,7 @@ func (c *RecordingClient) Close() error {

func (c *RecordingClient) Report() report.ClientReport {
return report.ClientReport{
ClientID: c.id,
ClientID: c.ID,
KeyValue: c.kvOperations.History.Operations(),
Watch: c.watchOperations,
}
Expand Down Expand Up @@ -190,6 +190,65 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
return resp, err
}

// Compact forwards a compaction request for revision rev to the wrapped
// clientv3 client and hands back its response and error untouched. kvMux is
// held for the whole call, so it is serialized with every other method that
// takes the same mutex.
func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.Compact(ctx, rev)
}
// MemberList asks the wrapped clientv3 client for the cluster member list,
// passing opts through as given, and returns the client's response and error
// as-is. The call runs with kvMux held.
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.MemberList(ctx, opts...)
}

// MemberAdd delegates to the wrapped clientv3 client's MemberAdd with the
// given peerAddrs and returns its response and error unchanged. The call runs
// with kvMux held.
func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.MemberAdd(ctx, peerAddrs)
}

// MemberAddAsLearner delegates to the wrapped clientv3 client's
// MemberAddAsLearner with the given peerAddrs and returns its response and
// error unchanged. The call runs with kvMux held.
func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.MemberAddAsLearner(ctx, peerAddrs)
}

// MemberRemove delegates to the wrapped clientv3 client's MemberRemove for
// the member identified by id and returns its response and error unchanged.
// The call runs with kvMux held.
func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.MemberRemove(ctx, id)
}

// MemberUpdate delegates to the wrapped clientv3 client's MemberUpdate,
// passing the member id and peerAddrs through, and returns its response and
// error unchanged. The call runs with kvMux held.
func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.MemberUpdate(ctx, id, peerAddrs)
}

// MemberPromote delegates to the wrapped clientv3 client's MemberPromote for
// the member identified by id and returns its response and error unchanged.
// The call runs with kvMux held.
func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.MemberPromote(ctx, id)
}

// Status delegates to the wrapped clientv3 client's Status for the given
// endpoint and returns its response and error unchanged. The call runs with
// kvMux held.
func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
	c.kvMux.Lock()
	defer c.kvMux.Unlock()
	return c.client.Status(ctx, endpoint)
}

// Endpoints returns whatever endpoint list the wrapped clientv3 client
// reports. Unlike the request methods on this type, it does not take kvMux.
func (c *RecordingClient) Endpoints() []string {
	eps := c.client.Endpoints()
	return eps
}

func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
request := model.WatchRequest{
Key: key,
Expand Down
36 changes: 17 additions & 19 deletions tests/robustness/failpoint/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/client"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

var (
Expand All @@ -36,26 +39,21 @@ var (

type memberReplace struct{}

func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
memberID := uint64(rand.Int() % len(clus.Procs))
member := clus.Procs[memberID]
var endpoints []string
for i := 1; i < len(clus.Procs); i++ {
endpoints = append(endpoints, clus.Procs[(int(memberID)+i)%len(clus.Procs)].EndpointsGRPC()...)
}
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 50 * time.Second,
DialKeepAliveTimeout: 100 * time.Millisecond,
})
cc, err := client.NewRecordingClient(endpoints, ids, baseTime)
if err != nil {
return err
return nil, err
}
defer cc.Close()
memberID, found, err := getID(ctx, cc, member.Config().Name)
if err != nil {
return err
return nil, err
}
if !found {
t.Fatal("Member not found")
Expand All @@ -65,11 +63,11 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
lg.Info("Removing member", zap.String("member", member.Config().Name))
_, err = cc.MemberRemove(ctx, memberID)
if err != nil {
return err
return nil, err
}
_, found, err = getID(ctx, cc, member.Config().Name)
if err != nil {
return err
return nil, err
}
if found {
t.Fatal("Expected member to be removed")
Expand All @@ -83,21 +81,21 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
err = member.Wait(ctx)
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
lg.Info("Failed to kill the process", zap.Error(err))
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
}
}
lg.Info("Removing member data", zap.String("member", member.Config().Name))
err = os.RemoveAll(member.Config().DataDirPath)
if err != nil {
return err
return nil, err
}

lg.Info("Adding member back", zap.String("member", member.Config().Name))
removedMemberPeerURL := member.Config().PeerURL.String()
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
reqCtx, cancel := context.WithTimeout(ctx, time.Second)
Expand All @@ -109,17 +107,17 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
}
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
if err != nil {
return err
return nil, err
}
lg.Info("Starting member", zap.String("member", member.Config().Name))
err = member.Start(ctx)
if err != nil {
return err
return nil, err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
_, found, err := getID(ctx, cc, member.Config().Name)
Expand All @@ -130,7 +128,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
break
}
}
return nil
return nil, nil
}

func (f memberReplace) Name() string {
Expand All @@ -141,7 +139,7 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, _ e2e.Etcd
return config.ClusterSize > 1
}

func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) {
func getID(ctx context.Context, cc clientv3.Cluster, name string) (id uint64, found bool, err error) {
resp, err := cc.MemberList(ctx)
if err != nil {
return 0, false, err
Expand Down
26 changes: 18 additions & 8 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

const (
Expand Down Expand Up @@ -75,7 +77,7 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
return nil
}

func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) {
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
var err error
Expand All @@ -85,7 +87,7 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
}
lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
start := time.Since(baseTime)
err = failpoint.Inject(ctx, t, lg, clus)
clientReport, err := failpoint.Inject(ctx, t, lg, clus, baseTime, ids)
if err != nil {
lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
return nil, fmt.Errorf("failed triggering failpoint, err: %v", err)
Expand All @@ -96,14 +98,22 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
end := time.Since(baseTime)

return &InjectionReport{
Start: start,
End: end,
Name: failpoint.Name(),
return &FailpointReport{
Injection: Injection{
Start: start,
End: end,
Name: failpoint.Name(),
},
Client: clientReport,
}, nil
}

type InjectionReport struct {
// FailpointReport is the value returned by Inject on success. It embeds the
// Injection (failpoint name plus start/end offsets) and carries the client
// reports produced while the failpoint was being injected.
type FailpointReport struct {
Injection
// Client holds the reports returned by the failpoint's own Inject call.
Client []report.ClientReport
}

// Injection describes a single failpoint injection: which failpoint was
// triggered and when, expressed as offsets from the test's base time.
type Injection struct {
	// Start is the time elapsed since the base time when triggering began.
	Start time.Duration
	// End is the time elapsed since the base time when triggering finished.
	End time.Duration
	// Name is the name reported by the injected failpoint.
	Name string
}
Expand Down Expand Up @@ -137,7 +147,7 @@ func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProces
}

// Failpoint is the contract implemented by every injectable failure: Inject
// triggers it against the cluster, Name identifies it in logs and reports,
// and the embedded AvailabilityChecker is presumably used to decide whether
// it applies to a given cluster configuration (TODO confirm against its
// definition).
// NOTE(review): this view lists Inject twice, once returning only an error
// and once also taking baseTime/ids and returning client reports. The
// package-level Inject calls the latter form; confirm which declaration is
// current.
type Failpoint interface {
Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error
Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error)
Name() string
AvailabilityChecker
}
Expand Down

0 comments on commit d884401

Please sign in to comment.