Skip to content

Commit

Permalink
fix publish_time not set error when broker entry metadata enable with…
Browse files Browse the repository at this point in the history
…out AppendBrokerTimestampMetadataInterceptor
  • Loading branch information
aloyszhang committed Jun 28, 2021
1 parent c8fe1e8 commit fa6c401
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 71 deletions.
Expand Up @@ -73,17 +73,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
messageTTLInSeconds);

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.isExpired(messageTTLInSeconds);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, null);
Expand Down
Expand Up @@ -61,17 +61,13 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.publishedEarlierThan(timestamp);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(timestamp, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, callback);
Expand Down
Expand Up @@ -2393,10 +2393,9 @@ public boolean isTimeBacklogExceeded() {
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
boolean expired = msg.isExpired(backlogQuotaLimitInSecond);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (expired && log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(),
Expand All @@ -2408,9 +2407,6 @@ public void readEntryComplete(Entry entry, Object ctx) {
future.complete(false);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
}

Expand Down Expand Up @@ -2482,26 +2478,21 @@ public void terminateFailed(ManagedLedgerException exception, Object ctx) {
}

public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
MessageImpl<byte[]> msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
if (messageTTLInSeconds != 0) {
isOldestMessageExpired = msg.isExpired((int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD));
}
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
isOldestMessageExpired = MessageImpl.isEntryExpired(
(int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
}
} catch (Exception e) {
log.warn("[{}] Error while getting the oldest message", topic, e);
} finally {
if (entry != null) {
entry.release();
}
if (msg != null) {
msg.recycle();
}
}

return isOldestMessageExpired;
Expand Down
Expand Up @@ -269,31 +269,31 @@ public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws
return msg;
}

public static MessageImpl<byte[]> deserializeBrokerEntryMetaDataFirst(
ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();

msg.brokerEntryMetadata =
public static long getEntryTimestamp( ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
// get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);

if (msg.brokerEntryMetadata != null) {
msg.msgMetadata.clear();
msg.payload = null;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
return msg;
if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
return brokerEntryMetadata.getBrokerTimestamp();
}
// otherwise get the publish_time,
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
try {
Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
return msg.getPublishTime();
} finally {
// make sure msg can be recycled
msg.recycle();
}
}

Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
msg.payload = headersAndPayloadWithBrokerEntryMetadata;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
return msg;
public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
return messageTTLInSeconds != 0 &&
(System.currentTimeMillis() > entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
}

public static boolean isEntryPublishedEarlierThan(long timestamp, long entryTimestamp) {
return entryTimestamp < timestamp;
}

public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
Expand Down Expand Up @@ -350,11 +350,6 @@ public boolean isExpired(int messageTTLInSeconds) {
brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
}

public boolean publishedEarlierThan(long timestamp) {
return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp
: brokerEntryMetadata.getBrokerTimestamp() < timestamp;
}

@Override
public byte[] getData() {
if (msgMetadata.isNullValue()) {
Expand Down
Expand Up @@ -450,10 +450,8 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertTrue(message.isExpired(100));
long entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
assertTrue(MessageImpl.isEntryExpired(100, entryTimestamp));

// test BrokerTimestamp set.
byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
Expand All @@ -475,10 +473,8 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {

compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertFalse(message.isExpired(24 * 3600));
entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp));
} catch (IOException e) {
fail();
}
Expand Down
Expand Up @@ -442,21 +442,13 @@ private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, long tim
return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new Predicate<Entry>() {
@Override
public boolean apply(Entry entry) {

MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.getBrokerEntryMetadata() != null
? msg.getBrokerEntryMetadata().getBrokerTimestamp() <= timestamp
: msg.getPublishTime() <= timestamp;

long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return entryTimestamp <= timestamp;
} catch (Exception e) {
log.error(e, "Failed To deserialize message when finding position with error: %s", e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}
Expand Down

0 comments on commit fa6c401

Please sign in to comment.