Spring Integration 6.0 to 6.1 Migration Guide

Artem Bilan edited this page May 8, 2023 · 3 revisions

Removal of MQTT ConsumerStopAction

The previously deprecated org.springframework.integration.mqtt.core.ConsumerStopAction and its functionality have been removed in favor of the existing cleanSession option of org.eclipse.paho.client.mqttv3.MqttConnectOptions.
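
As a minimal sketch of where that option lives, the configuration below sets cleanSession on the Paho MqttConnectOptions and hands the options to a DefaultMqttPahoClientFactory. The class name MqttClientConfig, the bean name, and the broker URL are hypothetical; adjust them to the application.

```java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
public class MqttClientConfig {

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        // Hypothetical broker URL, for illustration only.
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        // false asks the broker to keep the session (and its subscriptions)
        // across reconnects; true discards the session on disconnect.
        options.setCleanSession(false);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }
}
```

Channel adapters that are created with this factory pick up the cleanSession setting from the shared connection options.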

Do Not Block By Default

Distributed systems design, and some bad experience with developing demos, both say that it is not OK to block forever. Most of the timeouts in the framework are now 30 seconds by default. Previously they were -1 or Long.MIN_VALUE, which means waiting indefinitely for the operation to complete. The only exception is the PollingConsumer, whose timeout is now 1 second: when there are no messages in the queue, it is better not to block even for those 30 seconds, but to let the polling task be rescheduled. This fixes a problem with the single thread in the pool of the auto-configured TaskScheduler: with a 1 second wait time, other scheduled tasks get their turn even with only 1 thread in the pool.
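
The single-thread effect can be illustrated with plain java.util.concurrent classes. This is only a sketch of the idea, not the framework's PollingConsumer code: the class BoundedReceiveDemo and its method are hypothetical, and the empty queue stands in for a channel with no messages.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BoundedReceiveDemo {

    /**
     * Submits a polling task and a second task to a single-thread scheduler.
     * Returns true if the second task completed within two seconds.
     */
    public static boolean secondTaskRuns(long receiveTimeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stays empty
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // The "poller": waits on the empty queue, but only for a bounded time.
            scheduler.submit(() -> queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS));
            // Another task that needs the same (and only) thread.
            Future<String> other = scheduler.submit(() -> "ran");
            return "ran".equals(other.get(2, TimeUnit.SECONDS));
        }
        catch (Exception ex) {
            return false; // timed out, interrupted, or failed
        }
        finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTaskRuns(100));
    }
}
```

With a short receive timeout the poller releases the thread and the second task runs; with a very long timeout the second task is still queued when the two-second wait expires, which is the starvation the new default avoids.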

Filter WARN About Discarded Message

The MessageFilter now emits a WARN log message when a Message is dropped, that is, when no discardChannel is configured and throwExceptionOnRejection is false. In other words, the default behavior of the filter is no longer silent: it warns in the logs. If the previous behavior is acceptable, it is recommended to set nullChannel as the discardChannel for the filter endpoint.
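
A minimal Java DSL sketch of that recommendation follows. The class name, the channel names inputChannel and outputChannel, and the selector logic are hypothetical; nullChannel is the framework's built-in bean name.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class SilentFilterConfig {

    @Bean
    public IntegrationFlow silentFilterFlow() {
        return IntegrationFlow.from("inputChannel")
                // Rejected messages go to nullChannel, so they are dropped
                // without the WARN log entry.
                .<String>filter(payload -> !payload.isEmpty(),
                        e -> e.discardChannel("nullChannel"))
                .channel("outputChannel")
                .get();
    }
}
```

The get() call here belongs to the flow builder, not to an IntegrationComponentSpec.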

No IntegrationComponentSpec.get() any more

The IntegrationComponentSpec is itself a FactoryBean and therefore follows the FactoryBean lifecycle requirements when it is declared as a bean. In addition, an IntegrationComponentSpec is not necessarily a wrapper around a single target object: it may manage and expose other supporting components for the target Spring Integration endpoints. For this purpose, the ComponentsRegistration contract can be implemented on the IntegrationComponentSpec. The Spring Integration Java DSL and its supporting BeanPostProcessor handle an IntegrationComponentSpec and its ComponentsRegistration aspect, but for this to work properly the IntegrationComponentSpec must be provided to the framework as is. Therefore we have deprecated the get() method, with removal planned for the next version.

For example, a bean definition like this:

@Bean(PollerMetadata.DEFAULT_POLLER)
PollerMetadata defaultPoller() {
    return Pollers.fixedDelay(100).get();
}

must now be migrated to this:

@Bean(PollerMetadata.DEFAULT_POLLER)
PollerSpec defaultPoller() {
    return Pollers.fixedDelay(100);
}

Endpoint is async by default for output channel as FluxMessageChannel

If the output channel of an AbstractMessageProducingHandler is a ReactiveStreamsSubscribableChannel, async mode is turned on by default. If the handler result is not a reactive type or a CompletableFuture<?>, the regular reply-producing process happens regardless of the output channel type.

This code:

.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.channel(MessageChannels.flux())

can now simply be migrated to:

.handle((p, h) -> p)
.channel(MessageChannels.flux())