From feb4ff19e097a9d8f13b093e8fb25dc12c31227b Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 13 Jul 2021 16:56:01 +0200 Subject: [PATCH] Fixed retention of keys in compaction (#11287) ### Motivation This change fixes few issues in the compaction mechanism, the * When a reader is created, reading from "earliest" message, it should read the compacted data and then continue from the next message. * When the compaction consumer starts, it shouldn't seek to the beginning. This causes 2 issues: * Rescanning of the topic each time the compaction runs * Keys that are being dropped from the topic are also getting dropped from the compacted view, while in fact they should be there until explicitly deleted (with an empty message for a key). The main source of the problem is that when creating a cursor on "earliest" message, the cursor gets automatically adjusted on the earliest message available to read. This confuses the check for the read-compacted because it may think the reader/consumer is already ahead of the compaction horizon. ### Modifications Introduced a "isFirstRead" flag to make sure we double check the start message id and use `MessageId.earliest` instead of the earliest available message to read on the topic. After the first read, the positioning will be fine. --- ...bstractDispatcherSingleActiveConsumer.java | 6 + .../pulsar/broker/service/Consumer.java | 12 +- .../nonpersistent/NonPersistentTopic.java | 2 +- ...sistentDispatcherSingleActiveConsumer.java | 4 +- ...reamingDispatcherSingleActiveConsumer.java | 5 +- .../service/persistent/PersistentTopic.java | 2 +- .../apache/pulsar/client/api/RawReader.java | 2 +- .../pulsar/compaction/CompactedTopic.java | 8 +- .../pulsar/compaction/CompactedTopicImpl.java | 29 ++- ...sistentDispatcherFailoverConsumerTest.java | 19 +- .../broker/service/PersistentTopicTest.java | 43 ++-- .../compaction/CompactionRetentionTest.java | 229 ++++++++++++++++++ .../apache/pulsar/io/PulsarSinkE2ETest.java | 28 ++- 13 files changed, 328 insertions(+), 61 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index e73daaa4210d4..690a5984b4828 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -55,6 +55,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas AtomicIntegerFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class, "isClosed"); private volatile int isClosed = FALSE; + protected boolean isFirstRead = true; + public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, String topicName, Subscription subscription, ServiceConfiguration serviceConfig) { @@ -159,6 +161,10 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce isKeyHashRangeFiltered = false; } + if (consumers.isEmpty()) { + isFirstRead = true; + } + consumers.add(consumer); if (!pickAndScheduleActiveConsumer()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index bef85174b36c1..1e8b2e561042c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; @@ -123,12 +124,13 @@ public class Consumer { private boolean preciseDispatcherFlowControl; private PositionImpl readPositionWhenJoining; private final String clientAddress; // IP address only, no port number included + private final MessageId startMessageId; public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, int maxUnackedMessages, TransportCnx cnx, String appId, Map metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, - KeySharedMeta keySharedMeta) { + KeySharedMeta keySharedMeta, MessageId startMessageId) { this.subscription = subscription; this.subType = subType; @@ -148,6 +150,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.bytesOutCounter = new LongAdder(); this.msgOutCounter = new LongAdder(); this.appId = appId; + + // Ensure we start from compacted view + this.startMessageId = (readCompacted && startMessageId == null) ? MessageId.earliest : startMessageId; + this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl(); PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); MESSAGE_PERMITS_UPDATER.set(this, 0); @@ -835,5 +841,9 @@ public String getClientAddress() { return clientAddress; } + public MessageId getStartMessageId() { + return startMessageId; + } + private static final Logger log = LoggerFactory.getLogger(Consumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 9207ab84d9418..2751d1e2a0196 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -271,7 +271,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> new NonPersistentSubscription(this, subscriptionName)); Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, - cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta); + cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest); addConsumerToSubscription(subscription, consumer).thenRun(() -> { if (!cnx.isActive()) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 2bd94fff51c50..4bae5b02a40f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -153,6 +153,7 @@ public synchronized void internalReadEntriesComplete(final List entries, } havePendingRead = false; + isFirstRead = false; if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); @@ -338,7 +339,8 @@ protected void readMoreEntries(Consumer consumer) { } havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer, topic.getMaxReadPosition()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java index e213b04915413..2e922324f1873 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java @@ -121,6 +121,8 @@ public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntry havePendingRead = false; } + isFirstRead = false; + if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); if (log.isDebugEnabled()) { @@ -197,7 +199,8 @@ protected void readMoreEntries(Consumer consumer) { havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + this, consumer); } else { streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 306eeb8848b6c..0f0d9670a9d3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -731,7 +731,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, - readCompacted, initialPosition, keySharedMeta); + readCompacted, initialPosition, keySharedMeta, startMessageId); return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { checkBackloggedCursors(); if (!cnx.isActive()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index 415c3dce6d88c..f74157a938914 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -34,7 +34,7 @@ public interface RawReader { static CompletableFuture create(PulsarClient client, String topic, String subscription) { CompletableFuture> future = new CompletableFuture<>(); RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future); - return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r); + return future.thenCompose(x -> x.seekAsync(MessageId.earliest)).thenApply(__ -> r); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 88b8e5826d9b9..4922852bda465 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -22,9 +22,13 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.Consumer; public interface CompactedTopic { CompletableFuture newCompactedLedger(Position p, long compactedLedgerId); - void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, - ReadEntriesCallback callback, Object ctx); + void asyncReadEntriesOrWait(ManagedCursor cursor, + int numberOfEntriesToRead, + boolean isFirstRead, + ReadEntriesCallback callback, + Consumer consumer); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index c4646edf8dca0..b061a424b131c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -42,6 +42,8 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.api.proto.MessageIdData; @@ -81,13 +83,20 @@ public CompletableFuture newCompactedLedger(Position p, long compactedLedgerI } @Override - public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, - ReadEntriesCallback callback, Object ctx) { + public void asyncReadEntriesOrWait(ManagedCursor cursor, + int numberOfEntriesToRead, + boolean isFirstRead, + ReadEntriesCallback callback, Consumer consumer) { synchronized (this) { - PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition(); + PositionImpl cursorPosition; + if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){ + cursorPosition = PositionImpl.earliest; + } else { + cursorPosition = (PositionImpl) cursor.getReadPosition(); + } if (compactionHorizon == null || compactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, PositionImpl.latest); + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest); } else { compactedTopicContext.thenCompose( (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache) @@ -96,11 +105,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRe // the cursor just needs to be set to the compaction horizon if (startPoint == COMPACT_LEDGER_EMPTY) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); return CompletableFuture.completedFuture(null); } if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest); return CompletableFuture.completedFuture(null); } else { @@ -108,23 +117,23 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRe startPoint + numberOfEntriesToRead); if (startPoint == NEWER_THAN_COMPACTED) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); return CompletableFuture.completedFuture(null); } return readEntries(context.ledger, startPoint, endPoint) .thenAccept((entries) -> { Entry lastEntry = entries.get(entries.size() - 1); cursor.seek(lastEntry.getPosition().getNext()); - callback.readEntriesComplete(entries, ctx); + callback.readEntriesComplete(entries, consumer); }); } })) .exceptionally((exception) -> { if (exception.getCause() instanceof NoSuchElementException) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); } else { - callback.readEntriesFailed(new ManagedLedgerException(exception), ctx); + callback.readEntriesFailed(new ManagedLedgerException(exception), consumer); } return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 6cccfee8e0659..692ea221710a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -73,6 +73,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -299,7 +300,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 2. Add old consumer Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -310,7 +311,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 3. Add new consumer Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -339,7 +340,7 @@ public void testAddRemoveConsumer() throws Exception { // 2. Add consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -363,7 +364,7 @@ public void testAddRemoveConsumer() throws Exception { // 5. Add another consumer which does not change active consumer Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); @@ -377,7 +378,7 @@ public void testAddRemoveConsumer() throws Exception { // 6. Add a consumer which changes active consumer Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName()); @@ -460,7 +461,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 2. Add a consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertEquals(1, consumers.size()); @@ -469,7 +470,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order. Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); // 4. Verify active consumer doesn't change @@ -482,7 +483,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 5. Add another consumer which has higher priority level Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer3); consumers = pdfc.getConsumers(); assertEquals(3, consumers.size()); @@ -672,7 +673,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { Consumer consumer = new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000, - serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); try { consumer.flowPermits(permit); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 0f3560ea98d2e..97c2ae4d33831 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -101,6 +101,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -724,7 +725,7 @@ public void testChangeSubscriptionType() throws Exception { Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); consumer.close(); @@ -735,7 +736,7 @@ public void testChangeSubscriptionType() throws Exception { consumer = new Consumer(sub, subType, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -758,7 +759,7 @@ public void testAddRemoveConsumer() throws Exception { // 1. simple add consumer Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -791,7 +792,7 @@ public void testAddRemoveConsumerDurableCursor() throws Exception { PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", cursorMock, false); Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); assertFalse(sub.getDispatcher().isClosed()); @@ -827,14 +828,14 @@ private void testMaxConsumersShared() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -842,7 +843,7 @@ private void testMaxConsumersShared() throws Exception { try { Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -855,7 +856,7 @@ private void testMaxConsumersShared() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -866,7 +867,7 @@ private void testMaxConsumersShared() throws Exception { try { Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -922,14 +923,14 @@ private void testMaxConsumersFailover() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -937,7 +938,7 @@ private void testMaxConsumersFailover() throws Exception { try { Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -950,7 +951,7 @@ private void testMaxConsumersFailover() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -961,7 +962,7 @@ private void testMaxConsumersFailover() throws Exception { try { Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -1009,7 +1010,7 @@ private Consumer getMockedConsumerWithSpecificAddress(Topic topic, Subscription doReturn(new PulsarCommandSenderImpl(null, cnx)).when(cnx).getCommandSender(); return new Consumer(sub, SubType.Shared, topic.getName(), consumerId, 0, consumerNameBase + consumerId, 50000, - cnx, role, Collections.emptyMap(), false, InitialPosition.Latest, null); + cnx, role, Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); } @Test @@ -1117,7 +1118,7 @@ public void testUbsubscribeRaceConditions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer1); doAnswer(new Answer() { @@ -1141,7 +1142,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */ sub.addConsumer(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)).get(); + "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)).get(); fail(); } catch (Exception e) { assertTrue(e.getCause() instanceof BrokerServiceException.SubscriptionFencedException); @@ -1941,21 +1942,21 @@ public void testBacklogCursor() throws Exception { ManagedCursor cursor1 = ledger.openCursor("c1"); PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false); Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1); sub1.addConsumer(consumer1); // Open cursor2, add it into activeCursor-container and add it into subscription consumer list ManagedCursor cursor2 = ledger.openCursor("c2"); PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false); Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2); sub2.addConsumer(consumer2); // Open cursor3, add it into activeCursor-container and do not add it into subscription consumer list ManagedCursor cursor3 = ledger.openCursor("c3"); PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false); Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3); // Case1: cursors are active as haven't started deactivateBacklogCursor scan @@ -2065,7 +2066,7 @@ public void testCheckInactiveSubscriptions() throws Exception { addConsumerToSubscription.setAccessible(true); Consumer consumer = new Consumer(nonDeletableSubscription1, SubType.Shared, topic.getName(), 1, 0, "consumer1", - 50000, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null); + 50000, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, consumer); when(pulsar.getConfigurationCache().policiesCache() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java new file mode 100644 index 0000000000000..a73d1f5a5d171 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.models.auth.In; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class CompactionRetentionTest extends MockedPulsarServiceBaseTest { + private ScheduledExecutorService compactionScheduler; + private BookKeeper bk; + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + conf.setManagedLedgerMaxEntriesPerLedger(2); + super.internalSetup(); + + admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("my-tenant", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-tenant/use/my-ns"); + + compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + + if (compactionScheduler != null) { + compactionScheduler.shutdownNow(); + } + } + + /** + * Compaction should retain expired keys in the compacted view + */ + @Test + public void testCompaction() throws Exception { + String topic = "persistent://my-tenant/use/my-ns/my-topic-" + System.nanoTime(); + + Set keys = Sets.newHashSet("a", "b", "c"); + Set keysToExpire = Sets.newHashSet("x1", "x2"); + Set allKeys = new HashSet<>(); + allKeys.addAll(keys); + allKeys.addAll(keysToExpire); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.INDENT_OUTPUT, true); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .create(); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).join(); + + log.info(" ---- X 1: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + int round = 1; + + for (String key : allKeys) { + producer.newMessage() + .key(key) + .value(round) + .send(); + } + + log.info(" ---- X 2: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + compactor.compact(topic).join(); + + log.info(" ---- X 3: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + round = 2; + + for (String key : allKeys) { + producer.newMessage() + .key(key) + .value(round) + .send(); + } + + compactor.compact(topic).join(); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + // Now explicitly remove the expiring keys + for (String key : keysToExpire) { + producer.newMessage() + .key(key) + .send(); + } + + compactor.compact(topic).join(); + + log.info(" ---- X 4: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, keys); + + // In the raw topic there should be no messages + validateMessages(pulsarClient, false, topic, round, Collections.emptySet()); + } + + private void validateMessages(PulsarClient client, boolean readCompacted, String topic, int round, Set expectedKeys) + throws Exception { + @Cleanup + Reader reader = client.newReader(Schema.INT32) + .topic(topic) + .startMessageId(MessageId.earliest) + .readCompacted(readCompacted) + .create(); + + Map receivedValues = new HashMap<>(); + + while (true) { + Message msg = reader.readNext(1, TimeUnit.SECONDS); + if (msg == null) { + break; + } + + Integer value = msg.size() > 0 ? msg.getValue() : null; + log.info("Received: {} -- value: {}", msg.getKey(), value); + if (value != null) { + receivedValues.put(msg.getKey(), value); + } + } + + Map expectedReceivedValues = new HashMap<>(); + expectedKeys.forEach(k -> expectedReceivedValues.put(k, round)); + + log.info("Received values: {}", receivedValues); + log.info("Expected values: {}", expectedReceivedValues); + assertEquals(receivedValues, expectedReceivedValues); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index b921266a60c10..4c9604805603d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.functions.LocalRunner; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils; +import org.awaitility.Awaitility; import org.testng.annotations.Test; import com.google.common.collect.Lists; @@ -74,12 +75,19 @@ public void testReadCompactedSink() throws Exception { final int messageNum = 20; final int maxKeys = 10; // 1 Setup producer + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(sourceTopic) .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); - pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close(); + pulsarClient.newConsumer() + .topic(sourceTopic) + .subscriptionName(subscriptionName) + .readCompacted(true) + .subscribe() + .close(); + // 2 Send messages and record the expected values after compaction Map expected = new HashMap<>(); for (int j = 0; j < messageNum; j++) { @@ -107,18 +115,12 @@ public void testReadCompactedSink() throws Exception { admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl); // 5 Sink should only read compacted value,so we will only receive compacted messages - retryStrategically((test) -> { - try { - String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get()); - Map metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); - PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total"); - return m.value == (double) maxKeys; - } catch (Exception e) { - return false; - } - }, 50, 1000); - - producer.close(); + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get()); + Map metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); + PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total"); + assertEquals(m.value, maxKeys); + }); } @Test(timeOut = 30000)