-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Quorum queues v4 #10637
Draft
kjnilsson
wants to merge
12
commits into
main
Choose a base branch
from
qq-v4
base: main
Could not load branches
Branch not found: {{ refName }}
Could not load tags
Nothing to show
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Quorum queues v4 #10637
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
kjnilsson
force-pushed
the
qq-v4
branch
3 times, most recently
from
March 1, 2024 13:09
b6a9bf0
to
17c6e08
Compare
kjnilsson
force-pushed
the
qq-v4
branch
7 times, most recently
from
March 8, 2024 10:56
a479434
to
b6d9b85
Compare
kjnilsson
force-pushed
the
qq-v4
branch
4 times, most recently
from
May 1, 2024 08:12
4bd8c1f
to
203b671
Compare
kjnilsson
force-pushed
the
qq-v4
branch
4 times, most recently
from
May 13, 2024 08:48
6be53b0
to
1bc6b22
Compare
kjnilsson
force-pushed
the
qq-v4
branch
2 times, most recently
from
May 15, 2024 09:38
628be90
to
d016d0a
Compare
Consumer timeouts postponed. |
Create the new version but not including any changes yet. fix QQ: force delete followers after leader has terminated. Also try a longer sleep for mqtt_shared_SUITE so that the delete operation stands a chance to time out and move on to the forced deletion stage. In some mixed machine version scenarios some followers will never apply the poison pill command so we may as well force delete them just in case. QQ: skip test in amqp_client that cannot pass with mixed machine versions QQ: remove dead code Code relating to prior machine versions and state conversions. formatting / readability rabbit_fifo_prop_SUITE fixes
Also update rabbit_fifo_* suites to test more relevant code versions where applicable. add ff mock QQ: always use the updated credit mode format QQv4: use more compact consumer reference in settle, credit, return This introudces a new type: consumer_key() which is either the consumer_id or the raft index the checkout was processed at. If the consumer is using one of the updated credit spec formats rabbit_fifo will use the raft index as the primary key for the consumer such that the rabbit fifo client can then use the more space efficient integer index instead of the full consumer id in subsequent commands. There is compatibility code to still accept the consumer id in settle, return, discard and credit commands but this is slighlyt slower and of course less space efficient. The old form will be used in cases where the fifo client may have already remove the local consumer state (as happens after a cancel). Lots of test refactorings of the rabbit_fifo_SUITE to begin to use the new forms.
rabbit_fifo_prop_SUITE refactoring and other fixes. fixss bzl bzl fixes
Single active consumers will be activated if they have a higher priority than the currently active consumer. if the currently active consumer has pending messages, no further messages will be assigned to the consumer and the activation of the new consumer will happen once all pending messages are settled. This is to ensure processing order. Consumers with the same priority will internally be ordered to favour those with credit then those that attached first. QQ: add SAC consumer priority integration tests Dialyzer fix QQ: add check for ff in tests
This option immediately removes and returns all messages for a consumer instead of the softer 'cancel' option which keeps the consumer around until all pending messages have been either settled or returned. This involves a change to the rabbit_queue_type:cancel/5 API to rabbit_queue_type:cancel/3.
This will form the basis for queue initiated consumer timeouts.
Instead of the old ra_machine:handle_aux/6 callback.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Quorum queues v4
#8261
Tasks:
Consumer timeouts implemented in the queue (rather than in the channel as currently done).enqueue
,settle
andcredit
Consumer timeouts design
rabbit_fifo
will record the checkout time for each message that is assigned to a consumer. This timestampwill be used to detect messages that have been kept longer than the consumer timeout configured.
We do not want to use the RA
timeout
effect as to do so we'd need to either do expensive and frequentcalculations over the full set of checked out messages or keep lots of timers (one per message).
Instead we'd schedule an aux event every minute which will do a scan over the checked set and if any
consumer has messages with expired timeouts and if so commit a new command
eval_consumer_timeouts
to do this work and return messages. This means that will be evaluated some time after the expiry but no more than ~60s
Consumers that let any of their message locks expire should not be assigned any further messages until they send
some kind of command (settlement, lock renewal etc) to show that they are live and responding. They should be treated as "suspected" until it is known that they can reply.
This mean we can probably get rid of the (undesirable but necessary with mnesia) behaviour where when the queue received a
DOWN
notification with the reasonnoconnection
it would immediately return all messages. With mnesia this was reasonably correct. If there were cluster disconnected (even shortly) typically therabbit
application would restart itself in mysterious ways with the ultimate result that channels were terminated. Withkhepri
this will no longer be the case and the cluster should be able to function normally even if there are short term cluster disconnections.So going forward when a QQ receives a
noconnection
for a consumer process it will only mark it as disconnected (so that new messages are not assigned until it comes back) and let the consumer timeout handle the message return in due course. This means it should be able to handle the case of short term disconnections / reconnections in the cluster without messages being returned unnecessarily.If the consumer is already in
cancelled
state (cancelled but with pending messages) then all pending messages will be returned and the consumer will be removed. This is the safest option there are potentially faulty clients in the wild that will never ack pending messages after a cancellation.This also means that locks should be relatively short (max 5 mins but ideally lower).
Single Active Consumer consumers that let their messages time out will have all pending messages returned, as well
as being replaced. This is to ensure ordering invariants with SAC.
Protocol impl:
AMQP can provide a management extension command to renew locks for a messages.
AMQP legacy can configure an auto renew function (that is done by the channel process / queue type) where it will
auto renew the lock n number of times on behalf of the client. This is because the legacy protocol (and other protocols such as MQTT / STOMP) don't have any options for implementing lock renewal.
For AMQP legacy we can default to renew locks to the total of the current
consumer_timeout
configuration.Q: Can we do lock renewal without going through Raft log?
When a messages reaches timeout the queue will notify the consumer process with a new Ra event
{message_timeout, consumer_tag(), [MsgIds]}
- how this is handled may depend on the protocolimplementation. AMQP can emit the released or modified outcome. Other protocols don't have the same
mechanism so for AMQP legacy it is probably best to terminate the channel or initiate a broker side consumer cancellation.