Skip to content

Commit

Permalink
spring-projectsGH-1465: Super Stream Support in Template
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Sep 26, 2022
1 parent f54f8fb commit 182ec33
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
Expand Up @@ -17,6 +17,7 @@
package org.springframework.rabbit.stream.producer;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
Expand Down Expand Up @@ -52,6 +53,8 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, BeanNameAwa

private final String streamName;

private Function<com.rabbitmq.stream.Message, String> superStreamRouting;

private MessageConverter messageConverter = new SimpleMessageConverter();

private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();
Expand Down Expand Up @@ -80,7 +83,13 @@ public RabbitStreamTemplate(Environment environment, String streamName) {
private synchronized Producer createOrGetProducer() {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
builder.stream(this.streamName);
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
Expand All @@ -96,6 +105,16 @@ public synchronized void setBeanName(String name) {
this.beanName = name;
}

/**
* Add a routing function, making the stream a super stream.
* @param superStreamRouting the routing function.
* @since 3.0
*/
public void setSuperStreamRouting(Function<com.rabbitmq.stream.Message, String> superStreamRouting) {
this.superStreamRouting = superStreamRouting;
}


/**
* Set a converter for {@link #convertAndSend(Object)} operations.
* @param messageConverter the converter.
Expand Down
Expand Up @@ -22,6 +22,8 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -114,4 +116,27 @@ void handleConfirm() throws InterruptedException, ExecutionException {
}
}

@Test
void superStream() {
Environment env = mock(Environment.class);
ProducerBuilder pb = mock(ProducerBuilder.class);
given(pb.superStream(any())).willReturn(pb);
given(env.producerBuilder()).willReturn(pb);
Producer producer = mock(Producer.class);
given(pb.build()).willReturn(producer);
try (RabbitStreamTemplate template = new RabbitStreamTemplate(env, "foo")) {
SimpleMessageConverter messageConverter = new SimpleMessageConverter();
template.setMessageConverter(messageConverter);
assertThat(template.messageConverter()).isSameAs(messageConverter);
StreamMessageConverter converter = mock(StreamMessageConverter.class);
given(converter.fromMessage(any())).willReturn(mock(Message.class));
template.setStreamConverter(converter);
template.setSuperStreamRouting(msg -> "bar");
template.convertAndSend("x");
verify(pb).superStream("foo");
verify(pb).routing(any());
verify(pb, never()).stream("foo");
}
}

}
36 changes: 36 additions & 0 deletions src/reference/asciidoc/stream.adoc
Expand Up @@ -180,6 +180,42 @@ SuperStream superStream() {

The `RabbitAdmin` detects this bean and will declare the exchange (`my.super.stream`) and 3 queues (partitions) - `my.super-stream-n` where `n` is `0`, `1`, `2`, bound with routing keys equal to `n`.

If you also wish to publish over AMQP to the exchange, you can provide custom routing keys:

====
[source, java]
----
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
----
====

The number of keys must equal the number of partitions.

===== Producing to a SuperStream

You must add a `superStreamRoutingFunction` to the `RabbitStreamTemplate`:

====
[source, java]
----
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
----
====

You can also publish over AMQP, using the `RabbitTemplate`.

===== Consuming Super Streams with Single Active Consumers

Invoke the `superStream` method on the listener container to enable a single active consumer on a super stream.
Expand Down

0 comments on commit 182ec33

Please sign in to comment.