[Knowledge gathering question] How does librdkafka create and send batches over the network to the broker? #1148

Open
Sam-sad-Sajid opened this issue Mar 6, 2024 · 2 comments

Sam-sad-Sajid commented Mar 6, 2024

Description

Hi. I have been using confluent-kafka-go for some time, but I have always wondered how librdkafka creates batches and sends them over the network to the brokers.

Here is an example:

  • I have a topic with 20 partitions.
  • I enabled debug logging in the producer config.
  • I send two messages to the same topic with different keys.

In the debug log, I see the following:

topic_A [6] 1 message(s) in xmit queue (1 added from partition queue)
topic_A [6]: Produce MessageSet with 1 message(s) (161 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
topic_A [19] 1 message(s) in xmit queue (1 added from partition queue)
topic_A [19]: Produce MessageSet with 1 message(s) (161 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
topic_A [6]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
topic_A [19]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered

I understand that the two messages go to partitions 6 and 19. The librdkafka documentation (Ref) describes batching as:

waiting for a certain amount of messages to accumulate in the local queue before sending them off in one large message set or batch to the peer.

What does this mean for the example above?

Is it that librdkafka locally creates one in-memory queue per partition of the topic (20 in total), calculates the partition for each message (6 and 19 above), enqueues the message, and then takes all 20 queues as "one batch" and sends it to the leader broker?

Or is each partition "one batch", so that, since only two partitions have messages, librdkafka creates and sends two batches to the leader broker?


How does delivery.timeout.ms apply to the batch?

Based on the example above, whether it is one batch or several, how does delivery.timeout.ms apply to a batch? My understanding was that it applies per message, not per batch. A clarification would be great.

How to reproduce

Create a topic with multiple partitions, enable debug logging in the producer, and run the setup locally.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 2.2, 2.2
  • Apache Kafka broker version: 2.8.0
  • Client configuration: ConfigMap{...}
  • Operating system: linux
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue: No
Sam-sad-Sajid (Author) commented

Hi @milindl 👋
Sorry for the tag. Wondering if you can share some insight here?

milindl (Contributor) commented Apr 25, 2024

Hi @Sam-sad-Sajid :

  1. When you produce, the message is enqueued into a topic-partition queue (one such queue is created in memory per partition).
  2. For each topic-partition queue (toppar queue) with enqueued messages, the messages are sent when any message in that queue has been waiting longer than linger.ms (that is, current time - enqueue time of the oldest message > linger.ms), or when the number of messages enqueued in that toppar queue exceeds batch.num.messages or their total size exceeds batch.size. (There are some exceptions, but this is the general case.)
  3. The entire batch is sent to the leader of that partition. Messages in one batch are sent together, but multiple batches are not grouped together, even if their topic partitions have the same leader.
  4. The Produce request containing the batch is sent with the timeout of the first message enqueued in the batch (i.e., the time that message was enqueued + delivery.timeout.ms).
  5. The delivery timeout still applies on a per-message basis. To make this work with (4): if the batch times out according to the timeout in (4), but some messages in the batch are still within their own per-message timeout, those messages are re-enqueued on the toppar queue and retried with the next batch that is sent.

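In practice, though, delivery.timeout.ms is typically on the order of seconds, while linger.ms is on the order of milliseconds. Since the messages in a batch were enqueued within roughly linger.ms of each other, if one message in a batch times out, the entire batch has probably timed out.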

Hope that helps
