
feat: add methods to pause/resume consumer's consumption #2005

Merged: 3 commits, merged on Jan 22, 2022


raulnegreiros
Contributor

I'm submitting this PR in order to solve this issue.

It aims to give users control over consumption by providing methods to pause and resume a consumer.

Motivation:
When your data destination is offline, it is pointless to keep consuming new messages from the broker, since processing them will certainly result in an error. The Java library already provides something similar.

Note that the consumption state is not preserved across rebalances, so it is the user's responsibility to manage it using the callbacks.
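To make the intended semantics concrete, here is a minimal sketch using only the Go standard library. `pausableConsumer` and its `poll` method are hypothetical: they model a single partition's fetch loop against an in-memory log and are not Sarama's implementation. The property it illustrates is that pausing keeps the consumer's position but stops fetching, and resuming continues from that same offset. `atomic.Bool` assumes Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pausableConsumer is a hypothetical, simplified stand-in for a partition
// consumer. It only illustrates the pause/resume idea.
type pausableConsumer struct {
	paused atomic.Bool // safe to toggle from another goroutine
	offset int64       // next offset to fetch
}

// Pause stops further fetches; the current offset is kept.
func (c *pausableConsumer) Pause() { c.paused.Store(true) }

// Resume allows fetching to continue from the saved offset.
func (c *pausableConsumer) Resume() { c.paused.Store(false) }

// IsPaused reports whether fetching is currently suspended.
func (c *pausableConsumer) IsPaused() bool { return c.paused.Load() }

// poll performs one iteration of a fetch loop against an in-memory log.
// While paused it returns nothing and does not advance the offset.
func (c *pausableConsumer) poll(msgLog []string) []string {
	if c.IsPaused() || c.offset >= int64(len(msgLog)) {
		return nil
	}
	batch := msgLog[c.offset:]
	c.offset = int64(len(msgLog))
	return batch
}

func main() {
	msgLog := []string{"m0", "m1"}
	c := &pausableConsumer{}

	fmt.Println(c.poll(msgLog)) // [m0 m1]

	// Destination is offline: pause, so new messages are not fetched.
	c.Pause()
	msgLog = append(msgLog, "m2", "m3")
	fmt.Println(c.poll(msgLog)) // []

	// Destination is back: resume from where we left off.
	c.Resume()
	fmt.Println(c.poll(msgLog)) // [m2 m3]
}
```

Because the paused flag lives only in this in-memory object, a fresh consumer created after a rebalance would start unpaused, which is why the description leaves re-applying the state to the user.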

@ghost ghost added the cla-needed label Sep 5, 2021
@raulnegreiros
Contributor Author

I'm getting the same CLA error that @faillefer mentioned in another PR.

@raulnegreiros
Contributor Author

Is there an extra step needed for the CLA bot to verify me? I have already signed the terms, but it still doesn't recognize my signature.

@ghost ghost removed the cla-needed label Sep 11, 2021
@bai bai requested a review from dnwe September 13, 2021 03:26
@bai
Contributor

bai commented Sep 13, 2021

Looks like CLA check is now green.

@dnwe
Collaborator

dnwe commented Nov 6, 2021

@raulnegreiros 👋🏻 hey, I'm sorry this took so long to review, I hadn't spotted that it was still pending.

Overall the changes look reasonable, and it's good for us to sync up with the Java client on this capability.

However, I'm not sure the TestPauseResumeConsumption test is exercising the capability as you planned. If I comment out the Pause and Resume calls, the test still passes:

```diff
diff --git a/consumer_test.go b/consumer_test.go
index d0a4ac6..aac3c3f 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -91,7 +91,7 @@ func TestPauseResumeConsumption(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	consumer.Pause()
+	// consumer.Pause()
 
 	// Then: no message is available
 	if len(consumer.Messages()) != 0 {
@@ -99,7 +99,7 @@ func TestPauseResumeConsumption(t *testing.T) {
 	}
 
 	// When
-	consumer.Resume()
+	// consumer.Resume()
 
 	// Then: messages starting from offset 1234 are consumed.
 	for i := 0; i < 10; i++ {
```

@dnwe
Collaborator

dnwe commented Dec 2, 2021

Ping @raulnegreiros

@IBM IBM deleted a comment from tong3jie Dec 2, 2021
@dnwe dnwe changed the title Adds the methods to pause/resume consumer's consumption feat: add methods to pause/resume consumer's consumption Dec 6, 2021
@dnwe dnwe added the feat label Dec 6, 2021
@d-baranowski

Hello 👋 is this likely to get merged?

@dnwe
Collaborator

dnwe commented Dec 22, 2021

@d-baranowski still waiting for someone to fix up the unit tests. As mentioned above, they’re not currently exercising the functionality that was added.

@d-baranowski

I'll take a look after work

@raulnegreiros
Contributor Author

Sorry for the huge delay.
Thanks for the review, @dnwe. I had to rewrite the test, but I'm not sure about it because I needed to add a time dependency. Do you think that's a valid approach?

Test code:

```go
func TestPauseResumeConsumption(t *testing.T) {
	broker0 := NewMockBroker(t, 0)

	const newestOffsetBroker = 1233
	const maxOffsetBroker = newestOffsetBroker + 10
	offsetBroker := newestOffsetBroker
	offsetClient := offsetBroker

	mockFetchResponse := NewMockFetchResponse(t, 1)
	mockFetchResponse.SetMessage("my_topic", 0, int64(newestOffsetBroker), testMsg)
	offsetBroker++

	brokerResponses := map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, int64(newestOffsetBroker)),
		"FetchRequest": mockFetchResponse,
	}

	broker0.SetHandlerByMap(brokerResponses)

	master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	// Pause the consumption.
	consumer.Pause()

	// Make more messages available on the broker.
	for ; offsetBroker < maxOffsetBroker; offsetBroker++ {
		mockFetchResponse = mockFetchResponse.SetMessage("my_topic", 0, int64(offsetBroker), testMsg)
	}
	brokerResponses["FetchRequest"] = mockFetchResponse
	broker0.SetHandlerByMap(brokerResponses)

	keepConsuming := true
	for keepConsuming {
		select {
		case message := <-consumer.Messages():
			// Only the first message is expected to be consumed.
			offsetClient++
			assertMessageOffset(t, message, int64(newestOffsetBroker))
		case err := <-consumer.Errors():
			t.Fatal(err)
		case <-time.After(time.Second):
			// Expected to time out once the consumption is paused.
			keepConsuming = false
		}
	}

	// Resume the consumption in order to consume the new messages.
	consumer.Resume()

	for offsetClient < maxOffsetBroker {
		select {
		case message := <-consumer.Messages():
			assertMessageOffset(t, message, int64(offsetClient))
			offsetClient++
		case err := <-consumer.Errors():
			t.Fatal("Error: ", err)
		case <-time.After(time.Second * 10):
			t.Fatal("consumer timed out. Offset: ", offsetClient)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
```

@raulnegreiros
Contributor Author

raulnegreiros commented Jan 12, 2022

@dnwe, are you sure you wanted to force-push?
During my last modifications I noticed that the default branch had been renamed from master to main, and I had already resolved the conflicts. I think a simple merge from main into the feature branch would be enough.
I did it locally, and there is a diff in consumer.go: the force-pushed branch has an extra

`pc.messages <- msg`

I think it is breaking the tests. Do you want me to fix it?

@dnwe
Collaborator

dnwe commented Jan 12, 2022

@raulnegreiros thanks! I was trying to spot where I'd messed up the merge conflict resolution

Collaborator

@dnwe dnwe left a comment


Thanks @raulnegreiros — I'm ready to merge this for the next release, but please could you rebase one last time to include #2078 — I think you might need to adjust the highwatermark offset in the mock consumer in a similar way and update your tests

@raulnegreiros
Copy link
Contributor Author

done @dnwe!
Could you double-check, please?

@dnwe dnwe merged commit 31d757b into IBM:main Jan 22, 2022
@dnwe
Collaborator

dnwe commented Jan 22, 2022

@raulnegreiros thank you for a great contribution! Merged

dnwe pushed a commit to raulnegreiros/sarama that referenced this pull request Apr 13, 2022
When using the new "pause consumer" support (IBM#2005), Sarama would
incorrectly submit empty FetchRequests if all of the assigned partitions
were paused. This is because we only used Pause to skip adding the
topicPartition blocks to the FetchRequest and still went ahead and sent
the Fetch even if it was essentially empty:
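The problem described in this follow-up commit can be sketched as follows. `topicPartition`, `fetchBlock` and `buildFetch` are hypothetical, simplified stand-ins, not Sarama's `FetchRequest` types. Paused partitions are left out of the request, and the second return value lets the caller skip the network round trip when nothing is left. The commit message is cut off above, so treat "skip the send when every partition is paused" as an assumption about the fix, not a copy of it.

```go
package main

import "fmt"

// topicPartition identifies one partition (hypothetical, simplified type).
type topicPartition struct {
	Topic     string
	Partition int32
}

// fetchBlock is one partition's entry in a fetch request (hypothetical).
type fetchBlock struct {
	TP     topicPartition
	Offset int64
}

// buildFetch collects blocks for partitions that are not paused. The second
// result is false when every partition is paused, telling the caller not to
// send an essentially empty request.
func buildFetch(offsets map[topicPartition]int64, paused map[topicPartition]bool) ([]fetchBlock, bool) {
	var blocks []fetchBlock
	for tp, off := range offsets {
		if paused[tp] {
			continue // paused: leave this partition out of the request
		}
		blocks = append(blocks, fetchBlock{TP: tp, Offset: off})
	}
	return blocks, len(blocks) > 0
}

func main() {
	a := topicPartition{"my_topic", 0}
	b := topicPartition{"my_topic", 1}
	offsets := map[topicPartition]int64{a: 1234, b: 99}

	// One partition paused: the request still carries the other one.
	blocks, send := buildFetch(offsets, map[topicPartition]bool{a: true})
	fmt.Println(len(blocks), send) // 1 true

	// All partitions paused: nothing to fetch, so skip sending.
	blocks, send = buildFetch(offsets, map[topicPartition]bool{a: true, b: true})
	fmt.Println(len(blocks), send) // 0 false
}
```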