Skip to content

Commit

Permalink
GH-1509: Add Concurrency for Super Streams
Browse files Browse the repository at this point in the history
Resolves #1509
  • Loading branch information
garyrussell authored and artembilan committed Oct 4, 2022
1 parent 52e49cb commit 506abd5
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 21 deletions.
Expand Up @@ -16,10 +16,11 @@

package org.springframework.rabbit.stream.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.Message;
Expand All @@ -29,6 +30,7 @@
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
Expand All @@ -49,15 +51,21 @@
*/
public class StreamListenerContainer implements MessageListenerContainer, BeanNameAware {

protected Log logger = LogFactory.getLog(getClass()); // NOSONAR
protected LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

private final ConsumerBuilder builder;

private final Collection<Consumer> consumers = new ArrayList<>();

private StreamMessageConverter streamConverter;

private ConsumerCustomizer consumerCustomizer = (id, con) -> { };

private Consumer consumer;
private boolean simpleStream;

private boolean superStream;

private int concurrency = 1;

private String listenerId;

Expand Down Expand Up @@ -96,22 +104,41 @@ public StreamListenerContainer(Environment environment, @Nullable Codec codec) {
*/
@Override
public void setQueueNames(String... queueNames) {
Assert.isTrue(!this.superStream, "setQueueNames() and superStream() are mutually exclusive");
Assert.isTrue(queueNames != null && queueNames.length == 1, "Only one stream is supported");
this.builder.stream(queueNames[0]);
this.simpleStream = true;
}

/**
* Enable Single Active Consumer on a Super Stream, with one consumer.
* Mutually exclusive with {@link #setQueueNames(String...)}.
* @param streamName the stream.
* @param name the consumer name.
* @since 3.0
*/
public void superStream(String streamName, String name) {
superStream(streamName, name, 1);
}

/**
* Enable Single Active Consumer on a Super Stream.
* Enable Single Active Consumer on a Super Stream with the provided number of consumers.
* There must be at least that number of partitions in the Super Stream.
* Mutually exclusive with {@link #setQueueNames(String...)}.
* @param superStream the stream.
* @param streamName the stream.
* @param name the consumer name.
* @param consumers the number of consumers.
* @since 3.0
*/
public void superStream(String superStream, String name) {
Assert.notNull(superStream, "'superStream' cannot be null");
this.builder.superStream(superStream)
public void superStream(String streamName, String name, int consumers) {
Assert.isTrue(consumers > 0, () -> "'concurrency' must be greater than zero, not " + consumers);
this.concurrency = consumers;
Assert.isTrue(!this.simpleStream, "setQueueNames() and superStream() are mutually exclusive");
Assert.notNull(streamName, "'superStream' cannot be null");
this.builder.superStream(streamName)
.singleActiveConsumer()
.name(name);
this.superStream = true;
}

/**
Expand Down Expand Up @@ -201,23 +228,35 @@ public Object getMessageListener() {

@Override
public synchronized boolean isRunning() {
return this.consumer != null;
return this.consumers.size() > 0;
}

@Override
public synchronized void start() {
if (this.consumer == null) {
if (this.consumers.size() == 0) {
this.consumerCustomizer.accept(getListenerId(), this.builder);
this.consumer = this.builder.build();
if (this.simpleStream) {
this.consumers.add(this.builder.build());
}
else {
for (int i = 0; i < this.concurrency; i++) {
this.consumers.add(this.builder.build());
}
}
}
}

@Override
public synchronized void stop() {
if (this.consumer != null) {
this.consumer.close();
this.consumer = null;
}
this.consumers.forEach(consumer -> {
try {
consumer.close();
}
catch (RuntimeException ex) {
this.logger.error(ex, "Failed to close consumer");
}
});
this.consumers.clear();
}

@Override
Expand All @@ -233,8 +272,8 @@ public void setupMessageListener(MessageListener messageListener) {
try {
((ChannelAwareMessageListener) this.messageListener).onMessage(message2, null);
}
catch (Exception e) { // NOSONAR
this.logger.error("Listner threw an exception", e);
catch (Exception ex) { // NOSONAR
this.logger.error(ex, "Listner threw an exception");
}
}
else {
Expand Down
@@ -0,0 +1,133 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.listener;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.SuperStream;
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;

/**
* @author Gary Russell
* @since 3.0
*
*/
@SpringJUnitConfig
public class SuperStreamConcurrentSACTests extends AbstractIntegrationTests {

@Test
void concurrent(@Autowired StreamListenerContainer container, @Autowired RabbitTemplate template,
@Autowired Config config, @Autowired RabbitAdmin admin,
@Autowired Declarables superStream) throws InterruptedException {

template.getConnectionFactory().createConnection();
container.start();
assertThat(config.consumerLatch.await(10, TimeUnit.SECONDS)).isTrue();
template.convertAndSend("ss.sac.concurrency.test", "0", "foo");
template.convertAndSend("ss.sac.concurrency.test", "1", "bar");
template.convertAndSend("ss.sac.concurrency.test", "2", "baz");
assertThat(config.messageLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(config.threads).hasSize(3);
container.stop();
clean(admin, superStream);
}

private void clean(RabbitAdmin admin, Declarables declarables) {
declarables.getDeclarablesByType(Queue.class).forEach(queue -> admin.deleteQueue(queue.getName()));
declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName()));
}

@Configuration
public static class Config {

final Set<String> threads = new HashSet<>();

final CountDownLatch consumerLatch = new CountDownLatch(3);

final CountDownLatch messageLatch = new CountDownLatch(3);

@Bean
CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost", amqpPort());
}

@Bean
RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}

@Bean
RabbitTemplate template(ConnectionFactory cf) {
return new RabbitTemplate(cf);
}

@Bean
SuperStream superStream() {
return new SuperStream("ss.sac.concurrency.test", 3);
}

@Bean
static Environment environment() {
return Environment.builder()
.addressResolver(add -> new Address("localhost", streamPort()))
.maxConsumersByConnection(1)
.build();
}

@Bean
StreamListenerContainer concurrentContainer(Environment env) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac.concurrency.test", "concurrent", 3);
container.setupMessageListener(msg -> {
this.threads.add(Thread.currentThread().getName());
this.messageLatch.countDown();
});
container.setConsumerCustomizer((id, builder) -> {
builder.consumerUpdateListener(context -> {
this.consumerLatch.countDown();
return OffsetSpecification.last();
});
});
container.setAutoStartup(false);
return container;
}

}

}
Expand Up @@ -33,7 +33,7 @@ public abstract class AbstractIntegrationTests {
static {
if (System.getProperty("spring.rabbit.use.local.server") == null
&& System.getenv("SPRING_RABBIT_USE_LOCAL_SERVER") == null) {
String image = "rabbitmq:3.11";
String image = "rabbitmq:3.11-management";
String cache = System.getenv().get("IMAGE_CACHE");
if (cache != null) {
image = cache + image;
Expand Down
5 changes: 3 additions & 2 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -5741,7 +5741,7 @@ Enable this feature by calling `ConnectionFactoryUtils.enableAfterCompletionFail
==== Message Listener Container Configuration

There are quite a few options for configuring a `SimpleMessageListenerContainer` (SMLC) and a `DirectMessageListenerContainer` (DMLC) related to transactions and quality of service, and some of them interact with each other.
Properties that apply to the SMLC, DMLC, or `StreamListenerContainer` (StLC) (see <<stream-support>> are indicated by the check mark in the appropriate column.
Properties that apply to the SMLC, DMLC, or `StreamListenerContainer` (StLC) (see <<stream-support>>) are indicated by the check mark in the appropriate column.
See <<choose-container>> for information to help you decide which container is appropriate for your application.

The following table shows the container property names and their equivalent attribute names (in parentheses) when using the namespace to configure a `<rabbit:listener-container/>`.
Expand Down Expand Up @@ -5894,10 +5894,11 @@ a|

|The number of concurrent consumers to initially start for each listener.
See <<listener-concurrency>>.
For the `StLC`, concurrency is controlled via an overloaded `superStream` method; see <<super-stream-consumer>>.

a|image::images/tickmark.png[]
a|
a|
a|image::images/tickmark.png[]

|[[connectionFactory]]<<connectionFactory,`connectionFactory`>> +
(connection-factory)
Expand Down
6 changes: 5 additions & 1 deletion src/reference/asciidoc/stream.adoc
Expand Up @@ -216,6 +216,7 @@ RabbitStreamTemplate streamTemplate(Environment env) {

You can also publish over AMQP, using the `RabbitTemplate`.

[[super-stream-consumer]]
===== Consuming Super Streams with Single Active Consumers

Invoke the `superStream` method on the listener container to enable a single active consumer on a super stream.
Expand All @@ -227,7 +228,7 @@ Invoke the `superStream` method on the listener container to enable a single act
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer");
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
Expand All @@ -236,3 +237,6 @@ StreamListenerContainer container(Environment env, String name) {
}
----
====

IMPORTANT: At this time, when the concurrency is greater than 1, the actual concurrency is further controlled by the `Environment`; to achieve full concurrency, set the environment's `maxConsumersByConnection` to 1.
See https://rabbitmq.github.io/rabbitmq-stream-java-client/snapshot/htmlsingle/#configuring-the-environment[Configuring the Environment].

0 comments on commit 506abd5

Please sign in to comment.