Skip to content

Commit

Permalink
fix memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang committed Jun 28, 2021
1 parent 8d257ab commit d4220f1
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
messageTTLInSeconds);

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,13 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
MessageImpl<byte[]> msg = null;
try {
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(timestamp, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}, this, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2393,7 +2393,6 @@ public boolean isTimeBacklogExceeded() {
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
MessageImpl<byte[]> msg = null;
try {
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
Expand All @@ -2408,9 +2407,6 @@ public void readEntryComplete(Entry entry, Object ctx) {
future.complete(false);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
}

Expand Down Expand Up @@ -2482,7 +2478,6 @@ public void terminateFailed(ManagedLedgerException exception, Object ctx) {
}

public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
MessageImpl<byte[]> msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
Expand All @@ -2498,9 +2493,6 @@ public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeco
if (entry != null) {
entry.release();
}
if (msg != null) {
msg.recycle();
}
}

return isOldestMessageExpired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ public static long getEntryTimestamp( ByteBuf headersAndPayloadWithBrokerEntryMe
// otherwise get the publish_time
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
return msg.getPublishTime();
long entryTimestamp = msg.getPublishTime();
msg.recycle();
return entryTimestamp;
}

public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,18 +442,13 @@ private static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, long tim
return (PositionImpl) readOnlyCursor.findNewestMatching(SearchAllAvailableEntries, new Predicate<Entry>() {
@Override
public boolean apply(Entry entry) {

MessageImpl<byte[]> msg = null;
try {
long entryTimestamp = MessageImpl.getEntryTimestamp(entry.getDataBuffer());
return entryTimestamp <= timestamp;
} catch (Exception e) {
log.error(e, "Failed To deserialize message when finding position with error: %s", e);
} finally {
entry.release();
if (msg != null) {
msg.recycle();
}
}
return false;
}
Expand Down

0 comments on commit d4220f1

Please sign in to comment.