Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-17679: [Bug] Can't consume messages by using a subset of sticky key hash range #4893

Open
1 of 2 tasks
sijie opened this issue Sep 15, 2022 · 0 comments
Open
1 of 2 tasks
Labels

Comments

@sijie
Copy link
Member

sijie commented Sep 15, 2022

Original Issue: apache#17679


Search before asking

  • I searched in the issues and found nothing similar.

Version

2.10.0

Minimal reproduce step

Here are a reproduce code by adding test on KeySharedSubscriptionTest class.

@Test
public void testSpecifiedStickyKeyRanges() throws PulsarClientException, InterruptedException {
    String topic = TopicName.get("persistent", "public", "default",
            "testSpecifiedStickyKeyRanges" + UUID.randomUUID()).toString();
    String subscriptionName = "my-sub";

    String keyToRead = "someKey1";
    String keyToExclude = "someKey2";
    int numMessages = 100;
    byte[] keyBytes = keyToRead.getBytes(StandardCharsets.UTF_8);
    int hash = Murmur3_32Hash.getInstance().makeHash(keyBytes) % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;

    // start consumer
    CountDownLatch count = new CountDownLatch(numMessages);
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName(subscriptionName)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Key_Shared)
            .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash)))
            .messageListener((c, msg) -> {
                c.acknowledgeAsync(msg).whenComplete((m, e) -> {
                    if (e != null) {
                        log.error("error", e);
                    } else {
                        count.countDown();
                    }
                });
            })
            .subscribe();

    pulsar.getExecutor().submit(() -> {
        try {
            try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(topic)
                    .enableBatching(false)
                    .create();) {
                for (int i = 0; i < numMessages; i++) {
                    String value = "test" + i;
                    producer.newMessage()
                            .key(keyToRead)
                            .value(value)
                            .send();
                    producer.newMessage()
                            .key(keyToExclude)
                            .value(value)
                            .send();
                    Thread.sleep(100);
                }
            }
        } catch (Throwable t) {
            log.error("error", t);
        }
    });

    assertThat(CompletableFuture.runAsync(() -> {
        try {
            count.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    })).succeedsWithin(Duration.ofMinutes(1));

    consumer.close();
}

What did you expect to see?

Consumed the sent messages with given key.

What did you see instead?

No messages can be consumed.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@sijie sijie added the type/bug label Sep 15, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

1 participant