Skip to content

Commit

Permalink
GH-1465: Part 1: Provision Super Streams over AMQP
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jun 16, 2022
1 parent 6dae0fc commit d143d4b
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 4 deletions.
@@ -0,0 +1,64 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.config;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;

/**
* Create Super Stream Topology {@link Declarable}s.
*
* @author Gary Russell
* @since 3.0
*
*/
public class SuperStream extends Declarables {

/**
* Create a Super Stream with the provided parameters.
* @param name the stream name.
* @param partitions the number of partitions.
*/
public SuperStream(String name, int partitions) {
super(declarables(name, partitions));
}

private static Collection<Declarable> declarables(String name, int partitions) {
List<Declarable> declarables = new ArrayList<>();
String[] rks = IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new);
declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true)));
for (int i = 0; i < partitions; i++) {
String rk = rks[i];
Queue q = new Queue(name + "-" + rk, true, false, false, Map.of("x-queue-type", "stream"));
declarables.add(q);
declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk,
Map.of("x-stream-partition-order", i)));
}
return declarables;
}

}
@@ -0,0 +1,77 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.config;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @since 3.0
*
*/
@SpringJUnitConfig
public class SuperStreamProvisioningTests extends AbstractIntegrationTests {

@Test
void provision(@Autowired Declarables declarables, @Autowired CachingConnectionFactory cf,
@Autowired RabbitAdmin admin) {

assertThat(declarables.getDeclarables()).hasSize(7);
cf.createConnection();
List<Queue> queues = declarables.getDeclarablesByType(Queue.class);
assertThat(queues).extracting(que -> que.getName()).contains("test-0", "test-1", "test-2");
queues.forEach(que -> admin.deleteQueue(que.getName()));
declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName()));
}

@Configuration
public static class Config {

@Bean
CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost", amqpPort());
}

@Bean
RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}

@Bean
SuperStream superStream() {
return new SuperStream("test", 3);
}

}

}
Expand Up @@ -44,6 +44,7 @@
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.test.annotation.DirtiesContext;
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.rabbit.stream.listener;
package org.springframework.rabbit.stream.support;

import java.time.Duration;

Expand Down Expand Up @@ -48,15 +48,15 @@ public abstract class AbstractIntegrationTests {
}
}

static int amqpPort() {
public static int amqpPort() {
return RABBITMQ != null ? RABBITMQ.getMappedPort(5672) : 5672;
}

static int managementPort() {
public static int managementPort() {
return RABBITMQ != null ? RABBITMQ.getMappedPort(15672) : 15672;
}

static int streamPort() {
public static int streamPort() {
return RABBITMQ != null ? RABBITMQ.getMappedPort(5552) : 5552;
}

Expand Down
24 changes: 24 additions & 0 deletions src/reference/asciidoc/stream.adoc
Expand Up @@ -157,3 +157,27 @@ public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTempla
====

IMPORTANT: Stateful retry is not supported with this container.

==== Super Streams

A Super Stream is an abstract concept for a partitioned stream, implemented by binding a number of stream queues to an exchange having an argument `x-super-stream: true`.

===== Provisioning

For convenience, a super stream can be provisioned by defining a single bean of type `SuperStream`.

====
[source, java]
----
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
----
====

The `RabbitAdmin` detects this bean and will declare the exchange (`my.super.stream`) and 3 queues (partitions) - `my.super-stream-n` where `n` is `0`, `1`, `2`, bound with routing keys equal to `n`.

===== Consuming Super Streams with Single Active Consumers

TBD.

0 comments on commit d143d4b

Please sign in to comment.