Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: provide IsMessageSizeTooLarge() function #2848

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (p *asyncProducer) dispatcher() {

size := msg.ByteSize(version)
if size > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ConfigurationError(fmt.Sprintf("Attempt to produce message larger than configured Producer.MaxMessageBytes: %d > %d", size, p.conf.Producer.MaxMessageBytes)))
p.returnError(msg, newMessageSizeTooLargeConfigurationError(size, p.conf.Producer.MaxMessageBytes))
continue
}

Expand Down
31 changes: 31 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ func (err ConfigurationError) Error() string {
return "kafka: invalid configuration (" + string(err) + ")"
}

const (
// messageSizeTooLargePrefix used by newMessageSizeTooLargeConfigurationError and IsMessageSizeTooLarge
messageSizeTooLargePrefix = "Attempt to produce message larger than configured Producer.MaxMessageBytes"
)

// newMessageSizeTooLargeConfigurationError creates an error returned when a message payload is
// attempted to be published that exceeds the configured maximum message size. We return a specific
// string that is checked for by IsMessageSizeTooLarge() as several consumers of this library
// need to differentiate this error from other ConfigurationErrors, and the actual type returned
// has changed multiple times during the evolution of this library. While the string check is a hack,
// it's better that it's done once within this library, than multiple users reimplementing in their
// own clients.
func newMessageSizeTooLargeConfigurationError(msgSize, configuredSize int) ConfigurationError {
return ConfigurationError(fmt.Sprintf("%s: %d > %d", messageSizeTooLargePrefix, msgSize, configuredSize))
}

// KError is the type of error that can be returned directly by the Kafka broker.
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
type KError int16
Expand Down Expand Up @@ -455,3 +471,18 @@ func (err KError) Error() string {

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
}

// IsMessageSizeTooLarge returns true if the error relates the message size
// being either too large as reported by the broker, or too large because it
// exceeds the configured maximum size.
func IsMessageSizeTooLarge(err error) bool {
if err == nil {
return false
}

if errors.Is(err, ErrMessageSizeTooLarge) {
return true
}

return strings.Contains(err.Error(), messageSizeTooLargePrefix)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really shouldn’t be something that we bake in. Checking the string value of the error is considered The Wrong Way™ to check error conditions. And once we provide this, the API is kind of stuck with it no matter what.

I think returning this as a ConfigurationError is itself faulty. We should be returning a type that allows errors.Is(err, ErrMessageSizeTooLarge) to cover all the functionality necessary.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue encapsulating the string check as a private implementation detail of the library does the opposite of baking it in. e.g. currently users need to bake-in this logic (we have, as have others). Moving it to a function means that users won't need to update their logic each time this library chooses the change the error type that is returned (or changes the text in it).

The precise error returned for this condition has changed at least 3 times over the years so far:

I agree it's not great, but I'm not sure that there is an alterative to this change that doesn't change the error type returned for a 4th time. (Although I'd be supportive of a simple revert of #2628 which would return ErrMessageSizeTooLarge again).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, it’s better than depending on raw strings copied into all our codebases, for sure. But I worry that it’s too much like putting wallpaper over a hole in the wall.

There will be a non-zero number of codebases that aren’t going to change their code to use this function, and just keep using the raw string, because “hey, if it ain’t broke, don’t fix it.” (Also, finding priority for maintenance work is almost always a struggle, since there’s a fairly ephemeral value to product.) And these people are still going to find themselves stuck out if/when the error text changes, and the breaking behavior change for these people will again be relatively silent until it causes the same critical/issue/alert/whatever that brought us all here.

🤷‍♀️ I’m just not sure… like I said, I don’t know if this improves the situation in any practical way. And in a year or two, someone will be looking at this code and wondering why we would ever double down on something much of Go considers an anti-pattern.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have opened alternate PR #2851 and CCed you there. I prefer the approach in #2851 but the downside is that it changes the error type (again).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, I kind of hope no one removed the ErrMessageSizeTooLarge handling right? 😰 Because this was still a value that the server could be returning, and one would likely want to handle the same as if the client refused to even send the message.

}
8 changes: 8 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSentinelWithSingleWrappedError(t *testing.T) {
Expand Down Expand Up @@ -63,3 +65,9 @@ func TestSentinelWithMultipleWrappedErrors(t *testing.T) {
t.Errorf("unwrapped value unexpected result")
}
}

func TestIsMessageSizeTooLarge(t *testing.T) {
assert.True(t, IsMessageSizeTooLarge(ErrMessageSizeTooLarge), "broker side error must be regarded as a too large error")
assert.True(t, IsMessageSizeTooLarge(newMessageSizeTooLargeConfigurationError(2, 1)), "config side error must be regarded as a too large error")
assert.False(t, IsMessageSizeTooLarge(nil), "nil is not an error")
}
29 changes: 29 additions & 0 deletions sync_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,35 @@ func TestSyncProducer(t *testing.T) {
}
}

// try to send a message too large
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "my_topic",
Value: ByteEncoder(make([]byte, config.Producer.MaxMessageBytes+1)), // will exceed default max size, e.g. configuration side
Metadata: "test",
})
if !IsMessageSizeTooLarge(err) {
t.Error("expected err to be message too large")
}

// try to send small message the server rejects because too large
leader.Returns(&ProduceResponse{
Blocks: map[string]map[int32]*ProduceResponseBlock{
"my_topic": {
0: &ProduceResponseBlock{
Err: ErrMessageSizeTooLarge,
},
},
},
})
_, _, err = producer.SendMessage(&ProducerMessage{
Topic: "my_topic",
Value: StringEncoder(TestMessage),
Metadata: "test",
})
if !IsMessageSizeTooLarge(err) {
t.Error("expected err to be message too large")
}

safeClose(t, producer)
leader.Close()
seedBroker.Close()
Expand Down