Skip to content

Commit

Permalink
Cloud Stream Dead Letter Topic support (#358)
Browse files Browse the repository at this point in the history
* Enable Dead Letter Queue on subscription created by Spring Cloud Stream
* Adds dead letter topic sample and integration test
  • Loading branch information
Travis Tomsu committed Mar 15, 2021
1 parent 678bf1b commit 939df3e
Show file tree
Hide file tree
Showing 18 changed files with 697 additions and 63 deletions.
Expand Up @@ -34,6 +34,8 @@ public class PubSubConsumerProperties extends PubSubCommonProperties {

private String subscriptionName = null;

private DeadLetterPolicy deadLetterPolicy = null;

public AckMode getAckMode() {
return ackMode;
}
Expand All @@ -57,4 +59,34 @@ public String getSubscriptionName() {
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

public DeadLetterPolicy getDeadLetterPolicy() {
return deadLetterPolicy;
}

public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}

public static class DeadLetterPolicy {
private String deadLetterTopic;

private Integer maxDeliveryAttempts;

public String getDeadLetterTopic() {
return deadLetterTopic;
}

public void setDeadLetterTopic(String deadLetterTopic) {
this.deadLetterTopic = deadLetterTopic;
}

public Integer getMaxDeliveryAttempts() {
return maxDeliveryAttempts;
}

public void setMaxDeliveryAttempts(Integer maxDeliveryAttempts) {
this.maxDeliveryAttempts = maxDeliveryAttempts;
}
}
}
Expand Up @@ -17,14 +17,14 @@
package com.google.cloud.spring.stream.binder.pubsub.provisioning;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubProducerProperties;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
Expand All @@ -37,6 +37,7 @@
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
Expand Down Expand Up @@ -71,13 +72,17 @@ public ProducerDestination provisionProducerDestination(String topic,
public ConsumerDestination provisionConsumerDestination(String topicName, String group,
ExtendedConsumerProperties<PubSubConsumerProperties> properties) {

// topicName may be either the short or fully-qualified version.
String topicShortName = TopicName.isParsableFrom(topicName) ? TopicName.parse(topicName).getTopic() : topicName;
Optional<Topic> topic = ensureTopicExists(topicName, properties.getExtension().isAutoCreateResources());

String subscriptionName;
Subscription subscription;

String customName = properties.getExtension().getSubscriptionName();
boolean autoCreate = properties.getExtension().isAutoCreateResources();
PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getExtension().getDeadLetterPolicy();

// topicName may be either the short or fully-qualified version.
String topicShortName = TopicName.isParsableFrom(topicName) ? TopicName.parse(topicName).getTopic() : topicName;
Topic topic = ensureTopicExists(topicName, autoCreate);

if (StringUtils.hasText(customName)) {
if (StringUtils.hasText(group)) {
LOGGER.warn("Either subscriptionName or group can be specified, but not both. " +
Expand All @@ -93,19 +98,19 @@ else if (StringUtils.hasText(group)) {
else {
// Generate anonymous random group since one wasn't provided
subscriptionName = "anonymous." + topicShortName + "." + UUID.randomUUID().toString();
subscription = this.pubSubAdmin.createSubscription(subscriptionName, topicName);
subscription = this.createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate);
this.anonymousGroupSubscriptionNames.add(subscriptionName);
}

if (subscription == null) {
if (properties.getExtension().isAutoCreateResources()) {
this.pubSubAdmin.createSubscription(subscriptionName, topicName);
if (autoCreate) {
this.createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate);
}
else {
throw new ProvisioningException("Non-existing '" + subscriptionName + "' subscription.");
}
}
else if (topic.isPresent() && !subscription.getTopic().equals(topic.get().getName())) {
else if (!subscription.getTopic().equals(topic.getName())) {
throw new ProvisioningException(
"Existing '" + subscriptionName + "' subscription is for a different topic '"
+ subscription.getTopic() + "'.");
Expand All @@ -124,23 +129,47 @@ public void afterUnbindConsumer(ConsumerDestination destination) {
}
}

private Optional<Topic> ensureTopicExists(String topicName, boolean autoCreate) {
Topic ensureTopicExists(String topicName, boolean autoCreate) {
Topic topic = this.pubSubAdmin.getTopic(topicName);
if (topic == null) {
if (autoCreate) {
try {
topic = this.pubSubAdmin.createTopic(topicName);
}
catch (AlreadyExistsException alreadyExistsException) {
// Ignore concurrent topic creation - we're good as long as topic was created and exists
LOGGER.info("Failed to auto-create topic '" + topicName + "' because it already exists.");
}
if (topic != null) {
return topic;
}

if (autoCreate) {
try {
return this.pubSubAdmin.createTopic(topicName);
}
else {
throw new ProvisioningException("Non-existing '" + topicName + "' topic.");
catch (AlreadyExistsException alreadyExistsException) {
// Sometimes 2+ instances of this application will race to create the topic, so this ensures we retry
// in the non-winning instances. In the rare case it fails, we throw an exception.
return ensureTopicExists(topicName, false);
}
}
throw new ProvisioningException("Non-existing '" + topicName + "' topic.");
}

private Subscription createSubscription(String subscriptionName, String topicName,
PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy,
boolean autoCreate) {
Subscription.Builder builder = Subscription.newBuilder()
.setName(subscriptionName)
.setTopic(topicName);

if (deadLetterPolicy != null) {
String dlTopicName = deadLetterPolicy.getDeadLetterTopic();
Assert.hasText(dlTopicName, "Dead letter policy cannot have null or empty topic");

Topic dlTopic = ensureTopicExists(dlTopicName, autoCreate);

DeadLetterPolicy.Builder dlpBuilder = DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName());

Integer maxAttempts = deadLetterPolicy.getMaxDeliveryAttempts();
if (maxAttempts != null) {
dlpBuilder.setMaxDeliveryAttempts(maxAttempts);
}
builder.setDeadLetterPolicy(dlpBuilder);
}

return Optional.ofNullable(topic);
return this.pubSubAdmin.createSubscription(builder);
}
}
Expand Up @@ -18,24 +18,25 @@

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.pubsub.support.PubSubSubscriptionUtils;
import com.google.cloud.spring.pubsub.support.PubSubTopicUtils;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -64,23 +65,16 @@ public class PubSubChannelProvisionerTests {
// class under test
PubSubChannelProvisioner pubSubChannelProvisioner;

/**
* Used to check exception messages and types.
*/
@Rule
public ExpectedException expectedEx = ExpectedException.none();

@Before
public void setup() {
when(this.pubSubAdminMock.getSubscription(any())).thenReturn(null);
doAnswer(invocation ->
Subscription.newBuilder()
.setName("projects/test-project/subscriptions/" + invocation.getArgument(0))
.setTopic(invocation.getArgument(1, String.class).startsWith("projects/") ?
invocation.getArgument(1) :
"projects/test-project/topics/" + invocation.getArgument(1))
.build()
).when(this.pubSubAdminMock).createSubscription(any(), any());
doAnswer(invocation -> {
Subscription.Builder arg = invocation.getArgument(0, Subscription.Builder.class);
return Subscription.newBuilder()
.setName(PubSubSubscriptionUtils.toProjectSubscriptionName(arg.getName(), "test-project").toString())
.setTopic(PubSubTopicUtils.toTopicName(arg.getTopic(), "test-project").toString())
.build();
}).when(this.pubSubAdminMock).createSubscription(any());
doAnswer(invocation ->
Topic.newBuilder().setName("projects/test-project/topics/" + invocation.getArgument(0)).build()
).when(this.pubSubAdminMock).getTopic(any());
Expand All @@ -97,7 +91,10 @@ public void testProvisionConsumerDestination_specifiedGroup() {

assertThat(result.getName()).isEqualTo("topic_A.group_A");

verify(this.pubSubAdminMock).createSubscription("topic_A.group_A", "topic_A");
ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
assertThat(argCaptor.getValue().getName()).isEqualTo("topic_A.group_A");
assertThat(argCaptor.getValue().getTopic()).isEqualTo("topic_A");
}

@Test
Expand All @@ -111,7 +108,10 @@ public void testProvisionConsumerDestination_specifiedGroupTopicInDifferentProje

assertThat(result.getName()).isEqualTo("topic_A.group_A");

verify(this.pubSubAdminMock).createSubscription("topic_A.group_A", "projects/differentProject/topics/topic_A");
ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
assertThat(argCaptor.getValue().getName()).isEqualTo("topic_A.group_A");
assertThat(argCaptor.getValue().getTopic()).isEqualTo("projects/differentProject/topics/topic_A");
}

@Test
Expand All @@ -127,37 +127,35 @@ public void testProvisionConsumerDestination_customSubscription() {

@Test
public void testProvisionConsumerDestination_noTopicException() {
this.expectedEx.expect(ProvisioningException.class);
this.expectedEx.expectMessage("Non-existing 'topic_A' topic.");

when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false);
when(this.pubSubAdminMock.getTopic("topic_A")).thenReturn(null);

PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties);
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties))
.withMessage("Non-existing 'topic_A' topic.");
}

@Test
public void testProvisionConsumerDestination_noSubscriptionException() {
this.expectedEx.expect(ProvisioningException.class);
this.expectedEx.expectMessage("Non-existing 'topic_A.group_A' subscription.");

when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false);

PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties);
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties))
.withMessage("Non-existing 'topic_A.group_A' subscription.");
}

@Test
public void testProvisionConsumerDestination_wrongTopicException() {
this.expectedEx.expect(ProvisioningException.class);
this.expectedEx.expectMessage("Existing 'topic_A.group_A' subscription is for a different topic 'topic_B'.");

when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false);
when(this.pubSubAdminMock.getSubscription("topic_A.group_A")).thenReturn(Subscription.newBuilder().setTopic("topic_B").build());
when(this.pubSubAdminMock.getSubscription("topic_A.group_A")).thenReturn(
Subscription.newBuilder().setTopic("topic_B").build());

PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties);
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties))
.withMessage("Existing 'topic_A.group_A' subscription is for a different topic 'topic_B'.");
}

@Test
Expand All @@ -172,7 +170,35 @@ public void testProvisionConsumerDestination_anonymousGroup() {

assertThat(result.getName()).matches(subscriptionNameRegex);

verify(this.pubSubAdminMock).createSubscription(matches(subscriptionNameRegex), eq("topic_A"));
ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
assertThat(argCaptor.getValue().getName()).matches(subscriptionNameRegex);
assertThat(argCaptor.getValue().getTopic()).isEqualTo("topic_A");
}

@Test
public void testProvisionConsumerDestination_deadLetterQueue() {
PubSubConsumerProperties.DeadLetterPolicy dlp = new PubSubConsumerProperties.DeadLetterPolicy();
dlp.setDeadLetterTopic("deadLetterTopic");
dlp.setMaxDeliveryAttempts(12);
when(this.pubSubConsumerProperties.getDeadLetterPolicy()).thenReturn(dlp);

when(this.pubSubAdminMock.getTopic("deadLetterTopic")).thenReturn(null);
when(this.pubSubAdminMock.createTopic("deadLetterTopic"))
.thenReturn(Topic.newBuilder().setName("projects/test-project/topics/deadLetterTopic").build());

this.pubSubChannelProvisioner.provisionConsumerDestination("topic_A", "group_A", this.properties);

ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
Subscription.Builder sb = argCaptor.getValue();
assertThat(sb.getName()).isEqualTo("topic_A.group_A");
assertThat(sb.getTopic()).isEqualTo("topic_A");
assertThat(sb.getDeadLetterPolicy()).isNotNull();
DeadLetterPolicy policy = sb.getDeadLetterPolicy();
assertThat(policy.getDeadLetterTopic()).isEqualTo("projects/test-project/topics/deadLetterTopic");
assertThat(policy.getMaxDeliveryAttempts()).isEqualTo(12);

}

@Test
Expand Down Expand Up @@ -209,10 +235,22 @@ public void testAfterUnbindConsumer_nonAnonymous() {
@Test
public void testProvisionConsumerDestination_concurrentTopicCreation() {
when(this.pubSubAdminMock.createTopic(any())).thenThrow(AlreadyExistsException.class);
when(this.pubSubAdminMock.getTopic("already_existing_topic")).thenReturn(null);
when(this.pubSubAdminMock.getTopic("already_existing_topic"))
.thenReturn(null)
.thenReturn(Topic.newBuilder().setName("already_existing_topic").build());

// Ensure no exceptions occur if topic already exists on create call
assertThat(this.pubSubChannelProvisioner
.provisionConsumerDestination("already_existing_topic", "group1", this.properties)).isNotNull();
}

@Test
public void testProvisionConsumerDestination_recursiveExistCalls() {
when(this.pubSubAdminMock.getTopic("new_topic")).thenReturn(null);
when(this.pubSubAdminMock.createTopic(any())).thenThrow(AlreadyExistsException.class);

// Ensure no infinite loop on recursive call
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner.ensureTopicExists("new_topic", true));
}
}
Expand Up @@ -256,7 +256,8 @@ public Subscription createSubscription(String subscriptionName, String topicName
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param builder a Subscription.Builder straight from the client API library that exposes all available knobs and
* levers
* levers. The name and topic fields will be expanded to fully qualified names (i.e.
* "projects/my-project/topic/my-topic") if they are not already.
* @return the created subscription
*/
public Subscription createSubscription(Subscription.Builder builder) {
Expand Down
Expand Up @@ -118,7 +118,6 @@ protected void doStop() {
super.doStop();
}

@SuppressWarnings("deprecation")
private void consumeMessage(ConvertedBasicAcknowledgeablePubsubMessage<?> message) {
Map<String, Object> messageHeaders =
this.headerMapper.toHeaders(message.getPubsubMessage().getAttributesMap());
Expand Down

0 comments on commit 939df3e

Please sign in to comment.