[Bug] Reader gets stuck after calling hasMessageAvailable when replicateSubscriptionState is enabled #22571

Closed
3 tasks done
shibd opened this issue Apr 24, 2024 · 0 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

shibd commented Apr 24, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

  • master
  • 3.2.x
  • 3.1.x
  • 3.0.x
  • 2.11.x

Minimal reproduce step

In a geo-replication setup, assume there are two clusters, r1 and r2, and that the topic my-topic is replicated between them.

consumer1 subscribes to my-topic on r1 with replicateSubscriptionState enabled. After the subscription state has synced to my-topic on r2, a reader created on r2 to read my-topic gets stuck in readNext.

To reproduce, copy the following test into ReplicatorSubscriptionTest and run it.

    @Test
    public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
        String topicName = "persistent://" + namespace + "/mytopic";
        String subscriptionName = "cluster-subscription";
        // this setting can be used to manually run the test with subscription replication disabled;
        // per this report, the reader only gets stuck when it is enabled
        boolean replicateSubscriptionState = true;

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        @Cleanup
        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
                .statsInterval(0, TimeUnit.SECONDS)
                .build();

        // create subscription in r1
        createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);

        @Cleanup
        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
                .statsInterval(0, TimeUnit.SECONDS)
                .build();

        // create subscription in r2
        createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);

        Set<String> sentMessages = new LinkedHashSet<>();

        // send messages in r1
        @Cleanup
        Producer<byte[]> producer = client1.newProducer().topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        int numMessages = 6;
        for (int i = 0; i < numMessages; i++) {
            String body = "message" + i;
            producer.send(body.getBytes(StandardCharsets.UTF_8));
            sentMessages.add(body);
        }
        producer.close();


        // consume 3 messages in r1
        Set<String> receivedMessages = new LinkedHashSet<>();
        try (Consumer<byte[]> consumer1 = client1.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .replicateSubscriptionState(replicateSubscriptionState)
                .subscribe()) {
            readMessages(consumer1, receivedMessages, 3, false);
        }

        // wait for subscription to be replicated
        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

        // create a reader in r2
        @Cleanup
        Reader<byte[]> reader = client2.newReader().topic(topicName)
                .subscriptionName("new-sub")
                .startMessageId(MessageId.earliest)
                .create();
        int readNum = 0;
        while (reader.hasMessageAvailable()) {
            Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
            // readNext returns null on timeout, so check before dereferencing the message
            assertNotNull(message);
            System.out.println("Receive message: " + new String(message.getValue(), StandardCharsets.UTF_8)
                    + " msgId: " + message.getMessageId());
            readNum++;
        }
        assertEquals(readNum, numMessages);
    }
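The read loop in the test relies on hasMessageAvailable() and a timed readNext() agreeing with each other. A minimal sketch of a drain helper that fails fast when they disagree is shown here. SimpleReader and BoundedDrainSketch are hypothetical names introduced only for illustration; the interface mirrors the shape of the two Pulsar Reader calls used in the test but is not the Pulsar API, and it assumes a timed read returns null on timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class BoundedDrainSketch {
    /** Hypothetical minimal reader view; not part of the Pulsar client. */
    interface SimpleReader<T> {
        boolean hasMessageAvailable();
        /** Assumed to return null when nothing arrives before the timeout. */
        T readNext(int timeout, TimeUnit unit);
    }

    /**
     * Reads until hasMessageAvailable() reports false. If the reader claims a
     * message is available but the timed read comes back empty, the loop stops
     * with an explicit error instead of spinning or dereferencing null.
     */
    static <T> List<T> drain(SimpleReader<T> reader, int timeout, TimeUnit unit) {
        List<T> received = new ArrayList<>();
        while (reader.hasMessageAvailable()) {
            T message = reader.readNext(timeout, unit);
            if (message == null) {
                throw new IllegalStateException(
                        "hasMessageAvailable() returned true but readNext() timed out after "
                                + received.size() + " messages");
            }
            received.add(message);
        }
        return received;
    }
}
```

With a helper of this shape, the failure described in this issue would surface as an exception that reports how many messages were read before the reader stalled.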

What did you expect to see?

The test passes.

What did you see instead?

The test gets stuck.
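This report does not state a root cause. One possible explanation, offered purely as an assumption, is that the tail of the topic holds internal entries (for example, replicated-subscription markers) that are never delivered to readers, so a check based on the last position in the log says a message is available while the read itself finds nothing. The toy model below is not Pulsar code; ToyTopicModel and all of its methods are hypothetical and only illustrate that mismatch.

```java
import java.util.ArrayList;
import java.util.List;

class ToyTopicModel {
    /** One log entry; marker entries are internal and never handed to a reader. */
    static final class Entry {
        final String payload;
        final boolean marker;

        Entry(String payload, boolean marker) {
            this.payload = payload;
            this.marker = marker;
        }
    }

    private final List<Entry> log = new ArrayList<>();
    private int cursor = 0; // index of the next entry the reader will inspect

    void append(String payload) {
        log.add(new Entry(payload, false));
    }

    void appendMarker() {
        log.add(new Entry(null, true));
    }

    /** Naive check: any entry past the cursor counts, including markers. */
    boolean hasMessageAvailableNaive() {
        return cursor < log.size();
    }

    /** Marker-aware check: only deliverable entries past the cursor count. */
    boolean hasMessageAvailableSkippingMarkers() {
        for (int i = cursor; i < log.size(); i++) {
            if (!log.get(i).marker) {
                return true;
            }
        }
        return false;
    }

    /** Returns the next deliverable payload, or null if none remains (stands in for a timed-out read). */
    String readNext() {
        while (cursor < log.size()) {
            Entry entry = log.get(cursor++);
            if (!entry.marker) {
                return entry.payload;
            }
        }
        return null;
    }
}
```

In this model, once every real message has been read and only a marker remains, the naive check still returns true while readNext() has nothing to return, which matches the symptom seen in the test.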

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@shibd shibd added the type/bug The PR fixed a bug or issue reported a bug label Apr 24, 2024
@shibd shibd self-assigned this Apr 24, 2024
@shibd shibd closed this as completed Apr 30, 2024