[Bug] Can't consume messages by using a subset of sticky key hash range #17679

Closed
syhily opened this issue Sep 15, 2022 · 3 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments


syhily commented Sep 15, 2022

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.10.0

Minimal reproduce step

Here is code that reproduces the issue. Add this test to the KeySharedSubscriptionTest class.

@Test
public void testSpecifiedStickyKeyRanges() throws PulsarClientException, InterruptedException {
    String topic = TopicName.get("persistent", "public", "default",
            "testSpecifiedStickyKeyRanges" + UUID.randomUUID()).toString();
    String subscriptionName = "my-sub";

    String keyToRead = "someKey1";
    String keyToExclude = "someKey2";
    int numMessages = 100;
    byte[] keyBytes = keyToRead.getBytes(StandardCharsets.UTF_8);
    int hash = Murmur3_32Hash.getInstance().makeHash(keyBytes) % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;

    // start consumer
    CountDownLatch count = new CountDownLatch(numMessages);
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName(subscriptionName)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Key_Shared)
            .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash)))
            .messageListener((c, msg) -> {
                c.acknowledgeAsync(msg).whenComplete((m, e) -> {
                    if (e != null) {
                        log.error("error", e);
                    } else {
                        count.countDown();
                    }
                });
            })
            .subscribe();

    pulsar.getExecutor().submit(() -> {
        try {
            try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(topic)
                    .enableBatching(false)
                    .create();) {
                for (int i = 0; i < numMessages; i++) {
                    String value = "test" + i;
                    producer.newMessage()
                            .key(keyToRead)
                            .value(value)
                            .send();
                    producer.newMessage()
                            .key(keyToExclude)
                            .value(value)
                            .send();
                    Thread.sleep(100);
                }
            }
        } catch (Throwable t) {
            log.error("error", t);
        }
    });

    assertThat(CompletableFuture.runAsync(() -> {
        try {
            count.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    })).succeedsWithin(Duration.ofMinutes(1));

    consumer.close();
}

What did you expect to see?

The consumer receives the messages sent with the given key.

What did you see instead?

No messages can be consumed.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

syhily commented Sep 15, 2022

@eolivelli The reproduction test passes after I comment out the line you added in #14014. Could you help me check this?

        for (Entry entry : entries) {
            int stickyKeyHash = getStickyKeyHash(entry);
            Consumer c = selector.select(stickyKeyHash);
            if (c != null) {
                groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
                // consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
            } else {
                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                entry.release();
            }
        }

Technoboy- (Contributor) commented

In sticky mode, a consumer receives only messages whose key hashes fall within its specified ranges.
In the case above, the consumer configures only one hash range, but the hashes of the produced messages are in other ranges, so the consumer is blocked.
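
To make the range matching described above concrete, here is a minimal, self-contained sketch. It is not Pulsar's implementation: the class StickyRangeSketch and its methods are hypothetical, the hash space size of 65536 is an assumption, and slot() uses String.hashCode() as a stand-in for the Murmur3 hash used in the reproduction test. It only illustrates that a hash outside every registered range has no owning consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of exclusive sticky hash ranges; not Pulsar code.
public class StickyRangeSketch {
    // Assumed size of the hash space, for illustration only.
    static final int HASH_RANGE_SIZE = 65536;

    // An inclusive hash range owned by one named consumer.
    static final class OwnedRange {
        final int start;
        final int end;
        final String consumer;

        OwnedRange(int start, int end, String consumer) {
            this.start = start;
            this.end = end;
            this.consumer = consumer;
        }

        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }
    }

    private final List<OwnedRange> ranges = new ArrayList<>();

    // Registers an inclusive range [start, end] for a consumer.
    void addConsumer(String consumer, int start, int end) {
        ranges.add(new OwnedRange(start, end, consumer));
    }

    // Returns the consumer whose range covers the hash, or null if none does.
    String select(int hash) {
        for (OwnedRange r : ranges) {
            if (r.contains(hash)) {
                return r.consumer;
            }
        }
        return null;
    }

    // Stand-in hash (NOT Murmur3): maps a key into [0, HASH_RANGE_SIZE).
    static int slot(String key) {
        return Math.floorMod(key.hashCode(), HASH_RANGE_SIZE);
    }

    public static void main(String[] args) {
        StickyRangeSketch selector = new StickyRangeSketch();
        int wanted = slot("someKey1");
        // Mirror the test: a single-slot range for the key we want to read.
        selector.addConsumer("consumer-1", wanted, wanted);

        for (String key : List.of("someKey1", "someKey2")) {
            String owner = selector.select(slot(key));
            System.out.println(key + " -> " + (owner == null ? "no consumer" : owner));
        }
    }
}
```

In this model the second key maps to a slot that no consumer owns, which corresponds to the branch in the dispatcher snippet above where `selector.select(stickyKeyHash)` returns null and the entry is added to the replay set.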


syhily commented Sep 16, 2022
