Skip to content

Commit

Permalink
spring-projectsGH-1425: Configure ReplyPostProcessor via Factory
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Oct 26, 2022
1 parent 36e98a7 commit d29d625
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 2 deletions.
Expand Up @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.config;

import java.util.Arrays;
import java.util.function.Function;

import org.aopalliance.aop.Advice;

Expand All @@ -26,6 +27,7 @@
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener;
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
import org.springframework.amqp.utils.JavaUtils;
import org.springframework.lang.Nullable;
import org.springframework.retry.RecoveryCallback;
Expand Down Expand Up @@ -54,6 +56,8 @@ public abstract class BaseRabbitListenerContainerFactory<C extends MessageListen

private Advice[] adviceChain;

private Function<String, ReplyPostProcessor> replyPostProcessorProvider;

@Override
public abstract C createListenerContainer(RabbitListenerEndpoint endpoint);

Expand Down Expand Up @@ -108,6 +112,17 @@ public void setReplyRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}

/**
* Set a function to provide a reply post processor; it will be used if there is no
* replyPostProcessor on the rabbit listener annotation. The input parameter is the
* listener id.
* @param replyPostProcessorProvider the post processor.
* @since 3.0
*/
public void setReplyPostProcessorProvider(Function<String, ReplyPostProcessor> replyPostProcessorProvider) {
this.replyPostProcessorProvider = replyPostProcessorProvider;
}

protected void applyCommonOverrides(@Nullable RabbitListenerEndpoint endpoint, C instance) {
if (endpoint != null) { // endpoint settings overriding default factory settings
JavaUtils.INSTANCE
Expand All @@ -130,6 +145,11 @@ protected void applyCommonOverrides(@Nullable RabbitListenerEndpoint endpoint, C
.acceptIfNotNull(endpoint.getReplyPostProcessor(), messageListener::setReplyPostProcessor)
.acceptIfNotNull(endpoint.getReplyContentType(), messageListener::setReplyContentType);
messageListener.setConverterWinsContentType(endpoint.isConverterWinsContentType());
if (endpoint.getReplyPostProcessor() == null && this.replyPostProcessorProvider != null) {
JavaUtils.INSTANCE
.acceptIfNotNull(this.replyPostProcessorProvider.apply(endpoint.getId()),
messageListener::setReplyPostProcessor);
}
}
}
}
Expand Down
Expand Up @@ -237,6 +237,9 @@ public class EnableRabbitIntegrationTests extends NeedsManagementTests {
@Autowired
private MultiListenerValidatedJsonBean multiValidated;

@Autowired
private ReplyPostProcessor rpp;

@BeforeAll
public static void setUp() {
System.setProperty(RabbitListenerAnnotationBeanPostProcessor.RABBIT_EMPTY_STRING_ARGUMENTS_PROPERTY,
Expand Down Expand Up @@ -310,6 +313,8 @@ public void autoStart() {
this.registry.start();
assertThat(listenerContainer.isRunning()).isTrue();
listenerContainer.stop();
assertThat(listenerContainer.getMessageListener()).extracting("replyPostProcessor")
.isSameAs(this.rpp);
}

@Test
Expand Down Expand Up @@ -1690,14 +1695,22 @@ public SimpleMessageListenerContainer factoryCreatedContainerNoListener(
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitAutoStartFalseListenerContainerFactory() {
public SimpleRabbitListenerContainerFactory rabbitAutoStartFalseListenerContainerFactory(
ReplyPostProcessor rpp) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setReceiveTimeout(10L);
factory.setAutoStartup(false);
factory.setReplyPostProcessorProvider(id -> rpp);
return factory;
}

@Bean
ReplyPostProcessor rpp() {
return (in, out) -> out;
}

@Bean
public SimpleRabbitListenerContainerFactory jsonListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
Expand Down
14 changes: 14 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -3144,6 +3144,20 @@ public ReplyPostProcessor echoCustomHeader() {
----
====

Starting with version 3.0, you can configure the post processor on the container factory instead of on the annotation.

====
[source, java]
----
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
----
====

The annotation will supersede the factory setting.

The `@SendTo` value is assumed as a reply `exchange` and `routingKey` pair that follows the `exchange/routingKey` pattern,
where one of those parts can be omitted.
The valid values are as follows:
Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/whats-new.adoc
Expand Up @@ -37,7 +37,10 @@ When setting the container factory `consumerBatchEnabled` to `true`, the `batchL
See <<receiving-batch>> for more infoprmation.

`MessageConverter` s can now return `Optional.empty()` for a null value; this is currently implemented by the `Jackson2JsonMessageConverter`.
See <<Jackson2JsonMessageConverter-from-message>> for more information.
See <<Jackson2JsonMessageConverter-from-message>> for more information

You can now configure a `ReplyPostProcessor` via the container factory rather than via a property on `@RabbitListener`.
See <<async-annotation-driven-reply>> for more information.

==== Connection Factory Changes

Expand Down

0 comments on commit d29d625

Please sign in to comment.