Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16738: Returns BaseRecords instead of MemoryRecords #15935

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

luozhenyu
Copy link

@luozhenyu luozhenyu commented May 13, 2024

Related to https://issues.apache.org/jira/browse/KAFKA-16738

We can write a record which is a subtype of BaseRecords, but we can not read a record which is a subtype of BaseRecords. If we change the return type of Readable#readRecords from MemoryRecords to BaseRecords, we can override the implementation of readRecords and returns a subtype of BaseRecords easily.

We known that the MemoryRecords is based on JDK's ByteBuffer. We are developing a netty project(kroxylicious) and we want to create a subtype of BaseRecords like MemoryRecords based on netty's ByteBuf.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -53,7 +54,7 @@ default List<RawTaggedField> readUnknownTaggedField(List<RawTaggedField> unknown
return unknowns;
}

default MemoryRecords readRecords(int length) {
default BaseRecords readRecords(int length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in kafak serialization, we assume the impl of BaseRecords is MemoryRecords. For example:

        @Override
        public PartitionProduceData duplicate() {
            PartitionProduceData _duplicate = new PartitionProduceData();
            _duplicate.index = index;
            if (records == null) {
                _duplicate.records = null;
            } else {
                _duplicate.records = MemoryRecords.readableRecords(((MemoryRecords) records).buffer().duplicate());
            }
            return _duplicate;
        }

Hence, I'm not sure how this change works if you introduce a non-MemoryRecords impl.

Copy link
Author

@luozhenyu luozhenyu May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hence, I'm not sure how this change works if you introduce a non-MemoryRecords impl.

Hard-coded duplicating of MemoryRecords is not elegant. So I add a duplicate method to BaseRecords. How do you think?

        @Override
        public PartitionProduceData duplicate() {
            PartitionProduceData _duplicate = new PartitionProduceData();
            _duplicate.index = index;
            if (records == null) {
                _duplicate.records = null;
            } else {
                _duplicate.records = records.duplicate();
            }
            return _duplicate;
        }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard-coded duplicating of MemoryRecords is not elegant. So I add a duplicate method to BaseRecords. How do you think?

Could you file a KIP to let us know the whole picture? MemoryRecords is used in code base, so we need to be careful of changing that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants