Skip to content

Commit

Permalink
GH-1382: Republish Recoverer Improvements
Browse files Browse the repository at this point in the history
Resolves #1382

Add expressions; make private method protected.

**cherry-pick to 2.4.x**
  • Loading branch information
garyrussell authored and artembilan committed Oct 28, 2022
1 parent 06ba396 commit ddc32a3
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,10 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -68,9 +72,11 @@ public class RepublishMessageRecoverer implements MessageRecoverer {

protected final AmqpTemplate errorTemplate; // NOSONAR

protected final String errorRoutingKey; // NOSONAR
protected final Expression errorRoutingKeyExpression; // NOSONAR

protected final String errorExchangeName; // NOSONAR
protected final Expression errorExchangeNameExpression; // NOSONAR

protected final EvaluationContext evaluationContext = new StandardEvaluationContext();

private String errorRoutingKeyPrefix = "error.";

Expand All @@ -80,19 +86,48 @@ public class RepublishMessageRecoverer implements MessageRecoverer {

private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;

/**
* Create an instance with the provided template.
* @param errorTemplate the template.
*/
public RepublishMessageRecoverer(AmqpTemplate errorTemplate) {
this(errorTemplate, null, null);
this(errorTemplate, (String) null, (String) null);
}

/**
* Create an instance with the provided properties.
* @param errorTemplate the template.
* @param errorExchange the exchange.
*/
public RepublishMessageRecoverer(AmqpTemplate errorTemplate, String errorExchange) {
this(errorTemplate, errorExchange, null);
}

/**
* Create an instance with the provided properties. If the exchange or routing key is null,
* the template's default will be used.
* @param errorTemplate the template.
* @param errorExchange the exchange.
* @param errorRoutingKey the routing key.
*/
public RepublishMessageRecoverer(AmqpTemplate errorTemplate, String errorExchange, String errorRoutingKey) {
this(errorTemplate, new LiteralExpression(errorExchange), new LiteralExpression(errorRoutingKey));
}

/**
* Create an instance with the provided properties. If the exchange or routing key
* evaluate to null, the template's default will be used.
* @param errorTemplate the template.
* @param errorExchange the exchange expression, evaluated against the message.
* @param errorRoutingKey the routing key, evaluated against the message.
*/
public RepublishMessageRecoverer(AmqpTemplate errorTemplate, @Nullable Expression errorExchange,
@Nullable Expression errorRoutingKey) {

Assert.notNull(errorTemplate, "'errorTemplate' cannot be null");
this.errorTemplate = errorTemplate;
this.errorExchangeName = errorExchange;
this.errorRoutingKey = errorRoutingKey;
this.errorExchangeNameExpression = errorExchange != null ? errorExchange : new LiteralExpression(null);
this.errorRoutingKeyExpression = errorRoutingKey != null ? errorRoutingKey : new LiteralExpression(null);
if (!(this.errorTemplate instanceof RabbitTemplate)) {
this.maxStackTraceLength = Integer.MAX_VALUE;
}
Expand Down Expand Up @@ -175,17 +210,17 @@ public void recover(Message message, Throwable cause) {
messageProperties.setDeliveryMode(this.deliveryMode);
}

if (null != this.errorExchangeName) {
String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey
: this.prefixedOriginalRoutingKey(message);
doSend(this.errorExchangeName, routingKey, message);
String exchangeName = this.errorExchangeNameExpression.getValue(this.evaluationContext, message, String.class);
String rk = this.errorRoutingKeyExpression.getValue(this.evaluationContext, message, String.class);
String routingKey = rk != null ? rk : this.prefixedOriginalRoutingKey(message);
if (null != exchangeName) {
doSend(exchangeName, routingKey, message);
if (this.logger.isWarnEnabled()) {
this.logger.warn("Republishing failed message to exchange '" + this.errorExchangeName
this.logger.warn("Republishing failed message to exchange '" + exchangeName
+ "' with routing key " + routingKey);
}
}
else {
final String routingKey = this.prefixedOriginalRoutingKey(message);
doSend(null, routingKey, message);
if (this.logger.isWarnEnabled()) {
this.logger.warn("Republishing failed message to the template's default exchange with routing key "
Expand Down Expand Up @@ -271,11 +306,24 @@ else if (stackTraceAsString.length() + exceptionMessage.length() > this.maxStack
return null;
}

private String prefixedOriginalRoutingKey(Message message) {
/**
* The default behavior of this method is to append the received routing key to the
* {@link #setErrorRoutingKeyPrefix(String) routingKeyPrefix}. This is only invoked
* if the routing key is null.
* @param message the message.
* @return the routing key.
*/
protected String prefixedOriginalRoutingKey(Message message) {
return this.errorRoutingKeyPrefix + message.getMessageProperties().getReceivedRoutingKey();
}

private String getStackTraceAsString(Throwable cause) {
/**
* Create a String representation of the stack trace.
* @param cause the throwable.
* @return the String.
* @since 2.4.8
*/
protected String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,7 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.expression.spel.standard.SpelExpressionParser;

/**
* @author James Carr
Expand All @@ -42,7 +43,7 @@
* @since 1.3
*/
@ExtendWith(MockitoExtension.class)
public class RepublishMessageRecovererTest {
public class RepublishMessageRecovererTests {

private final Message message = new Message("".getBytes(), new MessageProperties());

Expand Down Expand Up @@ -151,4 +152,29 @@ void setDeliveryModeIfNull() {
assertThat(this.message.getMessageProperties().getDeliveryMode()).isEqualTo(MessageDeliveryMode.NON_PERSISTENT);
}

@Test
void dynamicExRk() {
this.recoverer = new RepublishMessageRecoverer(this.amqpTemplate,
new SpelExpressionParser().parseExpression("messageProperties.headers.get('errorExchange')"),
new SpelExpressionParser().parseExpression("messageProperties.headers.get('errorRK')"));
this.message.getMessageProperties().setHeader("errorExchange", "ex");
this.message.getMessageProperties().setHeader("errorRK", "rk");

this.recoverer.recover(this.message, this.cause);

verify(this.amqpTemplate).send("ex", "rk", this.message);
}

@Test
void dynamicRk() {
this.recoverer = new RepublishMessageRecoverer(this.amqpTemplate, null,
new SpelExpressionParser().parseExpression("messageProperties.headers.get('errorRK')"));
this.message.getMessageProperties().setHeader("errorExchange", "ex");
this.message.getMessageProperties().setHeader("errorRK", "rk");

this.recoverer.recover(this.message, this.cause);

verify(this.amqpTemplate).send("rk", this.message);
}

}
5 changes: 4 additions & 1 deletion src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6783,7 +6783,7 @@ The default `MessageRecoverer` consumes the errant message and emits a `WARN` me

Starting with version 1.3, a new `RepublishMessageRecoverer` is provided, to allow publishing of failed messages after retries are exhausted.

When a recoverer consumes the final exception, the message is ack'd and is not sent to the dead letter exchange, if any.
When a recoverer consumes the final exception, the message is ack'd and is not sent to the dead letter exchange by the broker, if configured.

NOTE: When `RepublishMessageRecoverer` is used on the consumer side, the received message has `deliveryMode` in the `receivedDeliveryMode` message property.
In this case the `deliveryMode` is `null`.
Expand Down Expand Up @@ -6834,6 +6834,9 @@ Starting with versions 2.1.13, 2.2.3, the exception message is included in this
* if the stack trace is small, the message will be truncated (plus `...`) to fit in the available bytes (but the message within the stack trace itself is truncated to 97 bytes plus `...`).

Whenever a truncation of any kind occurs, the original exception will be logged to retain the complete information.
The evaluation is performed after the headers are enhanced so information such as the exception type can be used in the expressions.

Starting with version 2.4.8, the error exchange and routing key can be provided as SpEL expressions, with the `Message` being the root object for the evaluation.

Starting with version 2.3.3, a new subclass `RepublishMessageRecovererWithConfirms` is provided; this supports both styles of publisher confirms and will wait for the confirmation before returning (or throw an exception if not confirmed or the message is returned).

Expand Down

0 comments on commit ddc32a3

Please sign in to comment.