Skip to content

Commit

Permalink
spring-projectsGH-1465: Part 1: Provision Super Streams over AMQP
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jun 16, 2022
1 parent e25600b commit 8705749
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
@@ -0,0 +1,63 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.config;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;

/**
* Create Super Stream Topology {@link Declarable}s.
*
* @author Gary Russell
* @since 3.0
*
*/
public class SuperStream extends Declarables {

/**
* Create a Super Stream with the provided parameters.
* @param name the stream name.
* @param partitions the number of partitions.
*/
public SuperStream(String name, int partitions) {
super(declarables(name, partitions));
}

private static Declarable[] declarables(String name, int partitions) {
List<Declarable> declarables = new ArrayList<>();
String[] rks = IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new);
declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true)));
for (int i = 0; i < partitions; i++) {
String rk = rks[i];
Queue q = new Queue(name + "-" + rk, true, false, false, Map.of("x-queue-type", "stream"));
declarables.add(q);
declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk,
Map.of("x-stream-partition-order", i)));
}
return declarables.toArray(new Declarable[0]);
}

}
@@ -0,0 +1,77 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.config;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.listener.AbstractIntegrationTests;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @since 3.0
*
*/
@SpringJUnitConfig
public class SuperStreamProvisioningTests extends AbstractIntegrationTests {

@Test
void provision(@Autowired Declarables declarables, @Autowired CachingConnectionFactory cf,
@Autowired RabbitAdmin admin) {

assertThat(declarables.getDeclarables()).hasSize(7);
cf.createConnection();
List<Queue> queues = declarables.getDeclarablesByType(Queue.class);
assertThat(queues).extracting(que -> que.getName()).contains("test-0", "test-1", "test-2");
queues.forEach(que -> admin.deleteQueue(que.getName()));
declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName()));
}

@Configuration
public static class Config {

@Bean
CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}

@Bean
RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}

@Bean
SuperStream superStream() {
return new SuperStream("test", 3);
}

}

}

0 comments on commit 8705749

Please sign in to comment.