From 01bfe374a9fbb3bd1d787e1543b4cf03c59653ae Mon Sep 17 00:00:00 2001 From: Kranti Deep Date: Fri, 16 Oct 2020 00:53:10 +0530 Subject: [PATCH] Adds MockJoinGroupResponse, MockLeaveGroupResponse, MockSyncGroupResponse, MockHeartbeatResponse for mocking consumer groups Squashed commit of the following: commit d80a202edf65301c47af457b44043e3fedd54efd Merge: 6dbd2ce 65f0fec Author: Kranti Deep Date: Sun Oct 11 22:37:51 2020 +0530 Merge branch 'master' into cd/1577/consumer-group-mocks commit 6dbd2ce4ce7d3222cb5126d32b40ff07260c30c5 Merge: 94dce3e c1c2a08 Author: Kranti Deep Date: Wed Aug 12 20:46:55 2020 +0530 Merge branch 'master' into cd/1577/consumer-group-mocks commit 94dce3e27b076b2c2be44c8dd1ee8c10219293af Author: Kranti Deep Date: Sun Jul 12 23:05:21 2020 +0530 change return type of MockJoinGroupResponse.For, MockHeartbeatResponse.For & MockLeaveGroupResponse.For to encoderWithHeader commit ac5ca79bb834918ddff9c8baf841eda6624fa4d8 Merge: e81f001 5933302 Author: Kranti Deep Date: Sun Jul 12 22:52:16 2020 +0530 Merge branch 'master' into cd/1577/consumer-group-mocks commit e81f001564409e2ccfc3653aafe81c28ee12e2ac Author: Carl Dunham Date: Tue Feb 25 16:40:16 2020 -0500 add basic mocks for consumer group responses --- mockresponses.go | 149 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/mockresponses.go b/mockresponses.go index be8af85456..13bad31302 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -1078,6 +1078,155 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead return resp } +type MockJoinGroupResponse struct { + t TestReporter + + ThrottleTime int32 + Err KError + GenerationId int32 + GroupProtocol string + LeaderId string + MemberId string + Members map[string][]byte +} + +func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse { + return &MockJoinGroupResponse{ + t: t, + + Members: make(map[string][]byte), + } +} + +func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*JoinGroupRequest) + resp := &JoinGroupResponse{ + Version: req.Version, + ThrottleTime: m.ThrottleTime, + Err: m.Err, + GenerationId: m.GenerationId, + GroupProtocol: m.GroupProtocol, + LeaderId: m.LeaderId, + MemberId: m.MemberId, + Members: m.Members, + } + return resp +} + +func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse { + m.ThrottleTime = t + return m +} + +func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse { + m.Err = kerr + return m +} + +func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse { + m.GenerationId = id + return m +} + +func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse { + m.GroupProtocol = proto + return m +} + +func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse { + m.LeaderId = id + return m +} + +func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse { + m.MemberId = id + return m +} + +func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse { + bin, err := encode(meta, nil) + if err != nil { + panic(fmt.Sprintf("error encoding member metadata: %v", err)) + } + m.Members[id] = bin + return m +} + +type MockLeaveGroupResponse struct { + t TestReporter + + Err KError +} + +func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse { + return &MockLeaveGroupResponse{t: t} +} + +func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader { + resp := &LeaveGroupResponse{ + Err: m.Err, + } + return resp +} + +func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse { + m.Err = kerr + return m +} + +type MockSyncGroupResponse struct { + t TestReporter + + Err KError + MemberAssignment []byte +} + +func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse { + return &MockSyncGroupResponse{t: t} +} + +func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader { + resp := &SyncGroupResponse{ + Err: m.Err, + MemberAssignment: m.MemberAssignment, + } + return resp +} + +func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse { + m.Err = kerr + return m +} + +func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse { + bin, err := encode(assignment, nil) + if err != nil { + panic(fmt.Sprintf("error encoding member assignment: %v", err)) + } + m.MemberAssignment = bin + return m +} + +type MockHeartbeatResponse struct { + t TestReporter + + Err KError +} + +func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse { + return &MockHeartbeatResponse{t: t} +} + +func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader { + resp := &HeartbeatResponse{} + return resp +} + +func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse { + m.Err = kerr + return m +} + type MockDescribeLogDirsResponse struct { t TestReporter logDirs []DescribeLogDirsResponseDirMetadata