Skip to content

Commit

Permalink
Merge pull request #2057 from pachmu/fix-empty-batch-consume
Browse files Browse the repository at this point in the history
fix: stuck on the batch with zero records length
  • Loading branch information
bai committed Nov 8, 2021
2 parents 3a200f6 + a8a9228 commit 99c86c9
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 9 deletions.
4 changes: 4 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
child.fetchSize = child.conf.Consumer.Fetch.Max
}
}
} else if block.LastRecordsBatchOffset != nil && *block.LastRecordsBatchOffset < block.HighWaterMarkOffset {
// check last record offset to avoid stuck if high watermark was not reached
Logger.Printf("consumer/broker/%d received batch with zero records but high watermark was not reached, topic %s, partition %d, offset %d\n", child.broker.broker.ID(), child.topic, child.partition, *block.LastRecordsBatchOffset)
child.offset = *block.LastRecordsBatchOffset + 1
}

return nil, nil
Expand Down
33 changes: 33 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,39 @@ func Test_partitionConsumer_parseResponse(t *testing.T) {
}
}

func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) {
lrbOffset := int64(5)
block := &FetchResponseBlock{
HighWaterMarkOffset: 10,
LastStableOffset: 10,
LastRecordsBatchOffset: &lrbOffset,
LogStartOffset: 0,
}
response := &FetchResponse{
Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: block}},
Version: 2,
}
child := &partitionConsumer{
broker: &brokerConsumer{
broker: &Broker{},
},
conf: NewConfig(),
topic: "my_topic",
partition: 0,
}
got, err := child.parseResponse(response)
if err != nil {
t.Errorf("partitionConsumer.parseResponse() error = %v", err)
return
}
if got != nil {
t.Errorf("partitionConsumer.parseResponse() should be nil, got %v", got)
}
if child.offset != 6 {
t.Errorf("child.offset should be LastRecordsBatchOffset + 1: %d, got %d", lrbOffset+1, child.offset)
}
}

func testConsumerInterceptor(
t *testing.T,
interceptors []ConsumerInterceptor,
Expand Down
24 changes: 15 additions & 9 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
}

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LastRecordsBatchOffset *int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -118,6 +119,11 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

b.LastRecordsBatchOffset, err = records.recordsOffset()
if err != nil {
return err
}

partial, err := records.isPartial()
if err != nil {
return err
Expand Down
76 changes: 76 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,37 @@ var (
0x00,
}

emptyRecordsFetchResponsev11 = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, // Error
0x00, 0x00, 0x00, 0x00, // Fetch session
0x00, 0x00, 0x00, 0x01, // Num topic
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Num partition
0x00, 0x00, 0x00, 0x05, // Partition
0x00, 0x00, // Error
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Log start offset
0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
0xff, 0xff, 0xff, 0xff, // Replica id
0x00, 0x00, 0x00, 0x3D, // Batch size
// recordBatch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Offset
0x00, 0x00, 0x00, 0x31, // Message size
0x00, 0x00, 0x00, 0x00, // Leader epoch
0x02, // Magic byte
0x14, 0xE0, 0x7A, 0x62, // CRC
0x00, 0x00, // Flags
0x00, 0x00, 0x00, 0x00, // Last offset delta
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, // First timestamp
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0B, // Last timestamp
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // Producer id
0x00, 0x00, // Producer epoch
0x00, 0x00, 0x00, 0x3d, // Base sequence
0x00, 0x00, 0x00, 0x00, // Records size
}

oneMessageFetchResponseV4 = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
Expand Down Expand Up @@ -386,6 +417,51 @@ func TestPartailFetchResponse(t *testing.T) {
}
}

func TestEmptyRecordsFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "empty record", &response, emptyRecordsFetchResponsev11, 11)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrNoError {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
if block.PreferredReadReplica != -1 {
t.Error("Decoding didn't produce correct preferred read replica.")
}
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding a partial trailing record")
}

n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 0 {
t.Fatal("Decoding produced incorrect number of records.")
}
if *block.LastRecordsBatchOffset != 0 {
t.Fatal("Last records batch offset is incorrect.")
}
}

func TestOneMessageFetchResponseV4(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
Expand Down
15 changes: 15 additions & 0 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,21 @@ func (r *Records) isOverflow() (bool, error) {
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func (r *Records) recordsOffset() (*int64, error) {
switch r.recordsType {
case unknownRecords:
return nil, nil
case legacyRecords:
return nil, nil
case defaultRecords:
if r.RecordBatch == nil {
return nil, nil
}
return &r.RecordBatch.FirstOffset, nil
}
return nil, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func magicValue(pd packetDecoder) (int8, error) {
return pd.peekInt8(magicOffset)
}
Expand Down
15 changes: 15 additions & 0 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,20 @@ func TestLegacyRecords(t *testing.T) {
if c {
t.Errorf("MessageSet can't be a control batch")
}
f, err := r.recordsOffset()
if err != nil {
t.Fatal(err)
}
if f != nil {
t.Errorf("RecordBatch record offset is invalid")
}
}

func TestDefaultRecords(t *testing.T) {
batch := &RecordBatch{
IsTransactional: true,
Version: 2,
FirstOffset: 1,
Records: []*Record{
{
Value: []byte{1},
Expand Down Expand Up @@ -141,4 +149,11 @@ func TestDefaultRecords(t *testing.T) {
if c {
t.Errorf("RecordBatch shouldn't be a control batch")
}
f, err := r.recordsOffset()
if err != nil {
t.Fatal(err)
}
if f == nil || *f != 1 {
t.Errorf("RecordBatch record offset is invalid")
}
}

0 comments on commit 99c86c9

Please sign in to comment.