
async_nats: double_ack() should return an error if the ack_wait period is over #1042

Open
ghost opened this issue Jul 18, 2023 · 4 comments
Labels
enhancement Enhancement to existing functionality



ghost commented Jul 18, 2023

Use case

Currently, we are using NATS to run some long-running jobs. To avoid running too many jobs at once, some of our workers have a rate limit that pauses message processing so they don't overwhelm themselves.
When the service is ready to accept messages, we send the ack.

But with the current implementation, ack() doesn't return an error when the ack_wait period has been exceeded, and NATS redelivers the message even though it has already been processed.

This can be reproduced with the jetstream_pull example and the WorkQueue retention policy by applying the following patch:

diff --git a/async-nats/examples/jetstream_pull.rs b/async-nats/examples/jetstream_pull.rs
index 8368c9a..358449b 100644
--- a/async-nats/examples/jetstream_pull.rs
+++ b/async-nats/examples/jetstream_pull.rs
@@ -23,12 +23,14 @@ async fn main() -> Result<(), async_nats::Error> {
         .create_stream(jetstream::stream::Config {
             name: stream_name,
             subjects: vec!["events.>".to_string()],
+            retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
             ..Default::default()
         })
         .await?
         // Then, on that `Stream` use method to create Consumer and bind to it too.
         .create_consumer(jetstream::consumer::pull::Config {
             durable_name: Some("consumer".to_string()),
+            ack_wait: std::time::Duration::from_secs(1),
             ..Default::default()
         })
         .await?;
@@ -52,14 +54,18 @@ async fn main() -> Result<(), async_nats::Error> {
     // Iterate over messages.
     while let Some(message) = messages.next().await {
         let message = message?;
+        // long running job here
+        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
+
+        // acknowledge the message
+        message.ack().await?;
+
+        // message can now be processed
         println!(
             "got message on subject {} with payload {:?}",
             message.subject,
             from_utf8(&message.payload)?
         );
-
-        // acknowledge the message
-        message.ack().await?;
     }
 
     Ok(())

When running the code, you will see that all messages are processed, but the last one stays in the queue because it can't be acked in time.

Proposed change

ack() should return an error of kind WaitPeriodOver (or a similar kind) so that this specific case can be handled.
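To make the proposal concrete, here is a minimal, std-only Rust sketch of how such an error might be surfaced and handled by a worker. `AckError`, `AckErrorKind`, `WaitPeriodOver`, and `handle_ack` are hypothetical names for illustration only; this does not call the real async-nats client and is not its API.

```rust
use std::fmt;

/// Hypothetical error kinds an ack call could report (illustration only,
/// not the real async-nats API).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckErrorKind {
    /// The server reported that the ack arrived after ack_wait elapsed,
    /// so the message may already have been redelivered.
    WaitPeriodOver,
    /// Any other failure (connection lost, no reply from the server, ...).
    Other,
}

/// Hypothetical error type carrying the kind.
#[derive(Debug)]
pub struct AckError {
    kind: AckErrorKind,
}

impl AckError {
    pub fn new(kind: AckErrorKind) -> Self {
        AckError { kind }
    }

    pub fn kind(&self) -> AckErrorKind {
        self.kind
    }
}

impl fmt::Display for AckError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.kind {
            AckErrorKind::WaitPeriodOver => write!(f, "ack arrived after the ack_wait period"),
            AckErrorKind::Other => write!(f, "ack failed"),
        }
    }
}

impl std::error::Error for AckError {}

/// How a worker could react: on WaitPeriodOver, treat the job as possibly
/// duplicated instead of assuming the ack succeeded.
fn handle_ack(result: Result<(), AckError>) -> &'static str {
    match result {
        Ok(()) => "acked",
        Err(e) if e.kind() == AckErrorKind::WaitPeriodOver => "late: expect redelivery",
        Err(_) => "ack failed",
    }
}

fn main() {
    println!("{}", handle_ack(Ok(())));
    println!("{}", handle_ack(Err(AckError::new(AckErrorKind::WaitPeriodOver))));
}
```

A distinct kind (rather than a generic error) lets the worker keep treating ordinary failures as retryable while handling the "processed but acked too late" case separately, for example by making the job idempotent.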

Who benefits from the change(s)?

Any user who needs to handle many concurrent long-running jobs.

Alternative Approaches

No response

ghost added the enhancement (Enhancement to existing functionality) and needs triage labels Jul 18, 2023
Jarema (Member) commented Jul 18, 2023

Thanks for filing the issue!

Did you consider using double_ack?
https://docs.rs/async-nats/latest/async_nats/jetstream/message/struct.Message.html#method.double_ack

The problem with a client-side timeout check is that it's inherently racy:
from the client's perspective there is still time to send the ack, but by the time it arrives at the server, the ack wait threshold has been reached and redelivery is already happening.
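The race can be illustrated with a simplified model: the server's ack_wait timer starts when it sends the message, the client measures elapsed time from when it receives the message, and both network legs take the same constant latency. The function names and the 25 ms latency are hypothetical; real delays vary and are not constant.

```rust
use std::time::Duration;

/// What a client-side check can see: time elapsed since the client *received*
/// the message, compared with ack_wait at the moment the ack is sent.
fn client_thinks_in_time(client_elapsed: Duration, ack_wait: Duration) -> bool {
    client_elapsed < ack_wait
}

/// What the server sees (simplified): its timer started when it *sent* the
/// message, so delivery latency, client processing time and ack latency all count.
fn server_accepts(client_elapsed: Duration, one_way_latency: Duration, ack_wait: Duration) -> bool {
    one_way_latency + client_elapsed + one_way_latency < ack_wait
}

fn main() {
    let ack_wait = Duration::from_secs(1);
    let client_elapsed = Duration::from_millis(990); // 10 ms to spare on the client's clock
    let latency = Duration::from_millis(25); // hypothetical one-way network latency

    println!("client check: {}", client_thinks_in_time(client_elapsed, ack_wait)); // true
    println!("server view:  {}", server_accepts(client_elapsed, latency, ack_wait)); // false
}
```

In this model the client believes it has 10 ms to spare, yet the ack reaches the server at about 1040 ms, 40 ms after the deadline. Only the server can reliably decide whether an ack was on time, which is why double_ack, which waits for a reply from the server, is suggested here.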

ghost (Author) commented Jul 19, 2023

Well, even with double_ack, I don't get any error in return. For example, reusing the jetstream_pull example with this diff:

diff --git a/async-nats/examples/jetstream_pull.rs b/async-nats/examples/jetstream_pull.rs
index 8368c9a..216260e 100644
--- a/async-nats/examples/jetstream_pull.rs
+++ b/async-nats/examples/jetstream_pull.rs
@@ -23,12 +23,14 @@ async fn main() -> Result<(), async_nats::Error> {
         .create_stream(jetstream::stream::Config {
             name: stream_name,
             subjects: vec!["events.>".to_string()],
+            retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
             ..Default::default()
         })
         .await?
         // Then, on that `Stream` use method to create Consumer and bind to it too.
         .create_consumer(jetstream::consumer::pull::Config {
             durable_name: Some("consumer".to_string()),
+            ack_wait: std::time::Duration::from_secs(1),
             ..Default::default()
         })
         .await?;
@@ -47,19 +49,19 @@ async fn main() -> Result<(), async_nats::Error> {
 
     // Attach to the messages iterator for the Consumer.
     // The iterator does its best to optimize retrieval of messages from the server.
-    let mut messages = consumer.messages().await?.take(10);
+    let mut messages = consumer.messages().await?;
 
     // Iterate over messages.
     while let Some(message) = messages.next().await {
+        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
         let message = message?;
+        // acknowledge the message
+        message.double_ack().await.unwrap();
         println!(
             "got message on subject {} with payload {:?}",
             message.subject,
             from_utf8(&message.payload)?
         );
-
-        // acknowledge the message
-        message.ack().await?;
     }
 
     Ok(())

This is the output I get after removing the take(10) on the messages stream:

got message on subject events.0 with payload "data"
got message on subject events.1 with payload "data"
got message on subject events.2 with payload "data"
got message on subject events.3 with payload "data"
got message on subject events.4 with payload "data"
got message on subject events.5 with payload "data"
got message on subject events.6 with payload "data"
got message on subject events.7 with payload "data"
got message on subject events.8 with payload "data"
got message on subject events.9 with payload "data"
got message on subject events.6 with payload "data"
got message on subject events.7 with payload "data"
got message on subject events.8 with payload "data"
got message on subject events.9 with payload "data"

I would expect double_ack to return an error from the server indicating that the message wasn't acked in time.
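The redelivered subjects in the output above are consistent with a back-of-the-envelope estimate. Assuming all ten messages arrive together in one batch at t = 0 and message i corresponds to events.i, message i is acked only after the (i + 1)th 150 ms sleep, at about 150 * (i + 1) ms. With ack_wait = 1 s, messages 6 to 9 (1050 ms to 1500 ms) miss the deadline while 0 to 5 (up to 900 ms) do not. `late_acks` is a hypothetical helper that reproduces this arithmetic; it ignores network latency and the double_ack round trip.

```rust
use std::time::Duration;

/// Indexes of messages whose first ack lands after ack_wait, assuming all
/// messages were delivered together at t = 0 and message i is acked after
/// (i + 1) fixed per-message delays. Latency is ignored.
fn late_acks(count: u32, per_message: Duration, ack_wait: Duration) -> Vec<u32> {
    (0..count)
        .filter(|&i| per_message * (i + 1) > ack_wait)
        .collect()
}

fn main() {
    let late = late_acks(10, Duration::from_millis(150), Duration::from_secs(1));
    println!("{:?}", late); // [6, 7, 8, 9]
}
```

Under this model, these are exactly the acks for which the reporter expects double_ack to return an error.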

Jarema (Member) commented Jul 19, 2023

I'll look into why double ack doesn't return the expected response.

Jarema self-assigned this Jul 19, 2023
ghost (Author) commented Jul 19, 2023

Thanks!

ghost changed the title async_nats: ack() should return an error if the ack_wait period is over to async_nats: double_ack() should return an error if the ack_wait period is over Jul 24, 2023