Skip to content

Commit

Permalink
Merge pull request #2328 from Skandalik/mock-fetch-response-key-support
Browse files Browse the repository at this point in the history
feat: support key in MockFetchResponse.
  • Loading branch information
dnwe committed Sep 15, 2022
2 parents 34a28a1 + 29be3d9 commit eb5895f
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 7 deletions.
85 changes: 85 additions & 0 deletions consumer_test.go
@@ -1,6 +1,7 @@
package sarama

import (
"bytes"
"errors"
"log"
"os"
Expand All @@ -13,6 +14,7 @@ import (
)

var testMsg = StringEncoder("Foo")
var testKey = StringEncoder("Bar")

// If a particular offset is provided then messages are consumed starting from
// that offset.
Expand Down Expand Up @@ -78,6 +80,71 @@ func TestConsumerOffsetManual(t *testing.T) {
broker0.Close()
}

// If a message is given a key, it can be correctly collected while consuming.
func TestConsumerMessageWithKey(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)

manualOffset := int64(1234)
offsetNewest := int64(2345)
offsetNewestAfterFetchRequest := int64(3456)

mockFetchResponse := NewMockFetchResponse(t, 1)

// skipped because parseRecords(): offset < child.offset
mockFetchResponse.SetMessageWithKey("my_topic", 0, manualOffset-1, testKey, testMsg)

for i := int64(0); i < 10; i++ {
mockFetchResponse.SetMessageWithKey("my_topic", 0, i+manualOffset, testKey, testMsg)
}

mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest)

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, offsetNewest),
"FetchRequest": mockFetchResponse,
})

// When
master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, manualOffset)
if err != nil {
t.Fatal(err)
}

// Then
if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo)
}
for i := int64(0); i < 10; i++ {
select {
case message := <-consumer.Messages():
assertMessageOffset(t, message, i+manualOffset)
assertMessageKey(t, message, testKey)
assertMessageValue(t, message, testMsg)
case err := <-consumer.Errors():
t.Error(err)
}
}

if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest {
t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func TestPauseResumeConsumption(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
Expand Down Expand Up @@ -1795,6 +1862,24 @@ func TestExcludeUncommitted(t *testing.T) {
broker0.Close()
}

func assertMessageKey(t *testing.T, msg *ConsumerMessage, expectedKey Encoder) {
t.Helper()

wantKey, _ := expectedKey.Encode()
if bytes.Compare(msg.Key, wantKey) != 0 {
t.Fatalf("Incorrect key for message. expected=%s, actual=%s", expectedKey, msg.Key)
}
}

func assertMessageValue(t *testing.T, msg *ConsumerMessage, expectedValue Encoder) {
t.Helper()

wantValue, _ := expectedValue.Encode()
if bytes.Compare(msg.Value, wantValue) != 0 {
t.Fatalf("Incorrect value for message. expected=%s, actual=%s", expectedValue, msg.Key)
}
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
t.Helper()
if msg.Offset != expectedOffset {
Expand Down
31 changes: 24 additions & 7 deletions mockresponses.go
Expand Up @@ -256,9 +256,22 @@ func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int
return offset
}

// mockMessage is a message that used to be mocked for `FetchResponse`
type mockMessage struct {
key Encoder
msg Encoder
}

func newMockMessage(key, msg Encoder) *mockMessage {
return &mockMessage{
key: key,
msg: msg,
}
}

// MockFetchResponse is a `FetchResponse` builder.
type MockFetchResponse struct {
messages map[string]map[int32]map[int64]Encoder
messages map[string]map[int32]map[int64]*mockMessage
messagesLock *sync.RWMutex
highWaterMarks map[string]map[int32]int64
t TestReporter
Expand All @@ -267,7 +280,7 @@ type MockFetchResponse struct {

func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
return &MockFetchResponse{
messages: make(map[string]map[int32]map[int64]Encoder),
messages: make(map[string]map[int32]map[int64]*mockMessage),
messagesLock: &sync.RWMutex{},
highWaterMarks: make(map[string]map[int32]int64),
t: t,
Expand All @@ -276,19 +289,23 @@ func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
}

func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
return mfr.SetMessageWithKey(topic, partition, offset, nil, msg)
}

func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse {
mfr.messagesLock.Lock()
defer mfr.messagesLock.Unlock()
partitions := mfr.messages[topic]
if partitions == nil {
partitions = make(map[int32]map[int64]Encoder)
partitions = make(map[int32]map[int64]*mockMessage)
mfr.messages[topic] = partitions
}
messages := partitions[partition]
if messages == nil {
messages = make(map[int64]Encoder)
messages = make(map[int64]*mockMessage)
partitions[partition] = messages
}
messages[offset] = msg
messages[offset] = newMockMessage(key, msg)
return mfr
}

Expand All @@ -315,7 +332,7 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
for i := 0; i < mfr.batchSize && offset < maxOffset; {
msg := mfr.getMessage(topic, partition, offset)
if msg != nil {
res.AddMessage(topic, partition, nil, msg, offset)
res.AddMessage(topic, partition, msg.key, msg.msg, offset)
i++
}
offset++
Expand All @@ -331,7 +348,7 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
return res
}

func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) *mockMessage {
mfr.messagesLock.RLock()
defer mfr.messagesLock.RUnlock()
partitions := mfr.messages[topic]
Expand Down

0 comments on commit eb5895f

Please sign in to comment.