Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pause consumer #1234

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

feat: add pause consumer #1234

wants to merge 1 commit into from

Conversation

yordis
Copy link

@yordis yordis commented Mar 14, 2024

Closes #1218

@Jarema
Copy link
Member

Jarema commented Mar 18, 2024

Thanks for the PR!

However, this feature has to be implemented in async-nats.

Let me know if I can help you with that in any way, or reach me on NATS slack, if you want to continue the work on this of course :).

@yordis yordis force-pushed the fixes-1218 branch 6 times, most recently from cda3184 to 931536f Compare March 19, 2024 06:27
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
@yordis
Copy link
Author

yordis commented Mar 19, 2024

Am I going in the right direction now?

Copy link
Member

@Jarema Jarema left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your contribution!

This looks nice. Some comments added.

@@ -850,6 +850,64 @@ impl Stream {
}
}

/// Pause a [Consumer] until the given time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a note about what it means, briefly.


async fn request_pause_consumer(&self, name: &str, pause_until: Option<OffsetDateTime>) -> Result<PauseResponse, ConsumerError> {
let subject = format!("CONSUMER.PAUSE.{}.{}", self.info.config.name, name);
let payload = &PauseConsumerRequest{ pause_until };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use literal json! as the struct is used only once.

@@ -3475,6 +3475,7 @@ mod jetstream {
max_ack_pending: 150,
}),
first_sequence: Some(505),
pause_until: None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add test for pause/unpause.
I'm happy to help with that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement Consumers Pause
2 participants