Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds missing mock responses for mocking consumer group #1750

Merged
merged 2 commits into from Oct 16, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
148 changes: 148 additions & 0 deletions mockresponses.go
Expand Up @@ -1078,6 +1078,154 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead
return resp
}

type MockJoinGroupResponse struct {
t TestReporter

ThrottleTime int32
Err KError
GenerationId int32
GroupProtocol string
LeaderId string
MemberId string
Members map[string][]byte
}

func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
return &MockJoinGroupResponse{
t: t,
Members: make(map[string][]byte),
}
}

func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*JoinGroupRequest)
resp := &JoinGroupResponse{
Version: req.Version,
ThrottleTime: m.ThrottleTime,
Err: m.Err,
GenerationId: m.GenerationId,
GroupProtocol: m.GroupProtocol,
LeaderId: m.LeaderId,
MemberId: m.MemberId,
Members: m.Members,
}
return resp
}

func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
m.ThrottleTime = t
return m
}

func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
m.Err = kerr
return m
}

func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
m.GenerationId = id
return m
}

func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
m.GroupProtocol = proto
return m
}

func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
m.LeaderId = id
return m
}

func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
m.MemberId = id
return m
}

func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
bin, err := encode(meta, nil)
if err != nil {
panic(fmt.Sprintf("error encoding member metadata: %v", err))
}
m.Members[id] = bin
return m
}

type MockLeaveGroupResponse struct {
t TestReporter

Err KError
}

func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
return &MockLeaveGroupResponse{t: t}
}

func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
resp := &LeaveGroupResponse{
Err: m.Err,
}
return resp
}

func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
m.Err = kerr
return m
}

type MockSyncGroupResponse struct {
t TestReporter

Err KError
MemberAssignment []byte
}

func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
return &MockSyncGroupResponse{t: t}
}

func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
resp := &SyncGroupResponse{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @krantideep95, I'm confused whether I can set the value of attribute Version in SyncGroupResponse, like what other MockResponse did, eg

func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*ApiVersionsRequest)
	res := &ApiVersionsResponse{
		Version: req.Version,
		ApiKeys: m.apiKeys,
	}
	return res
}

Without the setting, the Version in the SyncGroupResponse is always 0.
In one of my test cases, kafka consumer version is V2.8.1, Version in the SyncGroupRequest is 3 while in the SyncGroupResponse is 0, that will lead to an ErrInsufficientData error.

Err: m.Err,
MemberAssignment: m.MemberAssignment,
}
return resp
}

func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
m.Err = kerr
return m
}

func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
bin, err := encode(assignment, nil)
if err != nil {
panic(fmt.Sprintf("error encoding member assignment: %v", err))
}
m.MemberAssignment = bin
return m
}

type MockHeartbeatResponse struct {
t TestReporter

Err KError
}

func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
return &MockHeartbeatResponse{t: t}
}

func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
resp := &HeartbeatResponse{}
return resp
}

func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
m.Err = kerr
return m
}

type MockDescribeLogDirsResponse struct {
t TestReporter
logDirs []DescribeLogDirsResponseDirMetadata
Expand Down