diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index e73daaa4210d4..690a5984b4828 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -55,6 +55,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas AtomicIntegerFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class, "isClosed"); private volatile int isClosed = FALSE; + protected boolean isFirstRead = true; + public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, String topicName, Subscription subscription, ServiceConfiguration serviceConfig) { @@ -159,6 +161,10 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce isKeyHashRangeFiltered = false; } + if (consumers.isEmpty()) { + isFirstRead = true; + } + consumers.add(consumer); if (!pickAndScheduleActiveConsumer()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index bef85174b36c1..1e8b2e561042c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; @@ -123,12 +124,13 @@ public class Consumer { private boolean preciseDispatcherFlowControl; private PositionImpl readPositionWhenJoining; private final String clientAddress; // IP address only, no port number included + private final MessageId startMessageId; public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, int maxUnackedMessages, TransportCnx cnx, String appId, Map metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, - KeySharedMeta keySharedMeta) { + KeySharedMeta keySharedMeta, MessageId startMessageId) { this.subscription = subscription; this.subType = subType; @@ -148,6 +150,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.bytesOutCounter = new LongAdder(); this.msgOutCounter = new LongAdder(); this.appId = appId; + + // Ensure we start from compacted view + this.startMessageId = (readCompacted && startMessageId == null) ? MessageId.earliest : startMessageId; + this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl(); PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); MESSAGE_PERMITS_UPDATER.set(this, 0); @@ -835,5 +841,9 @@ public String getClientAddress() { return clientAddress; } + public MessageId getStartMessageId() { + return startMessageId; + } + private static final Logger log = LoggerFactory.getLogger(Consumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 9207ab84d9418..2751d1e2a0196 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -271,7 +271,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> new NonPersistentSubscription(this, subscriptionName)); Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, - cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta); + cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest); addConsumerToSubscription(subscription, consumer).thenRun(() -> { if (!cnx.isActive()) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 2bd94fff51c50..4bae5b02a40f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -153,6 +153,7 @@ public synchronized void internalReadEntriesComplete(final List entries, } havePendingRead = false; + isFirstRead = false; if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); @@ -338,7 +339,8 @@ protected void readMoreEntries(Consumer consumer) { } havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer, topic.getMaxReadPosition()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java index e213b04915413..2e922324f1873 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java @@ -121,6 +121,8 @@ public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntry havePendingRead = false; } + isFirstRead = false; + if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); if (log.isDebugEnabled()) { @@ -197,7 +199,8 @@ protected void readMoreEntries(Consumer consumer) { havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + this, consumer); } else { streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 306eeb8848b6c..0f0d9670a9d3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -731,7 +731,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, - readCompacted, initialPosition, keySharedMeta); + readCompacted, initialPosition, keySharedMeta, startMessageId); return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { checkBackloggedCursors(); if (!cnx.isActive()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index 415c3dce6d88c..f74157a938914 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -34,7 +34,7 @@ public interface RawReader { static CompletableFuture create(PulsarClient client, String topic, String subscription) { CompletableFuture> future = new CompletableFuture<>(); RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future); - return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r); + return future.thenCompose(x -> x.seekAsync(MessageId.earliest)).thenApply(__ -> r); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 88b8e5826d9b9..4922852bda465 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -22,9 +22,13 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.Consumer; public interface CompactedTopic { CompletableFuture newCompactedLedger(Position p, long compactedLedgerId); - void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, - ReadEntriesCallback callback, Object ctx); + void asyncReadEntriesOrWait(ManagedCursor cursor, + int numberOfEntriesToRead, + boolean isFirstRead, + ReadEntriesCallback callback, + Consumer consumer); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index c4646edf8dca0..b061a424b131c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -42,6 +42,8 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.api.proto.MessageIdData; @@ -81,13 +83,20 @@ public CompletableFuture newCompactedLedger(Position p, long compactedLedgerI } @Override - public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, - ReadEntriesCallback callback, Object ctx) { + public void asyncReadEntriesOrWait(ManagedCursor cursor, + int numberOfEntriesToRead, + boolean isFirstRead, + ReadEntriesCallback callback, Consumer consumer) { synchronized (this) { - PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition(); + PositionImpl cursorPosition; + if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){ + cursorPosition = PositionImpl.earliest; + } else { + cursorPosition = (PositionImpl) cursor.getReadPosition(); + } if (compactionHorizon == null || compactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, PositionImpl.latest); + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest); } else { compactedTopicContext.thenCompose( (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache) @@ -96,11 +105,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRe // the cursor just needs to be set to the compaction horizon if (startPoint == COMPACT_LEDGER_EMPTY) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); return CompletableFuture.completedFuture(null); } if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest); return CompletableFuture.completedFuture(null); } else { @@ -108,23 +117,23 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRe startPoint + numberOfEntriesToRead); if (startPoint == NEWER_THAN_COMPACTED) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); return CompletableFuture.completedFuture(null); } return readEntries(context.ledger, startPoint, endPoint) .thenAccept((entries) -> { Entry lastEntry = entries.get(entries.size() - 1); cursor.seek(lastEntry.getPosition().getNext()); - callback.readEntriesComplete(entries, ctx); + callback.readEntriesComplete(entries, consumer); }); } })) .exceptionally((exception) -> { if (exception.getCause() instanceof NoSuchElementException) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); } else { - callback.readEntriesFailed(new ManagedLedgerException(exception), ctx); + callback.readEntriesFailed(new ManagedLedgerException(exception), consumer); } return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 6cccfee8e0659..692ea221710a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -73,6 +73,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -299,7 +300,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 2. Add old consumer Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -310,7 +311,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 3. Add new consumer Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -339,7 +340,7 @@ public void testAddRemoveConsumer() throws Exception { // 2. Add consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -363,7 +364,7 @@ public void testAddRemoveConsumer() throws Exception { // 5. Add another consumer which does not change active consumer Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); @@ -377,7 +378,7 @@ public void testAddRemoveConsumer() throws Exception { // 6. Add a consumer which changes active consumer Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName()); @@ -460,7 +461,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 2. Add a consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertEquals(1, consumers.size()); @@ -469,7 +470,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order. Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); // 4. Verify active consumer doesn't change @@ -482,7 +483,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 5. Add another consumer which has higher priority level Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer3); consumers = pdfc.getConsumers(); assertEquals(3, consumers.size()); @@ -672,7 +673,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { Consumer consumer = new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000, - serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); try { consumer.flowPermits(permit); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 0f3560ea98d2e..97c2ae4d33831 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -101,6 +101,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -724,7 +725,7 @@ public void testChangeSubscriptionType() throws Exception { Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); consumer.close(); @@ -735,7 +736,7 @@ public void testChangeSubscriptionType() throws Exception { consumer = new Consumer(sub, subType, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -758,7 +759,7 @@ public void testAddRemoveConsumer() throws Exception { // 1. simple add consumer Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -791,7 +792,7 @@ public void testAddRemoveConsumerDurableCursor() throws Exception { PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", cursorMock, false); Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); assertFalse(sub.getDispatcher().isClosed()); @@ -827,14 +828,14 @@ private void testMaxConsumersShared() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -842,7 +843,7 @@ private void testMaxConsumersShared() throws Exception { try { Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -855,7 +856,7 @@ private void testMaxConsumersShared() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -866,7 +867,7 @@ private void testMaxConsumersShared() throws Exception { try { Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -922,14 +923,14 @@ private void testMaxConsumersFailover() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -937,7 +938,7 @@ private void testMaxConsumersFailover() throws Exception { try { Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -950,7 +951,7 @@ private void testMaxConsumersFailover() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -961,7 +962,7 @@ private void testMaxConsumersFailover() throws Exception { try { Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -1009,7 +1010,7 @@ private Consumer getMockedConsumerWithSpecificAddress(Topic topic, Subscription doReturn(new PulsarCommandSenderImpl(null, cnx)).when(cnx).getCommandSender(); return new Consumer(sub, SubType.Shared, topic.getName(), consumerId, 0, consumerNameBase + consumerId, 50000, - cnx, role, Collections.emptyMap(), false, InitialPosition.Latest, null); + cnx, role, Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); } @Test @@ -1117,7 +1118,7 @@ public void testUbsubscribeRaceConditions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer1); doAnswer(new Answer() { @@ -1141,7 +1142,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */ sub.addConsumer(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)).get(); + "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)).get(); fail(); } catch (Exception e) { assertTrue(e.getCause() instanceof BrokerServiceException.SubscriptionFencedException); @@ -1941,21 +1942,21 @@ public void testBacklogCursor() throws Exception { ManagedCursor cursor1 = ledger.openCursor("c1"); PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false); Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1); sub1.addConsumer(consumer1); // Open cursor2, add it into activeCursor-container and add it into subscription consumer list ManagedCursor cursor2 = ledger.openCursor("c2"); PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false); Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2); sub2.addConsumer(consumer2); // Open cursor3, add it into activeCursor-container and do not add it into subscription consumer list ManagedCursor cursor3 = ledger.openCursor("c3"); PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false); Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3); // Case1: cursors are active as haven't started deactivateBacklogCursor scan @@ -2065,7 +2066,7 @@ public void testCheckInactiveSubscriptions() throws Exception { addConsumerToSubscription.setAccessible(true); Consumer consumer = new Consumer(nonDeletableSubscription1, SubType.Shared, topic.getName(), 1, 0, "consumer1", - 50000, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null); + 50000, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, consumer); when(pulsar.getConfigurationCache().policiesCache() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java new file mode 100644 index 0000000000000..a73d1f5a5d171 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.models.auth.In; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class CompactionRetentionTest extends MockedPulsarServiceBaseTest { + private ScheduledExecutorService compactionScheduler; + private BookKeeper bk; + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + conf.setManagedLedgerMaxEntriesPerLedger(2); + super.internalSetup(); + + admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("my-tenant", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-tenant/use/my-ns"); + + compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + + if (compactionScheduler != null) { + compactionScheduler.shutdownNow(); + } + } + + /** + * Compaction should retain expired keys in the compacted view + */ + @Test + public void testCompaction() throws Exception { + String topic = "persistent://my-tenant/use/my-ns/my-topic-" + System.nanoTime(); + + Set keys = Sets.newHashSet("a", "b", "c"); + Set keysToExpire = Sets.newHashSet("x1", "x2"); + Set allKeys = new HashSet<>(); + allKeys.addAll(keys); + allKeys.addAll(keysToExpire); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.INDENT_OUTPUT, true); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .create(); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).join(); + + log.info(" ---- X 1: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + int round = 1; + + for (String key : allKeys) { + producer.newMessage() + .key(key) + .value(round) + .send(); + } + + log.info(" ---- X 2: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + compactor.compact(topic).join(); + + log.info(" ---- X 3: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + round = 2; + + for (String key : allKeys) { + producer.newMessage() + .key(key) + .value(round) + .send(); + } + + compactor.compact(topic).join(); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + // Now explicitly remove the expiring keys + for (String key : keysToExpire) { + producer.newMessage() + .key(key) + .send(); + } + + compactor.compact(topic).join(); + + log.info(" ---- X 4: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, keys); + + // In the raw topic there should be no messages + validateMessages(pulsarClient, false, topic, round, Collections.emptySet()); + } + + private void validateMessages(PulsarClient client, boolean readCompacted, String topic, int round, Set expectedKeys) + throws Exception { + @Cleanup + Reader reader = client.newReader(Schema.INT32) + .topic(topic) + .startMessageId(MessageId.earliest) + .readCompacted(readCompacted) + .create(); + + Map receivedValues = new HashMap<>(); + + while (true) { + Message msg = reader.readNext(1, TimeUnit.SECONDS); + if (msg == null) { + break; + } + + Integer value = msg.size() > 0 ? msg.getValue() : null; + log.info("Received: {} -- value: {}", msg.getKey(), value); + if (value != null) { + receivedValues.put(msg.getKey(), value); + } + } + + Map expectedReceivedValues = new HashMap<>(); + expectedKeys.forEach(k -> expectedReceivedValues.put(k, round)); + + log.info("Received values: {}", receivedValues); + log.info("Expected values: {}", expectedReceivedValues); + assertEquals(receivedValues, expectedReceivedValues); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index b921266a60c10..4c9604805603d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.functions.LocalRunner; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils; +import org.awaitility.Awaitility; import org.testng.annotations.Test; import com.google.common.collect.Lists; @@ -74,12 +75,19 @@ public void testReadCompactedSink() throws Exception { final int messageNum = 20; final int maxKeys = 10; // 1 Setup producer + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(sourceTopic) .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); - pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close(); + pulsarClient.newConsumer() + .topic(sourceTopic) + .subscriptionName(subscriptionName) + .readCompacted(true) + .subscribe() + .close(); + // 2 Send messages and record the expected values after compaction Map expected = new HashMap<>(); for (int j = 0; j < messageNum; j++) { @@ -107,18 +115,12 @@ public void testReadCompactedSink() throws Exception { admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl); // 5 Sink should only read compacted value,so we will only receive compacted messages - retryStrategically((test) -> { - try { - String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get()); - Map metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); - PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total"); - return m.value == (double) maxKeys; - } catch (Exception e) { - return false; - } - }, 50, 1000); - - producer.close(); + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get()); + Map metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); + PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total"); + assertEquals(m.value, maxKeys); + }); } @Test(timeOut = 30000)