Exactly-once-processing not guaranteed by fire-and-forget NatsJetStreamMessage.inProgress() #1042

Open
davidmcote opened this issue Nov 9, 2023 · 3 comments
Labels
enhancement Enhancement to existing functionality

Comments

@davidmcote
Contributor

davidmcote commented Nov 9, 2023

Observed behavior

The signature of NatsJetStreamMessage.inProgress() returns void. Its internal implementation merely publishes an Ack message to the reply subject of the NatsJetStreamMessage without waiting for confirmation from JetStream that the message's invisibility window has been extended.

A work-queue consumer relying on exactly-once-processing semantics and dutifully calling inProgress() while it works has no way of knowing whether its progress acks are being lost (whether due to network partitions or the JetStream domain being otherwise temporarily unavailable).

Desired behavior

NatsJetStreamMessage.inProgress() should await confirmation from JetStream that the message invisibility window has been extended and enable the application to detect failure/uncertain delivery.

Ideally, NatsJetStreamMessage would expose a synchronous form of inProgress() that throws, and an asynchronous version that completes exceptionally if the AckProgress message isn't confirmed. (As well as similar methods for the other AckType variants if they're not already available like ackSync().)
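
A minimal sketch of what those two forms could look like. To stay self-contained it is written against a hypothetical `Requester` interface rather than the real connection type, and `InProgressAcks`, `inProgressSync`, and `inProgressAsync` are illustrative names, not existing client API. The only behaviour assumed is that a timed request returns null when no reply arrives, as in the workaround in step 8; `+WPI` is the in-progress ack text quoted later in this thread.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a timed request-reply call that returns null on timeout. */
interface Requester {
    byte[] request(String subject, byte[] body, Duration timeout) throws InterruptedException;
}

final class InProgressAcks {
    // Body of an in-progress ack; its text form is "+WPI".
    static final byte[] WPI = "+WPI".getBytes(StandardCharsets.US_ASCII);

    /** Synchronous form: throws if no confirmation arrives within the timeout. */
    static void inProgressSync(Requester nc, String ackSubject, Duration timeout)
            throws TimeoutException, InterruptedException {
        byte[] reply = nc.request(ackSubject, WPI, timeout);
        if (reply == null) {
            throw new TimeoutException("in-progress ack not confirmed for " + ackSubject);
        }
    }

    /** Asynchronous form: the future completes exceptionally if the ack is not confirmed. */
    static CompletableFuture<Void> inProgressAsync(Requester nc, String ackSubject, Duration timeout) {
        return CompletableFuture.runAsync(() -> {
            try {
                inProgressSync(nc, ackSubject, timeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new CompletionException(e);
            } catch (TimeoutException e) {
                throw new CompletionException(e);
            }
        });
    }
}
```

With the real client, the `Requester` would presumably be a thin lambda around the connection's timed request call, and the ack subject would be the message's reply-to subject.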

Server and client version

nats-server v2.10.4

Host environment

MacBook Pro, 2.6GHz 6-Core Intel Core i7, 16GB RAM

Steps to reproduce

1/ Launch "Hub" nats-server.

% cat nats-server-leaflisten.conf

server_name: "server1"
listen: 4222

leafnodes {
    port: 7422
}

cluster {
    name: "my-cluster"
}

% nats-server -c nats-server-leaflisten.conf

2/ Launch "Spoke" leafnode w/ JetStream

% cat nats-server-leafedge.conf
listen: "127.0.0.1:9222"

jetstream {
    domain: "spoke"
}

leafnodes {
    remotes = [
        {
            url: "nats-leaf://127.0.0.1:7422"
        }
    ]
}

% nats-server -c nats-server-leafedge.conf

3/ Provision a work queue on the JetStream spoke.

% nats --domain spoke stream delete MyWorkQueue
? Really delete Stream MyWorkQueue Yes
% nats --domain spoke stream add
? Stream Name MyWorkQueue
? Subjects myqueue
? Storage file
? Replication 1
? Retention Policy Work Queue
? Discard Policy New
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream MyWorkQueue was created

Information for Stream MyWorkQueue created 2023-11-09 09:43:24

              Subjects: myqueue
              Replicas: 1
               Storage: File

Options:

             Retention: WorkQueue
       Acknowledgments: true
        Discard Policy: New
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 0
                 Bytes: 0 B
        First Sequence: 0
         Last Sequence: 0
      Active Consumers: 0

4/ Launch a competing consumer which sends inProgress() acks every 5 seconds.

    public static void main(final String[] args) throws Exception {

        try (Connection nc = Nats.connect()) {
            final JetStream js = nc.jetStream(JetStreamOptions
                    .builder()
                    .domain("spoke")
                    .build());

            final JetStreamSubscription sub = js.subscribe(">", PushSubscribeOptions
                    .builder()
                    .stream("MyWorkQueue")
                    .durable("consumer")
                    .configuration(
                            ConsumerConfiguration
                                    .builder()
                                    .ackWait(Duration.ofSeconds(15))
                                    .build())
                    .build());


            while (true) {
                final Message message = sub.nextMessage(Duration.ofSeconds(1));
                if (message == null) {
                    continue;
                }

                new Thread(() -> {
                    final String tid = Thread.currentThread().getName();
                    try {
                        System.out.printf("(%s @ %s) Received: %s%n", Instant.now(), tid, message);
                        System.out.printf("(%s @ %s) Metadata = %s%n", Instant.now(), tid, message.metaData());

                        // Process message for ~2 minutes, sending "in-progress" every 5 seconds
                        for (int ii = 0; ii < 24; ii++) {
                            message.inProgress();
                            System.out.printf("(%s @ %s) In-progress: %s%n", Instant.now(), tid, message);

                            try {
                                TimeUnit.SECONDS.sleep(5);
                            } catch (final InterruptedException e) {
                                System.out.printf("(%s @ %s) Interrupted: %s%n", Instant.now(), tid, message);
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }

                        try {
                            message.ackSync(Duration.ofSeconds(5));
                            System.out.printf("(%s @ %s) Acked: %s%n", Instant.now(), tid, message);
                        } catch (TimeoutException e) {
                            System.out.printf("(%s @ %s) Acked Failed: %s%n", Instant.now(), tid, e);
                        }

                    } catch (final InterruptedException interruptedException) {
                        System.out.printf("(%s @ %s) Interrupted: %s%n", Instant.now(), tid, message);
                        Thread.currentThread().interrupt();
                    }
                }).start();
            }
        }
    }

5/ Publish a message into the work queue.

% nats req myqueue 'example-message'
09:46:43 Sending request on "myqueue"
09:46:43 Received with rtt 1.122397ms
{"stream":"MyWorkQueue", "domain":"spoke", "seq":1}

6/ Temporarily stop the Spoke JetStream server.

After the consumer has started processing the message, press Ctrl-C in the JetStream Spoke nats-server, wait ~30 seconds so that ackWait(Duration.ofSeconds(15)) expires, then start the process again.

7/ Observe that the message is redelivered to the competing consumer process while the first delivery is still being processed.

(2023-11-09T14:47:08.501698Z @ Thread-1) Received: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:08.502116Z @ Thread-1) Metadata = NatsJetStreamMetaData{prefix='$JS', domain='null', stream='MyWorkQueue', consumer='consumer', delivered=1, streamSeq=1, consumerSeq=1, timestamp=2023-11-09T09:46:43.202066-05:00[America/New_York], pending=0}
(2023-11-09T14:47:08.502970Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:13.506437Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|


------------------- JETSTREAM SPOKE MANUALLY STOPPED --------------------

(2023-11-09T14:47:18.511138Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:23.512549Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:28.514365Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:33.517215Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:35.774556Z @ Thread-2) Received: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|

------------------- JETSTREAM SPOKE STARTED AGAIN --------------------

(2023-11-09T14:47:35.774811Z @ Thread-2) Metadata = NatsJetStreamMetaData{prefix='$JS', domain='null', stream='MyWorkQueue', consumer='consumer', delivered=2, streamSeq=1, consumerSeq=2, timestamp=2023-11-09T09:46:43.202066-05:00[America/New_York], pending=0}
(2023-11-09T14:47:35.775104Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:47:38.521194Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|

----------- CONSUMER IS WORKING ON THE SAME MESSAGE IN Thread-1 AND THREAD-2 --------------

(2023-11-09T14:47:40.779778Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:47:43.522014Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:45.782792Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:47:48.526721Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:50.786126Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:47:53.529706Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:47:55.789811Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:47:58.530642Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:00.795163Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:03.534515Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:05.797233Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:08.536057Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:10.800737Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:13.536691Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:15.803382Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:18.539848Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:20.807602Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:23.543542Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:25.812030Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:28.546705Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:30.815201Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:33.551059Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:35.818051Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:38.554498Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:40.822232Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:43.558655Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:45.825267Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:48.559761Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:50.827395Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:53.563469Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:48:55.830847Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:48:58.567623Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:49:00.835085Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:49:03.568588Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:49:05.840071Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:49:08.574088Z @ Thread-1) Acked: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.1.1.1699541203202066000.0|example-message|
(2023-11-09T14:49:10.845300Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:49:15.846319Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:49:20.850141Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:49:25.854232Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:49:30.859414Z @ Thread-2) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|
(2023-11-09T14:49:35.865893Z @ Thread-2) Acked: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.2.1.2.1699541203202066000.0|example-message|

8/ An application can detect the failure if it bypasses the default implementation of inProgress().

NATS JetStream responds to AckProgress messages if they include a reply subject.

// Re-implement message.inProgress(), but as a request-reply, not fire-and-forget
if (null == nc.request(
    message.getReplyTo(),
    AckType.AckProgress.bodyBytes(-1),
    Duration.ofSeconds(5))) {
    System.out.printf("(%s @ %s) IN_PROGRESS FAILED: %s%n", Instant.now(), tid, message);
}
(2023-11-09T15:02:34.186299Z @ Thread-1) Received: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.4.7.1699542154173599000.0|example-message|
(2023-11-09T15:02:34.186829Z @ Thread-1) Metadata = NatsJetStreamMetaData{prefix='$JS', domain='null', stream='MyWorkQueue', consumer='consumer', delivered=1, streamSeq=4, consumerSeq=7, timestamp=2023-11-09T10:02:34.173599-05:00[America/New_York], pending=0}
(2023-11-09T15:02:34.188863Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.4.7.1699542154173599000.0|example-message|
(2023-11-09T15:02:39.193649Z @ Thread-1) In-progress: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.4.7.1699542154173599000.0|example-message|
(2023-11-09T15:02:44.202469Z @ Thread-1) IN_PROGRESS FAILED: NatsMessage |myqueue|$JS.ACK.MyWorkQueue.consumer.1.4.7.1699542154173599000.0|example-message|
@davidmcote davidmcote added the defect Suspected defect such as a bug or regression label Nov 9, 2023
@scottf scottf added enhancement Enhancement to existing functionality and removed defect Suspected defect such as a bug or regression labels Nov 9, 2023
@scottf
Contributor

scottf commented Nov 9, 2023

  1. Changing this to an enhancement. JetStream is at least once, not exactly once.
  2. There are ways to provide closer to exactly once in your application, for instance tracking the sequence number of a processed message. Maybe use Key Value to store something so it can be seen from multiple instances.
  3. I've asked the client team about adding inProgressSync to the API. It would do a request instead of a publish and wait for the response.
  4. This still is not foolproof as you still could exceed your ack wait, so that must be properly tuned.
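
Point 2 can be sketched in-process as follows. `ProcessedSequences`, `tryClaim`, and `release` are hypothetical names, and an in-memory set only protects a single JVM; the Key Value variant suggested above would swap the set for a shared store. In the reproduction log both deliveries carry streamSeq=1, so the second claim would be refused.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative single-process guard keyed by stream sequence number. */
final class ProcessedSequences {
    private final Set<Long> claimed = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller to claim a given stream sequence. */
    boolean tryClaim(long streamSeq) {
        return claimed.add(streamSeq);
    }

    /** Drops a claim so that a later redelivery may retry after a failure. */
    void release(long streamSeq) {
        claimed.remove(streamSeq);
    }
}
```

A worker would call `tryClaim` with the stream sequence from the message metadata before starting, and skip the delivery when it returns false.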

@davidmcote
Contributor Author

davidmcote commented Nov 9, 2023

Thanks Scott!

1/
I am not after exactly-once delivery, which is provably impossible to make reliable, but exactly-once processing, i.e. at-least-once delivery plus some extra consistency guarantees for acks. (Just wanted to elaborate on this so I don't come off as tin-foil hat crazy here :))

Agree, JetStream offers at-least-once delivery semantics for most stream configurations. It does appear that the docs intend to enable exactly-once-processing in the manner I am describing.

Granted, those both focus on the AckSync() capability. Unfortunately exactly-once processing behavior becomes unachievable/unreliable for messages whose processing time exceeds AckWait without introducing this proposed change.


2/
In my application in particular, I've already been using an async version of my double-acking inProgress() impl, combined with a timer that tracks the time since the last successful AckProgress, to avoid breaching something like 20% * AckWait. This works without the overhead of an extra KV interaction, but it is unfortunate surface area for application code to maintain, and it doesn't benefit from the lastAck state machine.
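
A rough sketch of such a timekeeper, under one reading of the threshold: the worker treats its hold on the message as uncertain once the time since the last confirmed AckProgress exceeds a configurable fraction of AckWait. `AckProgressTimekeeper` and its methods are hypothetical names, and the clock is injected so the logic can be exercised without sleeping.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Illustrative tracker for the time elapsed since the last confirmed AckProgress. */
final class AckProgressTimekeeper {
    private final long budgetNanos;
    private final LongSupplier nanoClock;
    private volatile long lastConfirmedNanos;

    AckProgressTimekeeper(Duration ackWait, double fraction, LongSupplier nanoClock) {
        // Budget is the chosen fraction of AckWait, e.g. 0.2 for "20% * AckWait".
        this.budgetNanos = (long) (ackWait.toNanos() * fraction);
        this.nanoClock = nanoClock;
        this.lastConfirmedNanos = nanoClock.getAsLong();
    }

    /** Call whenever the server has confirmed an AckProgress. */
    void confirmed() {
        lastConfirmedNanos = nanoClock.getAsLong();
    }

    /** True while the time since the last confirmation is still within the budget. */
    boolean withinBudget() {
        return nanoClock.getAsLong() - lastConfirmedNanos <= budgetNanos;
    }
}
```

In production code the clock would typically be `System::nanoTime`, and a worker would stop or roll back its processing once `withinBudget()` returns false.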


3/
If this proposal for inProgressSync() is something the client team agrees with, I would be happy to hazard a code patch up to nats.java later this month.

I'd also be happy to provide double-acking methods for the full suite of AckTypes along with asynchronous versions if they'd be on board with that. This ambition would likely touch the very externally-visible Message interface so I'd await guidance on what signatures, if any, you'd hope to add here.


4/
About exceeding AckWait: I've been meaning to write up a proposal for the nats-server repo to be able to reject "out-of-window" Ack messages.

Notably, the reply subject of JetStream messages has a structure that encodes the message's metadata, including its monotonically increasing delivery count. It seems this could be used to decide whether an ack is still valid or belongs to a since-expired delivery attempt, without any modification to the wire protocol.

That is certainly tangential to this proposal, but I'd love it if JetStream could distinguish between message receipts like SQS does with its notion of receipt handles.
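
To illustrate the idea, here is a sketch that extracts the delivery count from a reply subject. The field order (stream, consumer, delivered, stream sequence, consumer sequence, timestamp, pending) is inferred from the log lines and metadata printed in step 7, and only that nine-token form is handled. `AckSubject` and `isStale` are hypothetical names; nothing here claims the server performs such a check.

```java
/** Illustrative parser for reply subjects of the form seen in the step 7 logs. */
final class AckSubject {
    final String stream;
    final String consumer;
    final long delivered;
    final long streamSeq;
    final long consumerSeq;

    private AckSubject(String stream, String consumer, long delivered, long streamSeq, long consumerSeq) {
        this.stream = stream;
        this.consumer = consumer;
        this.delivered = delivered;
        this.streamSeq = streamSeq;
        this.consumerSeq = consumerSeq;
    }

    /** Parses "$JS.ACK.stream.consumer.delivered.streamSeq.consumerSeq.timestamp.pending". */
    static AckSubject parse(String replyTo) {
        String[] t = replyTo.split("\\.");
        if (t.length != 9 || !"$JS".equals(t[0]) || !"ACK".equals(t[1])) {
            throw new IllegalArgumentException("unrecognised ack subject: " + replyTo);
        }
        return new AckSubject(t[2], t[3],
                Long.parseLong(t[4]), Long.parseLong(t[5]), Long.parseLong(t[6]));
    }

    /** True if this subject belongs to an earlier delivery attempt than the latest known one. */
    boolean isStale(long latestDelivered) {
        return delivered < latestDelivered;
    }
}
```

Applied to the two subjects in the log, the first parses to delivered=1 and the second to delivered=2, so an ack sent on the first subject after the redelivery would be identifiable as belonging to an expired attempt.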

@scottf
Contributor

scottf commented Nov 9, 2023

  1. It makes sense to have a parallel of the double ack (AckSync) for the in progress ack. I've already asked the team for feedback.
  2. It was pretty trivial, I already wrote it to just make sure that the server actually gives a response.
  3. I'm pretty sure acks that are outside of the ack wait window are just discarded, but this is a server-side issue. And yes, all an ack is, is publishing (or requesting, for the double ack) the text of the ack (+ACK, -NAK, +WPI, +TERM) to that exact reply-to subject.
