Skip to content

Commit

Permalink
fix publish_time not set error when broker entry metadata enable with…
Browse files Browse the repository at this point in the history
…out AppendBrokerTimestampMetadataInterceptor (#11014)

Fixes #11013

### Motivation

fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor

### Modifications

1. add a new method named `getEntryTimestamp` which will return the `brokerEntryTimestamp` if `BrokerEntryMetadata` is enabled or otherwise return the `publishTime`.
2.  using this `entryTimestamp` for expiry checking.

(cherry picked from commit fe7cf67)
  • Loading branch information
aloyszhang authored and hangc0276 committed Aug 11, 2021
1 parent 41ad624 commit 598acec
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 72 deletions.
Expand Up @@ -73,17 +73,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
messageTTLInSeconds);

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.isExpired(messageTTLInSeconds);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, null);
Expand Down
Expand Up @@ -61,17 +61,13 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.publishedEarlierThan(timestamp);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, callback);
Expand Down
Expand Up @@ -2401,10 +2401,9 @@ public boolean isTimeBacklogExceeded() {
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
boolean expired = msg.isExpired(backlogQuotaLimitInSecond);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (expired && log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(),
Expand All @@ -2416,9 +2415,6 @@ public void readEntryComplete(Entry entry, Object ctx) {
future.complete(false);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
}

Expand Down Expand Up @@ -2490,26 +2486,21 @@ public void terminateFailed(ManagedLedgerException exception, Object ctx) {
}

public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
MessageImpl<byte[]> msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
if (messageTTLInSeconds != 0) {
isOldestMessageExpired = msg.isExpired((int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD));
}
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
isOldestMessageExpired = MessageImpl.isEntryExpired(
(int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
}
} catch (Exception e) {
log.warn("[{}] Error while getting the oldest message", topic, e);
} finally {
if (entry != null) {
entry.release();
}
if (msg != null) {
msg.recycle();
}
}

return isOldestMessageExpired;
Expand Down
Expand Up @@ -38,6 +38,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -268,31 +269,24 @@ public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws
return msg;
}

public static MessageImpl<byte[]> deserializeBrokerEntryMetaDataFirst(
ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();

msg.brokerEntryMetadata =
public static long getEntryTimestamp( ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
// get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);

if (msg.brokerEntryMetadata != null) {
msg.msgMetadata.clear();
msg.payload = null;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
return msg;
if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
return brokerEntryMetadata.getBrokerTimestamp();
}
// otherwise get the publish_time
return Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
}

Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
msg.payload = headersAndPayloadWithBrokerEntryMetadata;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
return msg;
public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
return messageTTLInSeconds != 0 &&
(System.currentTimeMillis() > entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
}

public static boolean isEntryPublishedEarlierThan(long entryTimestamp, long timestamp) {
return entryTimestamp < timestamp;
}

public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
Expand Down Expand Up @@ -356,11 +350,6 @@ public boolean isExpired(int messageTTLInSeconds) {
brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
}

public boolean publishedEarlierThan(long timestamp) {
return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp
: brokerEntryMetadata.getBrokerTimestamp() < timestamp;
}

@Override
public byte[] getData() {
if (msgMetadata.isNullValue()) {
Expand Down
Expand Up @@ -450,10 +450,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertTrue(message.isExpired(100));
long entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
assertTrue(MessageImpl.isEntryExpired(100, entryTimestamp));
assertEquals(entryTimestamp, 1);

// test BrokerTimestamp set.
byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
Expand All @@ -463,8 +462,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
.setProducerName("test")
.setSequenceId(1);
byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
long brokerEntryTimestamp = System.currentTimeMillis();
brokerMetadata = new BrokerEntryMetadata()
.setBrokerTimestamp(System.currentTimeMillis())
.setBrokerTimestamp(brokerEntryTimestamp)
.setIndex(MOCK_BATCH_SIZE - 1);

brokerMetaSize = brokerMetadata.getSerializedSize();
Expand All @@ -475,10 +475,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {

compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertFalse(message.isExpired(24 * 3600));
entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp));
assertEquals(entryTimestamp, brokerEntryTimestamp);
} catch (IOException e) {
fail();
}
Expand Down
Expand Up @@ -442,21 +442,13 @@ private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, long tim
return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new Predicate<Entry>() {
@Override
public boolean apply(Entry entry) {

MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.getBrokerEntryMetadata() != null
? msg.getBrokerEntryMetadata().getBrokerTimestamp() <= timestamp
: msg.getPublishTime() <= timestamp;

long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return entryTimestamp <= timestamp;
} catch (Exception e) {
log.error(e, "Failed To deserialize message when finding position with error: %s", e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}
Expand Down

0 comments on commit 598acec

Please sign in to comment.