From 4c4b6dc8aa554fa93529bdbf4b180475689465e2 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 10 Nov 2021 14:48:40 +0100 Subject: [PATCH] Ignore system topics (workaround for https://github.com/apache/pulsar/issues/12727) --- .../oss/pulsar/jms/PulsarConnectionFactory.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 3249ebe4..5f71e562 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -55,6 +55,9 @@ @Slf4j public class PulsarConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, AutoCloseable { + + private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; + private static final Set clientIdentifiers = new ConcurrentSkipListSet<>(); private final Map> producers = new ConcurrentHashMap<>(); @@ -880,8 +883,6 @@ public Consumer createConsumer( .newConsumer() // these properties can be overridden by the configuration .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) - // always set this for Transactions - .isAckReceiptEnabled(sessionMode == Session.SESSION_TRANSACTED) .loadConf(getConsumerConfiguration()) // these properties cannot be overwritten by the configuration .subscriptionInitialPosition(initialPosition) @@ -961,6 +962,11 @@ public boolean deleteSubscription(PulsarDestination destination, String name) // required for TCK, scan for all subscriptions List allTopics = pulsarAdmin.topics().getList(systemNamespace); for (String topic : allTopics) { + if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) { + // skip Transaction related system topics + log.info("Ignoring system topic {}", topic); + continue; + } log.info("Scanning topic {}", topic); List subscriptions; try {