From 1182337e88a6b8dcaafff75d571a918898b7e4c2 Mon Sep 17 00:00:00 2001 From: Aloys Date: Sun, 4 Jul 2021 14:47:12 +0800 Subject: [PATCH] fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor (#11014) Fixes #11013 ### Motivation fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor ### Modifications 1. add a new method named `getEntryTimestamp` which will return the `brokerEntryTimestamp` if `BrokerEntryMetadata` is enabled or otherwise return the `publishTime`. 2. using this `entryTimestamp` for expiry checking. --- .../PersistentMessageExpiryMonitor.java | 8 +--- .../persistent/PersistentMessageFinder.java | 8 +--- .../service/persistent/PersistentTopic.java | 19 +++----- .../pulsar/client/impl/MessageImpl.java | 44 +++++++------------ .../pulsar/client/impl/MessageImplTest.java | 17 ++++--- .../pulsar/sql/presto/PulsarSplitManager.java | 12 +---- 6 files changed, 35 insertions(+), 73 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index cfed64d6cda774..c5b340330ad1d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -73,17 +73,13 @@ public boolean expireMessages(int messageTTLInSeconds) { messageTTLInSeconds); cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { - MessageImpl msg = null; try { - msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer()); - return msg.isExpired(messageTTLInSeconds); + long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer()); + return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp); } catch (Exception e) { log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e); } finally { entry.release(); - if (msg != null) { - msg.recycle(); - } } return false; }, this, null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index 346de2b346c32a..80a71ca64c4614 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -61,17 +61,13 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback } cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> { - MessageImpl msg = null; try { - msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer()); - return msg.publishedEarlierThan(timestamp); + long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer()); + return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); } catch (Exception e) { log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e); } finally { entry.release(); - if (msg != null) { - msg.recycle(); - } } return false; }, this, callback); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4ed047053f05e4..112c933a8d1e16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2371,10 +2371,9 @@ public boolean isTimeBacklogExceeded() { new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { - MessageImpl msg = null; try { - msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer()); - boolean expired = msg.isExpired(backlogQuotaLimitInSecond); + long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer()); + boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); if (expired && log.isDebugEnabled()) { log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog" + "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(), @@ -2386,9 +2385,6 @@ public void readEntryComplete(Entry entry, Object ctx) { future.complete(false); } finally { entry.release(); - if (msg != null) { - msg.recycle(); - } } } @@ -2460,16 +2456,14 @@ public void terminateFailed(ManagedLedgerException exception, Object ctx) { } public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) { - MessageImpl msg = null; Entry entry = null; boolean isOldestMessageExpired = false; try { entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include); if (entry != null) { - msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer()); - if (messageTTLInSeconds != 0) { - isOldestMessageExpired = msg.isExpired((int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD)); - } + long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer()); + isOldestMessageExpired = MessageImpl.isEntryExpired( + (int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp); } } catch (Exception e) { log.warn("[{}] Error while getting the oldest message", topic, e); @@ -2477,9 +2471,6 @@ public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeco if (entry != null) { entry.release(); } - if (msg != null) { - msg.recycle(); - } } return isOldestMessageExpired; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index c821096a8d4e8c..618aa0888ec109 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -38,10 +38,10 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.impl.schema.AbstractSchema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; @@ -269,31 +269,24 @@ public static MessageImpl deserialize(ByteBuf headersAndPayload) throws return msg; } - public static MessageImpl deserializeBrokerEntryMetaDataFirst( - ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException { - @SuppressWarnings("unchecked") - MessageImpl msg = (MessageImpl) RECYCLER.get(); - - msg.brokerEntryMetadata = + public static long getEntryTimestamp( ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException { + // get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor + BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - - if (msg.brokerEntryMetadata != null) { - msg.msgMetadata.clear(); - msg.payload = null; - msg.messageId = null; - msg.topic = null; - msg.cnx = null; - msg.properties = Collections.emptyMap(); - return msg; + if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) { + return brokerEntryMetadata.getBrokerTimestamp(); } + // otherwise get the publish_time + return Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime(); + } - Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata); - msg.payload = headersAndPayloadWithBrokerEntryMetadata; - msg.messageId = null; - msg.topic = null; - msg.cnx = null; - msg.properties = Collections.emptyMap(); - return msg; + public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) { + return messageTTLInSeconds != 0 && + (System.currentTimeMillis() > entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)); + } + + public static boolean isEntryPublishedEarlierThan(long entryTimestamp, long timestamp) { + return entryTimestamp < timestamp; } public static MessageImpl deserializeSkipBrokerEntryMetaData( @@ -350,11 +343,6 @@ public boolean isExpired(int messageTTLInSeconds) { brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))); } - public boolean publishedEarlierThan(long timestamp) { - return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp - : brokerEntryMetadata.getBrokerTimestamp() < timestamp; - } - @Override public byte[] getData() { if (msgMetadata.isNullValue()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index 17b77a2cab6a2e..4083c343d9518f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -450,10 +450,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); - MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertTrue(message.isExpired(100)); + long entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf); + assertTrue(MessageImpl.isEntryExpired(100, entryTimestamp)); + assertEquals(entryTimestamp, 1); // test BrokerTimestamp set. byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); @@ -463,8 +462,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { .setProducerName("test") .setSequenceId(1); byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf); + long brokerEntryTimestamp = System.currentTimeMillis(); brokerMetadata = new BrokerEntryMetadata() - .setBrokerTimestamp(System.currentTimeMillis()) + .setBrokerTimestamp(brokerEntryTimestamp) .setIndex(MOCK_BATCH_SIZE - 1); brokerMetaSize = brokerMetadata.getSerializedSize(); @@ -475,10 +475,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() { compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, brokerMeta, byteBuf); - messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf); - message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf); - message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata()); - assertFalse(message.isExpired(24 * 3600)); + entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf); + assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp)); + assertEquals(entryTimestamp, brokerEntryTimestamp); } catch (IOException e) { fail(); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index eca1ba871053c1..18502a969a8b83 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -442,21 +442,13 @@ private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, long tim return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new Predicate() { @Override public boolean apply(Entry entry) { - - MessageImpl msg = null; try { - msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer()); - return msg.getBrokerEntryMetadata() != null - ? msg.getBrokerEntryMetadata().getBrokerTimestamp() <= timestamp - : msg.getPublishTime() <= timestamp; - + long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer()); + return entryTimestamp <= timestamp; } catch (Exception e) { log.error(e, "Failed To deserialize message when finding position with error: %s", e); } finally { entry.release(); - if (msg != null) { - msg.recycle(); - } } return false; }