Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix publish_time not set error when broker entry metadata enable without AppendBrokerTimestampMetadataInterceptor #11014

Merged
merged 5 commits into from Jul 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -73,17 +73,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
messageTTLInSeconds);

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.isExpired(messageTTLInSeconds);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, null);
Expand Down
Expand Up @@ -61,17 +61,13 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.publishedEarlierThan(timestamp);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, callback);
Expand Down
Expand Up @@ -2393,10 +2393,9 @@ public boolean isTimeBacklogExceeded() {
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
boolean expired = msg.isExpired(backlogQuotaLimitInSecond);
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (expired && log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(),
Expand All @@ -2408,9 +2407,6 @@ public void readEntryComplete(Entry entry, Object ctx) {
future.complete(false);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
}

Expand Down Expand Up @@ -2482,26 +2478,21 @@ public void terminateFailed(ManagedLedgerException exception, Object ctx) {
}

public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
MessageImpl<byte[]> msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
if (messageTTLInSeconds != 0) {
isOldestMessageExpired = msg.isExpired((int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD));
}
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
isOldestMessageExpired = MessageImpl.isEntryExpired(
(int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
}
} catch (Exception e) {
log.warn("[{}] Error while getting the oldest message", topic, e);
} finally {
if (entry != null) {
entry.release();
}
if (msg != null) {
msg.recycle();
}
}

return isOldestMessageExpired;
Expand Down
Expand Up @@ -38,10 +38,10 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.AbstractSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
Expand Down Expand Up @@ -269,31 +269,24 @@ public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws
return msg;
}

public static MessageImpl<byte[]> deserializeBrokerEntryMetaDataFirst(
ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();

msg.brokerEntryMetadata =
public static long getEntryTimestamp( ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
// get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);

if (msg.brokerEntryMetadata != null) {
msg.msgMetadata.clear();
msg.payload = null;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
return msg;
if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
return brokerEntryMetadata.getBrokerTimestamp();
}
// otherwise get the publish_time
return Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
}

Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
msg.payload = headersAndPayloadWithBrokerEntryMetadata;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
return msg;
public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
return messageTTLInSeconds != 0 &&
(System.currentTimeMillis() > entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
}

public static boolean isEntryPublishedEarlierThan(long entryTimestamp, long timestamp) {
return entryTimestamp < timestamp;
}

public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
Expand Down Expand Up @@ -350,11 +343,6 @@ public boolean isExpired(int messageTTLInSeconds) {
brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
}

public boolean publishedEarlierThan(long timestamp) {
return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp
: brokerEntryMetadata.getBrokerTimestamp() < timestamp;
}

@Override
public byte[] getData() {
if (msgMetadata.isNullValue()) {
Expand Down
Expand Up @@ -450,10 +450,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertTrue(message.isExpired(100));
long entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
assertTrue(MessageImpl.isEntryExpired(100, entryTimestamp));
assertEquals(entryTimestamp, 1);

// test BrokerTimestamp set.
byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
Expand All @@ -463,8 +462,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
.setProducerName("test")
.setSequenceId(1);
byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
long brokerEntryTimestamp = System.currentTimeMillis();
brokerMetadata = new BrokerEntryMetadata()
.setBrokerTimestamp(System.currentTimeMillis())
.setBrokerTimestamp(brokerEntryTimestamp)
.setIndex(MOCK_BATCH_SIZE - 1);

brokerMetaSize = brokerMetadata.getSerializedSize();
Expand All @@ -475,10 +475,9 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {

compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
assertFalse(message.isExpired(24 * 3600));
entryTimestamp = MessageImpl.getEntryTimestamp(compositeByteBuf);
assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp));
assertEquals(entryTimestamp, brokerEntryTimestamp);
} catch (IOException e) {
fail();
}
Expand Down
Expand Up @@ -442,21 +442,13 @@ private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, long tim
return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new Predicate<Entry>() {
@Override
public boolean apply(Entry entry) {

MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.getBrokerEntryMetadata() != null
? msg.getBrokerEntryMetadata().getBrokerTimestamp() <= timestamp
: msg.getPublishTime() <= timestamp;

long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return entryTimestamp <= timestamp;
} catch (Exception e) {
log.error(e, "Failed To deserialize message when finding position with error: %s", e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}
Expand Down