Include x-death header in Stream messages #11173

Closed
ansd opened this issue May 6, 2024 · 1 comment

ansd commented May 6, 2024

Is your feature request related to a problem? Please describe.

If a message gets dead lettered into a Stream, the x-death header is not set when an AMQP 0.9.1 client consumes the message.

Describe the solution you'd like

Include the death events history also in messages consumed via AMQP 0.9.1 from a Stream.

Describe alternatives you've considered

No response

Additional context

No response

@ansd ansd self-assigned this May 6, 2024
ansd added a commit that referenced this issue May 6, 2024
This commit fixes #11159, #11160, #11173.
ansd added a commit that referenced this issue May 7, 2024
This commit fixes #11159, #11160, #11173.
ansd added a commit that referenced this issue May 8, 2024
  # What?

This commit fixes #11159, #11160, #11173.

  # How?

  ## Background

RabbitMQ can dead letter a message for four different reasons. Three of
them (maxlen, expired, delivery_limit) are triggered automatically
inside the broker, and one (rejected) is caused by an explicit client
action.

RabbitMQ also allows dead letter topologies. When a message is dead
lettered, it is re-published to an exchange and therefore routed to zero
or more target queues. These target queues can in turn dead letter
messages. Hence it is possible to create a cycle of queues where
messages get dead lettered endlessly, which is what we want to avoid.

  ## Alternative approach

One approach to avoid such endless cycles is to use a concept similar to
the TTL field of an IPv4 datagram, or the hop limit field of an IPv6
datagram. These fields ensure that IP packets aren't circulating forever
in the Internet. Each router decrements this counter. If this counter
reaches 0, the packet gets dropped and the sender is notified.

We could use the same approach in RabbitMQ: Whenever a queue dead
letters a message, a dead_letter_hop_limit field could be decremented.
If this field reaches 0, the message will be dropped.
Such a hop limit field could have a sensible default value, for example
32. The sender of the message could override this value. Likewise, the
client rejecting a message could set a new value via the Modified
outcome.
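
The hop limit idea above can be sketched as follows. This is only an
illustration of the proposal, not broker code: the `Message` class and
the `dead_letter` function are hypothetical, and the default of 32 is
the example value mentioned above.

```python
from dataclasses import dataclass
from typing import Optional

# Example default taken from the text above; not an actual broker setting.
DEFAULT_HOP_LIMIT = 32


@dataclass(frozen=True)
class Message:
    """Hypothetical message carrying a dead letter hop limit."""
    body: bytes
    dead_letter_hop_limit: int = DEFAULT_HOP_LIMIT


def dead_letter(msg: Message) -> Optional[Message]:
    """Decrement the hop limit each time a queue dead letters the message.

    Returns the message to re-publish, or None if the hop limit reached 0
    and the message must be dropped.
    """
    remaining = msg.dead_letter_hop_limit - 1
    if remaining <= 0:
        return None
    return Message(msg.body, remaining)
```

With this sketch no cycle detection is needed: a message stuck in a
cycle is dropped once its counter is exhausted, regardless of which
queues it passed through.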

Such an approach has multiple advantages:
1. No dead letter cycle detection per se needs to be performed within
   the broker, which is a slight simplification compared to what we have
   today.
2. Simpler dead letter topologies. One very common use case is that
   clients retry sending the message after some time by consuming from
   a dead-letter queue and rejecting the message such that the message
   gets republished to the original queue. Instead of requiring explicit
   client actions, which increases complexity, an x-message-ttl argument
   could be set on the dead-letter queue to automatically retry after
   some time. This is a big simplification because it eliminates the
   need for various frameworks that retry, such as
   https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/rabbitmq-retry.html
3. No dead letter history information needs to be compressed because
   there is a clear limit on how often a message gets dead lettered.
   Therefore, the full history including timestamps of every dead letter
   event will be available to clients.

Disadvantages:
1. Breaks a lot of clients, even for 4.0.

  ## 3.12 approach

Instead of decrementing a counter, the approach up to 3.12 has been to
drop the message if the message cycled automatically. A message cycled
automatically if no client explicitly rejected the message, i.e. the
message got dead lettered due to maxlen, expired, or delivery_limit, but
not due to rejected.

In this approach, the broker must be able to detect such cycles
reliably.
Reliably detecting dead letter cycles broke in 3.13 due to #11159 and #11160.

To reliably detect cycles, the broker must be able to obtain the exact
order of dead letter events for a given message. In 3.13.0 - 3.13.2, the
order cannot be determined exactly because wall clock time is used to
record the death time.

This commit uses the same approach as done in 3.12: a list ordered by
death recency is used with the most recent death at the head of the
list.
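
To illustrate why the order matters, here is a minimal sketch of cycle
detection on such a recency-ordered list. It is one possible reading of
the rule described above, not the broker's implementation; the `Death`
type and `is_automatic_cycle` function are hypothetical names.

```python
from typing import List, NamedTuple


class Death(NamedTuple):
    """One dead letter event (hypothetical, simplified record)."""
    queue: str
    reason: str  # "maxlen", "expired", "delivery_limit" or "rejected"


def is_automatic_cycle(target_queue: str, deaths: List[Death]) -> bool:
    """Return True if routing to target_queue would close an automatic cycle.

    `deaths` must be ordered by recency, most recent death first.
    The cycle consists of all deaths from the head of the list up to and
    including the most recent death from target_queue. It counts as
    automatic only if none of those deaths was an explicit client reject.
    """
    cycle = []
    for death in deaths:
        cycle.append(death)
        if death.queue == target_queue:
            return all(d.reason != "rejected" for d in cycle)
    # target_queue never dead lettered this message: no cycle.
    return False
```

The walk from the head of the list only works if the list order reflects
the true order of events, which is why timestamps based on wall clock
time are not a reliable substitute.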

To keep this list from growing endlessly (for example when a client
rejects the same message hundreds of times), this list should be
compacted. This commit, like 3.12, compacts by tuple `{Queue, Reason}`:
If this message has already been dead lettered from this Queue for this
Reason, then only a counter is incremented and the element is moved to
the front of the list.
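
The compaction rule can be sketched like this. The tuple layout and the
`record_death` function are assumptions made for illustration; the
broker itself is written in Erlang and stores more fields per entry.

```python
from typing import List, Tuple

# (queue, reason, count); a simplified, hypothetical entry layout.
Entry = Tuple[str, str, int]


def record_death(deaths: List[Entry], queue: str, reason: str) -> List[Entry]:
    """Record a death in a list ordered by recency (most recent first).

    If an entry for (queue, reason) already exists, increment its counter
    and move it to the front. Otherwise prepend a new entry with count 1.
    The input list is not modified.
    """
    for i, (q, r, count) in enumerate(deaths):
        if (q, r) == (queue, reason):
            return [(q, r, count + 1)] + deaths[:i] + deaths[i + 1:]
    return [(queue, reason, 1)] + deaths
```

The list length is therefore bounded by the number of distinct
`{Queue, Reason}` pairs a message encounters, no matter how often it is
dead lettered.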

  ## Streams & AMQP 1.0 clients

Dead lettering from a stream doesn't make sense because:
1. a client cannot reject a message from a stream since the stream must
   maintain the total order of events to be consumed by multiple clients.
2. TTL is implemented by Stream retention where only old Stream segments
   are automatically deleted (or archived in the future).
3. the same applies to maxlen

Although messages cannot be dead lettered **from** a stream, messages can be dead lettered
**into** a stream. This commit provides clients consuming from a stream the death history: #11173

Additionally, this commit provides AMQP 1.0 clients the death history via
message annotation `x-opt-deaths` which contains the same information as
AMQP 0.9.1 header `x-death`.

Both storing the death history in a stream and providing death history
to an AMQP 1.0 client use the same encoding: a message annotation
`x-opt-deaths` that contains an array of maps ordered by death recency.
The information encoded is the same as in the AMQP 0.9.1 x-death header.
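
As a rough illustration, a decoded array of maps might look like the
Python value below. The map keys are borrowed from the fields of the
AMQP 0.9.1 `x-death` header; whether `x-opt-deaths` uses exactly these
keys is an assumption, and the queue names and timestamps are made up.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Hypothetical decoded `x-opt-deaths` value, most recent death first.
# Key names are assumed to mirror the AMQP 0.9.1 x-death header fields.
x_opt_deaths: List[Dict[str, Any]] = [
    {
        "queue": "retry",
        "reason": "expired",
        "count": 2,
        "time": datetime(2024, 5, 6, 12, 0, 5, tzinfo=timezone.utc),
        "exchange": "",
        "routing-keys": ["retry"],
    },
    {
        "queue": "work",
        "reason": "rejected",
        "count": 2,
        "time": datetime(2024, 5, 6, 12, 0, 0, tzinfo=timezone.utc),
        "exchange": "",
        "routing-keys": ["work"],
    },
]


def latest_death(deaths: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """The most recent death is the first element; rely on list order,
    not on the timestamps inside the maps."""
    return deaths[0] if deaths else None


def total_death_count(deaths: List[Dict[str, Any]]) -> int:
    """Sum the per-{queue, reason} counters to get the number of deaths."""
    return sum(d["count"] for d in deaths)
```

A consumer could use helpers like these to decide, for example, whether
a message has been retried too often.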

Instead of providing an array of maps, a better approach could be to use
an array of a custom AMQP death type, such as:
```xml
<amqp name="rabbitmq">
    <section name="custom-types">
        <type name="death" class="composite" source="list">
            <descriptor name="rabbitmq:death:list" code="0x00000000:0x000000255"/>
            <field name="queue" type="string" mandatory="true" label="the name of the queue the message was dead lettered from"/>
            <field name="reason" type="symbol" mandatory="true" label="the reason why this message was dead lettered"/>
            <field name="count" type="ulong" default="1" label="how many times this message was dead lettered from this queue for this reason"/>
            <field name="time" mandatory="true" type="timestamp" label="the first time when this message was dead lettered from this queue for this reason"/>
            <field name="exchange" type="string" default="" label="the exchange this message was published to before it was dead lettered for the first time from this queue for this reason"/>
            <field name="routing-keys" type="string" default="" multiple="true" label="the routing keys this message was published with before it was dead lettered for the first time from this queue for this reason"/>
            <field name="ttl" type="milliseconds" label="the time to live of this message before it was dead lettered for the first time from this queue for reason ‘expired’"/>
        </type>
    </section>
</amqp>
```

However, encoding and decoding custom AMQP types that are nested within
arrays which in turn are nested within the message annotation map can be
difficult for clients and the broker. Also, each client will need to
know the custom AMQP type. Therefore, for now we use an array of maps.

  ## Feature flag
The new way to record death information uses the mc (message container)
annotation `deaths_v2`.
Because old nodes do not know this new annotation, recording death
information via mc annotation `deaths_v2` is hidden behind a new feature
flag `message_containers_deaths_v2`.

If this feature flag is disabled, a message will continue to use the
3.13.0 - 3.13.2 way to record death information in mc annotation
`deaths`, or even the older way within `x-death` header directly if
feature flag `message_containers` is also disabled.

Only if feature flag `message_containers_deaths_v2` is enabled and this
message hasn't been dead lettered before, will the new mc annotation
`deaths_v2` be used.
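
The selection logic described in this section could be sketched as
follows. The function name and the `existing_format` parameter are
hypothetical. In particular, keeping whatever format a message already
carries is an assumption derived from the sentence above, not a
statement about the actual code.

```python
from typing import Optional, Set


def death_record_format(enabled_flags: Set[str],
                        existing_format: Optional[str] = None) -> str:
    """Pick where death information is recorded for a message.

    enabled_flags: names of the enabled feature flags.
    existing_format: format already used by this message if it has been
        dead lettered before, else None.
    """
    if existing_format is not None:
        # Assumption: a message that was dead lettered before keeps the
        # format it already uses.
        return existing_format
    if "message_containers_deaths_v2" in enabled_flags:
        return "deaths_v2"
    if "message_containers" in enabled_flags:
        return "deaths"
    # Oldest way: death history lives directly in the x-death header.
    return "x-death"
```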
ansd added a commit that referenced this issue May 13, 2024
This commit fixes #11159, #11160, #11173.
mergify bot pushed a commit that referenced this issue May 13, 2024
  # What?

This commit fixes #11159, #11160, #11173.

  # How?

  ## Background

RabbitMQ allows to dead letter messages for four different reasons, out
of which three reasons cause messages to be dead lettered automatically
internally in the broker: (maxlen, expired, delivery_limit) and 1 reason
is caused by an explicit client action (rejected).

RabbitMQ also allows dead letter topologies. When a message is dead
lettered, it is re-published to an exchange and therefore routed to zero
or more target queues. These target queues can in turn dead letter
messages. Hence it is possible to create a cycle of queues where
messages get dead lettered endlessly, which is what we want to avoid.

  ## Alternative approach

One approach to avoid such endless cycles is to use a concept similar to
the TTL field of an IPv4 datagram, or the hop limit field of an IPv6
datagram. These fields ensure that IP packets aren't circulating forever
in the Internet. Each router decrements this counter. If this counter
reaches 0, the sender will be notified and the packet gets dropped.

We could use the same approach in RabbitMQ: Whenever a queue dead
letters a message, a dead_letter_hop_limit field could be decremented.
If this field reaches 0, the message will be dropped.
Such a hop limit field could have a sensible default value, for
example 32. The sender of the message could override this value.
Likewise, the client rejecting a message could set a new value via the
Modified outcome.
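
A minimal sketch of this hop limit idea, written in Python for
illustration only. The `Message` class and the `dead_letter` function
are hypothetical and not part of RabbitMQ; only the field name
dead_letter_hop_limit and the example default of 32 come from the text
above.

```python
from dataclasses import dataclass
from typing import Optional

# Example default taken from the text above.
DEFAULT_HOP_LIMIT = 32


@dataclass
class Message:
    """Hypothetical message carrying a dead letter hop limit."""
    body: bytes
    dead_letter_hop_limit: int = DEFAULT_HOP_LIMIT


def dead_letter(msg: Message) -> Optional[Message]:
    """Decrement the hop limit when a queue dead letters the message.

    Returns the message to re-publish, or None if the hop limit
    reached 0 and the message must be dropped.
    """
    remaining = msg.dead_letter_hop_limit - 1
    if remaining <= 0:
        return None
    return Message(msg.body, remaining)
```

With this sketch, a message published with a hop limit of 3 is
re-published twice and dropped on its third dead lettering, no matter
which queues or reasons are involved.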

Such an approach has multiple advantages:
1. No dead letter cycle detection per se needs to be performed within
   the broker, which is a slight simplification compared to what we have
   today.
2. Simpler dead letter topologies. One very common use case is that
   clients retry sending the message after some time by consuming from
   a dead-letter queue and rejecting the message such that the message
   gets republished to the original queue. Instead of requiring explicit
   client actions, which increases complexity, an x-message-ttl argument
   could be set on the dead-letter queue to automatically retry after
   some time. This is a big simplification because it eliminates the
   need for various frameworks that retry, such as
   https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/rabbitmq-retry.html
3. No dead letter history information needs to be compressed because
   there is a clear limit on how often a message gets dead lettered.
   Therefore, the full history including timestamps of every dead letter
   event will be available to clients.

Disadvantages:
1. Breaks a lot of clients, even for 4.0.

  ## 3.12 approach

Instead of decrementing a counter, the approach up to 3.12 has been to
drop the message if the message cycled automatically. A message cycled
automatically if no client explicitly rejected the message, i.e. the
message got dead lettered due to maxlen, expired, or delivery_limit, but
not due to rejected.

In this approach, the broker must be able to detect such cycles
reliably.
Reliably detecting dead letter cycles broke in 3.13 due to #11159 and #11160.

To reliably detect cycles, the broker must be able to obtain the exact
order of dead letter events for a given message. In 3.13.0 - 3.13.2, the
order cannot be determined exactly because wall clock time is used to
record the death time.

This commit uses the same approach as done in 3.12: a list ordered by
death recency is used with the most recent death at the head of the
list.
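
To illustrate why the order matters, here is a minimal Python sketch of
an automatic cycle check over such a recency-ordered list. It is an
assumption about the general shape of the check, not the broker's actual
code. The function name and the dict keys `queue` and `reason` are
hypothetical.

```python
from typing import Dict, List

# Reasons that the broker applies without any client action.
AUTOMATIC_REASONS = {"maxlen", "expired", "delivery_limit"}


def is_automatic_cycle(target_queue: str, deaths: List[Dict[str, str]]) -> bool:
    """Check whether routing to target_queue closes an automatic cycle.

    `deaths` is ordered by recency, most recent death first. A cycle
    exists if the message was already dead lettered from target_queue.
    The cycle is automatic if no death since then (inclusive) was
    caused by a client rejecting the message.
    """
    for index, death in enumerate(deaths):
        if death["queue"] == target_queue:
            cycle = deaths[: index + 1]
            return all(d["reason"] in AUTOMATIC_REASONS for d in cycle)
    # The message never died in target_queue, so there is no cycle.
    return False
```

The check only inspects the deaths between the head of the list and the
first entry for the target queue, which is why a wrong order can hide a
cycle or report one that a client has interrupted with a reject.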

To not grow this list endlessly (for example when a client rejects the
same message hundreds of times), this list should be compacted.
This commit, like 3.12, compacts by tuple `{Queue, Reason}`:
If this message has already been dead lettered from this Queue for this
Reason, then only a counter is incremented and the element is moved to
the front of the list.
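
The compaction rule can be sketched as follows. This is an illustrative
Python model, not the Erlang implementation. The tuple layout
`(queue, reason, count)` and the name `record_death` are assumptions made
for the example.

```python
from typing import List, Tuple

# (queue, reason, count): a simplified stand-in for one death entry.
Death = Tuple[str, str, int]


def record_death(deaths: List[Death], queue: str, reason: str) -> List[Death]:
    """Return a new recency-ordered list with this death recorded.

    If an entry for (queue, reason) exists, its counter is incremented
    and the entry is moved to the front. Otherwise a new entry with
    count 1 is placed at the front.
    """
    for i, (q, r, count) in enumerate(deaths):
        if (q, r) == (queue, reason):
            return [(q, r, count + 1)] + deaths[:i] + deaths[i + 1:]
    return [(queue, reason, 1)] + deaths
```

The list length is bounded by the number of distinct `{Queue, Reason}`
pairs, so rejecting the same message from the same queue hundreds of
times only increases one counter.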

  ## Streams & AMQP 1.0 clients

Dead lettering from a stream doesn't make sense because:
1. A client cannot reject a message from a stream since the stream must
   maintain the total order of events to be consumed by multiple clients.
2. TTL is implemented by Stream retention where only old Stream segments
   are automatically deleted (or archived in the future).
3. The same applies to maxlen.

Although messages cannot be dead lettered **from** a stream, messages can be dead lettered
**into** a stream. This commit provides clients consuming from a stream the death history: #11173

Additionally, this commit provides AMQP 1.0 clients the death history via
message annotation `x-opt-deaths` which contains the same information as
AMQP 0.9.1 header `x-death`.

Both storing the death history in a stream and providing death history
to an AMQP 1.0 client use the same encoding: a message annotation
`x-opt-deaths` that contains an array of maps ordered by death recency.
The information encoded is the same as in the AMQP 0.9.1 `x-death` header.
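
As a rough illustration of the "array of maps" shape, the following
Python sketch builds such an annotation from a recency-ordered list of
deaths. The map keys used here are assumptions that simply reuse the
field names of the custom type sketched further down in this message.
The keys actually written by the broker may differ. `Death` and
`message_annotations` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Death:
    """Hypothetical in-memory death entry."""
    queue: str
    reason: str
    count: int
    time: int                      # milliseconds since the epoch
    exchange: str
    routing_keys: List[str]
    ttl: Optional[int] = None      # only meaningful for reason "expired"


def death_to_map(death: Death) -> Dict[str, Any]:
    """Convert one death entry to a plain map (assumed key names)."""
    entry: Dict[str, Any] = {
        "queue": death.queue,
        "reason": death.reason,
        "count": death.count,
        "time": death.time,
        "exchange": death.exchange,
        "routing-keys": list(death.routing_keys),
    }
    if death.ttl is not None:
        entry["ttl"] = death.ttl
    return entry


def message_annotations(deaths: List[Death]) -> Dict[str, List[Dict[str, Any]]]:
    """Build the annotation map; `deaths` must be most recent first."""
    return {"x-opt-deaths": [death_to_map(d) for d in deaths]}
```

Plain maps like these can be decoded by any AMQP 1.0 client without
knowledge of a RabbitMQ-specific type.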

Instead of providing an array of maps, a better approach could be to use
an array of a custom AMQP death type, such as:
```xml
<amqp name="rabbitmq">
    <section name="custom-types">
        <type name="death" class="composite" source="list">
            <descriptor name="rabbitmq:death:list" code="0x00000000:0x000000255"/>
            <field name="queue" type="string" mandatory="true" label="the name of the queue the message was dead lettered from"/>
            <field name="reason" type="symbol" mandatory="true" label="the reason why this message was dead lettered"/>
            <field name="count" type="ulong" default="1" label="how many times this message was dead lettered from this queue for this reason"/>
            <field name="time" mandatory="true" type="timestamp" label="the first time when this message was dead lettered from this queue for this reason"/>
            <field name="exchange" type="string" default="" label="the exchange this message was published to before it was dead lettered for the first time from this queue for this reason"/>
            <field name="routing-keys" type="string" default="" multiple="true" label="the routing keys this message was published with before it was dead lettered for the first time from this queue for this reason"/>
            <field name="ttl" type="milliseconds" label="the time to live of this message before it was dead lettered for the first time from this queue for reason ‘expired’"/>
        </type>
    </section>
</amqp>
```

However, encoding and decoding custom AMQP types that are nested within
arrays, which in turn are nested within the message annotation map, can
be difficult for clients and the broker. Also, each client would need to
know the custom AMQP type. Therefore, for now, we use an array of maps.

  ## Feature flag
The new way to record death information is done via mc annotation
`deaths_v2`.
Because old nodes do not know this new annotation, recording death
information via mc annotation `deaths_v2` is hidden behind a new feature
flag `message_containers_deaths_v2`.

If this feature flag is disabled, a message will continue to use the
3.13.0 - 3.13.2 way to record death information in mc annotation
`deaths`, or even the older way within the `x-death` header directly if
feature flag `message_containers` is also disabled.

Only if feature flag `message_containers_deaths_v2` is enabled and this
message hasn't been dead lettered before, will the new mc annotation
`deaths_v2` be used.
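
The selection rules above can be summarised in a small Python sketch.
The function `death_storage` and its parameters are hypothetical. In
particular, the rule that a message already carrying death information
keeps its existing format is an assumption derived from "hasn't been
dead lettered before".

```python
from typing import Optional


def death_storage(message_containers: bool,
                  message_containers_deaths_v2: bool,
                  existing: Optional[str] = None) -> str:
    """Pick where death information is recorded for a message.

    `existing` names the format the message already uses ("deaths" or
    "deaths_v2"), or None if it has not been dead lettered before.
    """
    if not message_containers:
        # Oldest way: death history lives in the x-death header.
        return "x-death"
    if existing is not None:
        # Assumption: a previously dead lettered message keeps its format.
        return existing
    return "deaths_v2" if message_containers_deaths_v2 else "deaths"
```

For example, a message first dead lettered while
`message_containers_deaths_v2` was disabled keeps using `deaths` in this
sketch even after the flag is enabled.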

(cherry picked from commit 6b300a2)

# Conflicts:
#	deps/rabbit/app.bzl
#	deps/rabbit/src/mc_amqp.erl
#	deps/rabbit/src/rabbit_core_ff.erl
#	deps/rabbit/test/amqp_client_SUITE.erl
@ansd ansd added this to the 3.13.3 milestone May 14, 2024
@ansd
Member Author

ansd commented May 14, 2024

Implemented in #11174

@ansd ansd closed this as completed May 14, 2024
ansd added a commit that referenced this issue May 14, 2024
* Fix dead lettering

* Fix conflicts and failing tests

Extend message_containers_deaths_v2_SUITE to send 3 messages whose death
histories will be stored in 3 different ways:
1. with feature flag message_containers disabled
2. with feature flag message_containers enabled, but message_containers_deaths_v2 disabled
3. with feature flag message_containers_deaths_v2 enabled

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>