Skip to content

Commit

Permalink
Ignore system topics (workaround for apache/pulsar#12727)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Nov 10, 2021
1 parent 25e60f3 commit 4c4b6dc
Showing 1 changed file with 8 additions and 2 deletions.
Expand Up @@ -55,6 +55,9 @@
@Slf4j
public class PulsarConnectionFactory
implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, AutoCloseable {

private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";

private static final Set<String> clientIdentifiers = new ConcurrentSkipListSet<>();

private final Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -880,8 +883,6 @@ public Consumer<byte[]> createConsumer(
.newConsumer()
// these properties can be overridden by the configuration
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
// always set this for Transactions
.isAckReceiptEnabled(sessionMode == Session.SESSION_TRANSACTED)
.loadConf(getConsumerConfiguration())
// these properties cannot be overwritten by the configuration
.subscriptionInitialPosition(initialPosition)
Expand Down Expand Up @@ -961,6 +962,11 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
// required for TCK, scan for all subscriptions
List<String> allTopics = pulsarAdmin.topics().getList(systemNamespace);
for (String topic : allTopics) {
if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) {
// skip Transaction related system topics
log.info("Ignoring system topic {}", topic);
continue;
}
log.info("Scanning topic {}", topic);
List<String> subscriptions;
try {
Expand Down

0 comments on commit 4c4b6dc

Please sign in to comment.