Skip to content

Commit

Permalink
test: fix to address new definition of recently joined consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
equanz committed Apr 25, 2023
1 parent 8c33d71 commit 97f7621
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 36 deletions.
Expand Up @@ -3286,43 +3286,48 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception {
}

@Test
public void testGetReadPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString();
public void testGetLastSentPositionsWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionsWhenJoining-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.consumerName("c1")
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
}

List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();
consumers.add(consumer);
for (int i = 0; i < messages; i++) {
assertNotNull(consumer1.receive(100, TimeUnit.MILLISECONDS));
}

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.consumerName("c2")
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getConsumers().size(), 2);
ConsumerStats consumerStats = subStats.getConsumers().get(0);
Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId() + 1).toString());

for (Consumer<byte[]> consumer : consumers) {
consumer.close();
}
ConsumerStats consumer2Stats = subStats.getConsumers().stream().filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
Assert.assertEquals(consumer2Stats.getLastSentPositionsWhenJoining(),
Set.of(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).toString());
}

@Test
Expand Down
Expand Up @@ -154,7 +154,9 @@ void shutdown() throws Exception {
executor.shutdownNow();

for (int i = 0; i < BROKER_COUNT; i++) {
pulsarAdmins[i].close();
if (pulsarAdmins[i] != null) {
pulsarAdmins[i].close();
}
if (pulsarServices[i] != null) {
pulsarServices[i].close();
}
Expand Down
Expand Up @@ -116,7 +116,7 @@ public void testConsumersAfterMarkDelete() throws PulsarClientException, PulsarA
TopicStats stats = admin.topics().getStats(topicName);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
Assert.assertEquals(stats.getSubscriptions().entrySet().iterator().next().getValue()
.getConsumersAfterMarkDeletePosition().size(), 1);
.getRecentlyJoinedConsumers().size(), 1);

consumer1.close();
consumer2.close();
Expand Down

0 comments on commit 97f7621

Please sign in to comment.