Consumer able to receive message which is not matching the regex pattern #22529

Open
2 of 3 tasks
ragaur-tibco opened this issue Apr 18, 2024 · 9 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

ragaur-tibco commented Apr 18, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

3.2.2

Minimal reproduce step

Steps to reproduce:

  • Send messages to multiple topics using producers.
  • Create a consumer with the regex topic pattern: non-persistent://my-tenant/new-name.*
  • Set subscriptionTopicsMode = AllTopics.

What did you expect to see?

  • The consumer should receive messages only from topics whose names start with non-persistent://my-tenant/new-name

What did you see instead?

  • The consumer also received messages from topics whose names start with persistent://

package Pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class AllTopicsConsumerExample {
    private static PulsarAdmin adm;
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String NAMESPACE = "my-tenant/new-name";
    private static final String SUBSCRIPTION_NAME = "your-subscription";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();
        producer.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        System.out.println("new producer");
        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();
        producer1.send("=======from topic persistent://my-tenant/new-name/topic-pers-1 ");
        
        Pattern allTopicsPattern = Pattern.compile("non-persistent://my-tenant/new-name/.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME).subscriptionTopicsMode(RegexSubscriptionMode.valueOf("AllTopics"))
                .subscribe();

        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

ragaur-tibco added the type/bug label on Apr 18, 2024

@visortelle
Member

visortelle commented Apr 19, 2024

According to #19798, there is no need to specify the persistent:// or non-persistent:// prefix in the topicsPattern. Quoting that issue:

After discussion, we don't need this PIP, we just need to:

  • Add a warn log when the user-configured pattern contains a domain ('persistent://public/default/topic.*').
  • Enhance the documentation: patternTopics cannot contain domains.
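
To illustrate why a domain prefix in the pattern has no filtering effect, here is a minimal, self-contained sketch. It assumes that the client drops everything up to and including `://` from both the pattern and each topic name before matching; the real client logic may differ in detail. The names `DomainlessPatternSketch`, `stripDomain`, and `matches` are hypothetical and are not Pulsar APIs.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only; this is not Pulsar client code.
class DomainlessPatternSketch {

    // Drop an optional "persistent://" or "non-persistent://" prefix.
    static String stripDomain(String s) {
        int idx = s.indexOf("://");
        return idx < 0 ? s : s.substring(idx + 3);
    }

    // Assumption: matching is done on the domain-less pattern and topic name.
    static boolean matches(String topicsPattern, String topicName) {
        Pattern p = Pattern.compile(stripDomain(topicsPattern));
        return p.matcher(stripDomain(topicName)).matches();
    }

    public static void main(String[] args) {
        String pattern = "non-persistent://my-tenant/new-name/.*";
        List<String> topics = List.of(
                "non-persistent://my-tenant/new-name/topic-non-1",
                "persistent://my-tenant/new-name/topic-pers-1",
                "persistent://my-tenant/other/topic-1");
        for (String topic : topics) {
            System.out.println(topic + " -> " + matches(pattern, topic));
        }
    }
}
```

Under that assumption, both topics in my-tenant/new-name match regardless of their domain, which is consistent with the behavior reported above. Restricting the subscription to one domain would then be a job for subscriptionTopicsMode rather than for the pattern.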

@ragaur-tibco
Author

ragaur-tibco commented Apr 19, 2024

OK, thank you @visortelle.
But I was not able to do a multi-topic subscription for non-persistent topics, and as you said, "Once in about ~5 runs I see some messages from non-persistent topic".

That means it is not working properly for non-persistent topics.

@visortelle
Member

@ragaur-tibco I completely agree and don't argue with that.

@visortelle
Member

@ragaur-tibco the current behavior is correct. See my comment here: #22527 (comment)

@visortelle
Member

visortelle commented Apr 19, 2024

@ragaur-tibco I fixed your code.

When using non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss.

Source: https://pulsar.apache.org/docs/next/cookbooks-non-persistent/#overview

In your code, the subscriber was created after messages were sent.

Code:

package b;

import org.apache.pulsar.client.api.*;

import java.util.regex.Pattern;

public class App {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String SUBSCRIPTION_NAME = "your-subscription";

    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        Producer<String> producerA = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();

        Producer<String> producerB = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();

        Pattern allTopicsPattern = Pattern.compile("my-tenant/new-name/.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();

        producerA.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        producerB.send("=========from topic persistent://my-tenant/new-name/topic-pers-1 ");

        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Logs:

$ mvn exec:java
[INFO] Scanning for projects...
[INFO] 
[INFO] --------------------------------< c:a >---------------------------------
[INFO] Building a 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.2.0:java (default-cli) @ a ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received message from topic non-persistent://my-tenant/new-name/topic-non-1: =========from topic non-persistent://my-tenant/new-name/topic-non-1 
Received message from topic persistent://my-tenant/new-name/topic-pers-1: =========from topic persistent://my-tenant/new-name/topic-pers-1 

@visortelle
Member

@ragaur-tibco please check and let me know if it resolves the issue.

@ragaur-tibco
Author

ragaur-tibco commented Apr 21, 2024

Hi @visortelle

I tried creating the subscriber before sending the messages:

package Pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class AllTopicsConsumerExample {
    private static PulsarAdmin adm;
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String NAMESPACE = "my-tenant/new-name";
    private static final String SUBSCRIPTION_NAME = "your-subscription-1";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        
        // Pattern allTopicsPattern = Pattern.compile("tenant-1/name/topic.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern("tenant-1/name/topic.*").subscriptionType(SubscriptionType.Shared)
                .subscriptionName(SUBSCRIPTION_NAME).subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();
        
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://tenant-1/name/topic-1")
                .enableBatching(false).create();
       
        System.out.println("new producer");
        producer.send("=========from topic non-persistent://tenant-1/name/topic-1 ");
    
        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://tenant-1/name/topic-8")
                .enableBatching(false).create();
        producer1.send("=======from topic  persistent://tenant-1/name/topic-8 ");
        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Result: I only receive messages from the persistent topic, not from the non-persistent one.

@visortelle
Member

visortelle commented Apr 21, 2024

Interesting.

My observation is that after we create a pattern consumer, for an existing non-persistent topic it doesn't "immediately" create the underlying subscription and consumers if there are no connected producers at this moment. But it will eventually be created after a short time.

TIP: you can display the list of the underlying consumers by casting your consumer to PatternMultiTopicsConsumerImpl and calling the .getConsumers() method.

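// Both classes are client internals, not part of the public API.
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;

// One underlying consumer exists per topic the pattern consumer has discovered.
List<ConsumerImpl<byte[]>> consumers =
    ((PatternMultiTopicsConsumerImpl<byte[]>) allTopicsConsumer).getConsumers();

for (ConsumerImpl<byte[]> consumer : consumers) {
    System.out.println("consumer: " + consumer.getTopic());
}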

If you modify your code to send a lot of messages asynchronously, you'll start to receive them after a short time.

for (int i = 0; i < 100000; i++) {
    producer.sendAsync("=========from topic non-persistent://tenant-1/name/topic-1 " + i);
}
...
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10732
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10733
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10734
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10735
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10736
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10737
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10738
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10739
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10740
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10741
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10742
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10743
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10744
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10745
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10746

I don't know if it can qualify as a bug. cc @lhotari

@ragaur-tibco do you have a real use case in mind? I wouldn't rely on non-persistent topics if losing even a small number of messages could affect my application.
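
If the first messages matter in a test, one possible workaround is to wait until the pattern consumer has picked up the topic before sending. The sketch below is a generic polling helper with hypothetical names (`AwaitSketch`, `await`); it is not part of the Pulsar client. With Pulsar, the condition could inspect the list returned by the `getConsumers()` cast shown in the tip above, which is an assumption about how you would wire it in, not a tested recipe.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of the Pulsar client.
class AwaitSketch {

    // Polls `condition` every `interval` until it holds or `timeout` elapses.
    // Returns true if the condition held in time, false on timeout or interrupt.
    static boolean await(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Stand-in condition that becomes true after roughly 200 ms.
        // In the Pulsar case it would check whether an underlying consumer
        // for the non-persistent topic already exists.
        boolean ready = await(
                () -> System.nanoTime() - start >= Duration.ofMillis(200).toNanos(),
                Duration.ofSeconds(5), Duration.ofMillis(20));
        System.out.println("ready = " + ready);
    }
}
```

Even with such a wait, non-persistent topics give no delivery guarantee, so this only narrows the window in which messages can be missed.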

@visortelle
Member

visortelle commented Apr 21, 2024

Here is the reason: before adding a topic to the topics list, it checks that the topic isActive(), which evaluates !subscriptions.isEmpty() || hasLocalProducers(). A topic with no subscriptions and no connected producers is therefore left out of the list.
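
The effect of that check can be shown with a toy model. This is a sketch under the assumption that the topic list is filtered with exactly the expression quoted above; `ActiveTopicFilterSketch`, `TopicState`, and `listActive` are hypothetical names and do not correspond to broker classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the check described above; not broker code.
class ActiveTopicFilterSketch {

    // Minimal stand-in for the state of one topic.
    static final class TopicState {
        final String name;
        final int subscriptions;
        final int localProducers;

        TopicState(String name, int subscriptions, int localProducers) {
            this.name = name;
            this.subscriptions = subscriptions;
            this.localProducers = localProducers;
        }

        // Mirrors: !subscriptions.isEmpty() || hasLocalProducers()
        boolean isActive() {
            return subscriptions > 0 || localProducers > 0;
        }
    }

    // Only active topics end up in the list a pattern consumer sees.
    static List<String> listActive(List<TopicState> topics) {
        List<String> result = new ArrayList<>();
        for (TopicState t : topics) {
            if (t.isActive()) {
                result.add(t.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TopicState> topics = List.of(
                new TopicState("topic-idle", 0, 0),          // skipped
                new TopicState("topic-with-producer", 0, 1), // listed
                new TopicState("topic-with-sub", 1, 0));     // listed
        System.out.println(listActive(topics));
    }
}
```

In this model a topic only becomes visible once a producer connects or a subscription exists, which would explain why the pattern consumer attaches to the non-persistent topic with a delay.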
