Skip to content

Commit

Permalink
Bump default version to 0.11.0.2
Browse files Browse the repository at this point in the history
The current default, 0.8.2, is a 5 year-old version. The problem with defaulting to older versions when the Kafka cluster is a newer version is manifold:

- requires Kafka to downconvert consumer response messages (e.g if a newly-configured client produced messages and a default Sarama client is reading) - this has implications on the performance of Kafka, because it uses additional CPU. Especially so if compression is enabled, at which point the broker would need to decompress, downconvert and compress the message back. Downconversion also blocks zerocopy, because your fetch response now needs to be copied into memory
- requires Kafka to upconvert producer requests' messages
- in general is hard to support. While the latest Kafka currently supports the oldest versions, it is in the best interest of the project to deprecate and eventually drop support for legacy versions. Otherwise it becomes hard to maintain, the test matrix grows and new features need to work around old version limitations (no idempotency, exactly once). It is easier for Kafka to deprecate/drop support when its ecosystem has done so already

0.11 is the minimum we should default at as it enables Kafka's v2 message format, avoiding expensive upconversion/downconversion in the Kafka broker. It is also required for correctness in some cases (e.g KIP-101)
  • Loading branch information
stanislavkozlovski committed Aug 26, 2020
1 parent 2b9c593 commit d3c19b3
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
2 changes: 1 addition & 1 deletion config.go
Expand Up @@ -486,7 +486,7 @@ func NewConfig() *Config {

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.Version = MinVersion
c.Version = DefaultVersion
c.MetricRegistry = metrics.NewRegistry()

return c
Expand Down
5 changes: 3 additions & 2 deletions utils.go
Expand Up @@ -192,12 +192,13 @@ var (
}
MinVersion = V0_8_2_0
MaxVersion = V2_6_0_0
DefaultVersion = V0_11_0_2
)

//ParseKafkaVersion parses and returns kafka version or error from a string
func ParseKafkaVersion(s string) (KafkaVersion, error) {
if len(s) < 5 {
return MinVersion, fmt.Errorf("invalid version `%s`", s)
return DefaultVersion, fmt.Errorf("invalid version `%s`", s)
}
var major, minor, veryMinor, patch uint
var err error
Expand All @@ -207,7 +208,7 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) {
err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
}
if err != nil {
return MinVersion, err
return DefaultVersion, err
}
return newKafkaVersion(major, minor, veryMinor, patch), nil
}
Expand Down

0 comments on commit d3c19b3

Please sign in to comment.