Possible Message Batching Bug? #2797

Open
road-cycling opened this issue Feb 11, 2024 · 3 comments
Labels
needs-investigation Issues that require followup from maintainers

Comments

@road-cycling

Description

I'm noticing that batched message submissions to the Kafka broker are being rejected because the message is too large. Initially I thought this was because the post-compression message size was too large; however, logs added to the Errors feedback channel (as shown below) showed messages as small as 900B being rejected.

Logs

2024/02/11 05:32:57 Message Too Large(497922): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(2394856): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(919): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(188553): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(3805): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(1742883): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(6544): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(1216): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(1164809): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(6546): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(903): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(4911): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(4911): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error

Logging Code

case kafkaErr := <-asyncProducer.Errors():
	switch unwrappedErr := kafkaErr.Err.(type) {
	case sarama.KError:
		switch unwrappedErr {
		case sarama.ErrMessageSizeTooLarge:
			// Value.Length() is the size of this single message's value, not of the batch it was sent in.
			log.Printf("Message Too Large(%d) %s\n", kafkaErr.Msg.Value.Length(), kafkaErr.Error())
			metrics.KafkaErrorOtel.Add(ctx, 1, attribute.String("error", "message_size_too_large"))
		}
	}

Looking closer, these errors are bursty and occur all at once. I know that one Kafka client will take batching errors and resubmit them after breaking them down, but looking at the code in this repo, that is not the case here. Looking through the code and docs, I saw that most of the configs are described as 'best-effort', with the docs stating 'By default, messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed into the subsequent batch.' Is it possible that, while an existing publish is in flight, the buffers fill quickly past the limit, so that the next batch is over the size limit?

Versions

Note: I'm unable to reproduce this on my laptop. I can only reproduce it in a production environment where the throughput is 350GB/min, with 13M messages consumed/min and 13M messages published/min. Adding more hosts to the cluster does alleviate the issue, even though CPU and memory usage are very low.

Sarama    Kafka   Go
v1.38.1   TODO    1.21.3
Configuration
// This is done due to the bug checking pre/post compressed incoming message sizes.
saramaConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
saramaConfig.Producer.Compression = sarama.CompressionGZIP
saramaConfig.Producer.Return.Successes = true
@dnwe dnwe added the needs-investigation Issues that require followup from maintainers label Feb 11, 2024
@dnwe
Collaborator

dnwe commented Feb 11, 2024

@road-cycling thanks for reporting this. Would it be possible for you to upgrade to a newer version of Sarama? At least v1.41.1, but ideally the latest, v1.42.2. In particular, #2628 disambiguated between messages failing the local client-side Producer.MaxMessageBytes check and those genuinely rejected by the remote cluster, which would help to narrow down where you're seeing this occur.
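As a rough illustration of that distinction, the sketch below classifies an error as client-side or broker-side with errors.As. To stay runnable without the Sarama module, configurationError and kError are local stand-ins (assumptions of this sketch) that only mirror the shape of sarama.ConfigurationError (a string type) and sarama.KError (an int16 type, where ErrMessageSizeTooLarge is code 10). The classify helper is hypothetical and is not part of Sarama.

```go
package main

import (
	"errors"
	"fmt"
)

// configurationError is a stand-in for sarama.ConfigurationError, which newer
// Sarama versions return when the local Producer.MaxMessageBytes check fails.
type configurationError string

func (e configurationError) Error() string { return "invalid configuration: " + string(e) }

// kError is a stand-in for sarama.KError, the error code returned by the broker.
type kError int16

// errMessageSizeTooLarge mirrors sarama.ErrMessageSizeTooLarge (Kafka error code 10).
const errMessageSizeTooLarge kError = 10

func (e kError) Error() string { return fmt.Sprintf("kafka server: error code %d", int16(e)) }

// classify is a hypothetical helper that reports where a produce error came from.
func classify(err error) string {
	var cfgErr configurationError
	if errors.As(err, &cfgErr) {
		return "client-side" // rejected locally, never sent to the broker
	}
	var brokerErr kError
	if errors.As(err, &brokerErr) && brokerErr == errMessageSizeTooLarge {
		return "broker-side" // the remote cluster rejected the request
	}
	return "other"
}

func main() {
	local := configurationError("message larger than Producer.MaxMessageBytes")
	remote := fmt.Errorf("failed to produce: %w", errMessageSizeTooLarge)
	fmt.Println(classify(local))
	fmt.Println(classify(remote))
}
```

In a real producer the same two errors.As checks would be applied to the Err field of each value read from Errors(), using the Sarama types in place of the stand-ins.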

@road-cycling
Author

road-cycling commented Feb 11, 2024

Using IBM:sarama:v1.42.2

Logs

2024/02/11 22:34:04 Message Too Large(909) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1212828) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(804014) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1733361) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1739543) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1000719) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(465610) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1750667) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1206) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1171) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(6544) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(1790812) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(6544) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(6745518) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(6546) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(721256) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error

Same Switch Case

case sarama.ErrMessageSizeTooLarge:
  log.Printf("Message Too Large(%d) %s\n", kafkaErr.Msg.Value.Length(), kafkaErr.Error())
  metrics.KafkaErrorOtel.Add(ctx, 1, attribute.String("error", "message_size_too_large"))

Note:
Adding the config saramaConfig.Producer.Flush.MaxMessages = 1 fixes this issue and leads to 0 "Message Too Large" errors, but the Sarama log is flooded with the lines below, which is in no way efficient.

[sarama] 2024/02/11 22:58:16.075040 producer/broker/97623 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.115777 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.124684 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.131789 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.133214 producer/broker/97623 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.136310 producer/broker/116321 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.136723 producer/broker/123253 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.136814 producer/broker/122428 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.141094 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.141183 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.149647 producer/broker/112508 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.161138 producer/broker/100837 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.161767 producer/broker/112120 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.165873 producer/broker/123448 maximum request accumulated, waiting for space

@ae-govau

ae-govau commented May 8, 2024

I was looking at this today. We were also playing with producer vs broker compression and testing sending batches of data, some all zeroes, some random (which won't compress).

We quickly found that we (like @road-cycling above) had to set:

saramaConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)

(presumably following the same advice we found in #2142 (comment))

And while that happily fixed our ability to send large compressible messages (e.g. 2MB of all zeroes), it then broke our ability to send other messages, such as 400KB of random data, or 400KB of zeroes to a topic with no producer compression.

For example, when we tried to send 10 x 400KB messages in quick succession, the first message was sent in a batch on its own and succeeded, but the following 9 messages were batched together (making a total size of approx 3.2MB), which then resulted in the error: Message was too large, server rejected it to avoid allocation error

Output from our test app:

Compression: none Size: 1000000 Source: random Quantity: 10

0: success
1: kafka server: Message was too large, server rejected it to avoid allocation error
...
9: kafka server: Message was too large, server rejected it to avoid allocation error

This is because of an unintended side effect of maxing out MaxMessageBytes: it is used in two places, the most important actually being here:

sarama/produce_set.go

Lines 253 to 255 in 0ab2bb7

case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
	ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.ByteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
	return true

which, as I understand it, controls the batching logic. With MaxMessageBytes set to MaxRequestSize, this check no longer stops a batch from growing past what the broker will accept, so the server rejects the whole batch.

If the messages are sent by themselves (low-traffic) then it works, but when batched up, fails miserably.
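To make the effect concrete, here is a minimal, self-contained sketch of the per-partition check quoted above. It is a simplification, not Sarama's code: batchSizes is a hypothetical helper, message sizes stand in for msg.ByteSize(version) without protocol overhead, compression and timing are ignored, and the 100 MiB figure is assumed to be the default value of sarama.MaxRequestSize. It groups nine pending 400KB messages first under a 1,000,000-byte MaxMessageBytes and then under the maxed-out value.

```go
package main

import "fmt"

// batchSizes groups message sizes into batches using a simplified form of the
// quoted check: a message starts a new batch when the bytes already buffered
// plus its own size would reach maxMessageBytes. It returns each batch's size.
func batchSizes(sizes []int, maxMessageBytes int) []int {
	var batches []int
	buffered := 0
	for _, size := range sizes {
		if buffered > 0 && buffered+size >= maxMessageBytes {
			batches = append(batches, buffered) // flush the current batch
			buffered = 0
		}
		buffered += size
	}
	if buffered > 0 {
		batches = append(batches, buffered)
	}
	return batches
}

func main() {
	// Nine 400KB messages queued while an earlier request is still in flight.
	pending := make([]int, 9)
	for i := range pending {
		pending[i] = 400_000
	}
	// 1,000,000 is a modest limit; 100 MiB is the assumed default MaxRequestSize.
	for _, limit := range []int{1_000_000, 100 * 1024 * 1024} {
		fmt.Printf("MaxMessageBytes=%d -> batch sizes %v\n", limit, batchSizes(pending, limit))
	}
}
```

With the smaller limit the nine messages are split into several batches that each stay under the limit, while with the maxed-out limit they collapse into a single batch of 3,600,000 bytes, which a broker with a message size limit of around 1MB (an assumption about the cluster here) would reject.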

So I do think we need to keep MaxMessageBytes set to a sane default, but instead we should remove the following check, or have a flag to disable it:

sarama/async_producer.go

Lines 453 to 457 in 0ab2bb7

size := msg.ByteSize(version)
if size > p.conf.Producer.MaxMessageBytes {
	p.returnError(msg, ConfigurationError(fmt.Sprintf("Attempt to produce message larger than configured Producer.MaxMessageBytes: %d > %d", size, p.conf.Producer.MaxMessageBytes)))
	continue
}

That would also help clear up #2851 and all the associated issues there, as we'd likely opt to ignore that check.
