Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support key in MockFetchResponse. #2328

Merged
merged 2 commits into from Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 85 additions & 0 deletions consumer_test.go
@@ -1,6 +1,7 @@
package sarama

import (
"bytes"
"errors"
"log"
"os"
Expand All @@ -13,6 +14,7 @@ import (
)

var testMsg = StringEncoder("Foo")
var testKey = StringEncoder("Bar")

// If a particular offset is provided then messages are consumed starting from
// that offset.
Expand Down Expand Up @@ -78,6 +80,71 @@ func TestConsumerOffsetManual(t *testing.T) {
broker0.Close()
}

// If a message is given a key, it can be correctly collected while consuming.
func TestConsumerMessageWithKey(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)

manualOffset := int64(1234)
offsetNewest := int64(2345)
offsetNewestAfterFetchRequest := int64(3456)

mockFetchResponse := NewMockFetchResponse(t, 1)

// skipped because parseRecords(): offset < child.offset
mockFetchResponse.SetMessageWithKey("my_topic", 0, manualOffset-1, testKey, testMsg)

for i := int64(0); i < 10; i++ {
mockFetchResponse.SetMessageWithKey("my_topic", 0, i+manualOffset, testKey, testMsg)
}

mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest)

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, offsetNewest),
"FetchRequest": mockFetchResponse,
})

// When
master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, manualOffset)
if err != nil {
t.Fatal(err)
}

// Then
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
}
for i := int64(0); i < 10; i++ {
select {
case message := <-consumer.Messages():
assertMessageOffset(t, message, i+manualOffset)
assertMessageKey(t, message, testKey)
assertMessageValue(t, message, testMsg)
case err := <-consumer.Errors():
t.Error(err)
}
}

if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func TestPauseResumeConsumption(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
Expand Down Expand Up @@ -1795,6 +1862,24 @@ func TestExcludeUncommitted(t *testing.T) {
broker0.Close()
}

func assertMessageKey(t *testing.T, msg *ConsumerMessage, expectedKey Encoder) {
t.Helper()

wantKey, _ := expectedKey.Encode()
if bytes.Compare(msg.Key, wantKey) != 0 {
t.Fatalf("Incorrect key for message. expected=%s, actual=%s", expectedKey, msg.Key)
}
}

func assertMessageValue(t *testing.T, msg *ConsumerMessage, expectedValue Encoder) {
t.Helper()

wantValue, _ := expectedValue.Encode()
if bytes.Compare(msg.Value, wantValue) != 0 {
t.Fatalf("Incorrect value for message. expected=%s, actual=%s", expectedValue, msg.Key)
}
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
t.Helper()
if msg.Offset != expectedOffset {
Expand Down
31 changes: 24 additions & 7 deletions mockresponses.go
Expand Up @@ -256,9 +256,22 @@ func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int
return offset
}

// mockMessage is a message that used to be mocked for `FetchResponse`
type mockMessage struct {
key Encoder
msg Encoder
}

func newMockMessage(key, msg Encoder) *mockMessage {
return &mockMessage{
key: key,
msg: msg,
}
}

// MockFetchResponse is a `FetchResponse` builder.
type MockFetchResponse struct {
messages map[string]map[int32]map[int64]Encoder
messages map[string]map[int32]map[int64]*mockMessage
messagesLock *sync.RWMutex
highWaterMarks map[string]map[int32]int64
t TestReporter
Expand All @@ -267,7 +280,7 @@ type MockFetchResponse struct {

func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
return &MockFetchResponse{
messages: make(map[string]map[int32]map[int64]Encoder),
messages: make(map[string]map[int32]map[int64]*mockMessage),
messagesLock: &sync.RWMutex{},
highWaterMarks: make(map[string]map[int32]int64),
t: t,
Expand All @@ -276,19 +289,23 @@ func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
}

func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
return mfr.SetMessageWithKey(topic, partition, offset, nil, msg)
}

func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse {
mfr.messagesLock.Lock()
defer mfr.messagesLock.Unlock()
partitions := mfr.messages[topic]
if partitions == nil {
partitions = make(map[int32]map[int64]Encoder)
partitions = make(map[int32]map[int64]*mockMessage)
mfr.messages[topic] = partitions
}
messages := partitions[partition]
if messages == nil {
messages = make(map[int64]Encoder)
messages = make(map[int64]*mockMessage)
partitions[partition] = messages
}
messages[offset] = msg
messages[offset] = newMockMessage(key, msg)
return mfr
}

Expand All @@ -315,7 +332,7 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
for i := 0; i < mfr.batchSize && offset < maxOffset; {
msg := mfr.getMessage(topic, partition, offset)
if msg != nil {
res.AddMessage(topic, partition, nil, msg, offset)
res.AddMessage(topic, partition, msg.key, msg.msg, offset)
i++
}
offset++
Expand All @@ -331,7 +348,7 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
return res
}

func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) *mockMessage {
mfr.messagesLock.RLock()
defer mfr.messagesLock.RUnlock()
partitions := mfr.messages[topic]
Expand Down