From d3c19b38e8a11b2754af294655483785e33377ee Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Wed, 26 Aug 2020 18:03:13 +0100 Subject: [PATCH] Bump default version to 0.11.0.2 The current default, 0.8.2, is a 5 year-old version. The problem with defaulting to older versions when the Kafka cluster is a newer version is manifold: - requires Kafka to downconvert consumer response messages (e.g if a newly-configured client produced messages and a default Sarama client is reading) - this has implications on the performance of Kafka, because it uses additional CPU. Especially so if compression is enabled, at which point the broker would need to decompress, downconvert and compress the message back. Downconversion also blocks zerocopy, because your fetch response now needs to be copied into memory - requires Kafka to upconvert producer requests' messages - in general is hard to support. While the latest Kafka currently supports the oldest versions, it is in the best interest of the project to deprecate and eventually drop support for legacy versions. Otherwise it becomes hard to maintain, the test matrix grows and new features need to work around old version limitations (no idempotency, exactly once). It is easier for Kafka to deprecate/drop support when its ecosystem has done so already 0.11 is the minimum we should default at as it enables Kafka's v2 message format, avoiding expensive upconversion/downconversion in the Kafka broker. It is also required for correctness in some cases (e.g KIP-101) --- config.go | 2 +- utils.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/config.go b/config.go index 9b7ce7aeb4..43e739cad9 100644 --- a/config.go +++ b/config.go @@ -486,7 +486,7 @@ func NewConfig() *Config { c.ClientID = defaultClientID c.ChannelBufferSize = 256 - c.Version = MinVersion + c.Version = DefaultVersion c.MetricRegistry = metrics.NewRegistry() return c diff --git a/utils.go b/utils.go index 93bdeefef6..4dcd797af7 100644 --- a/utils.go +++ b/utils.go @@ -192,12 +192,13 @@ var ( } MinVersion = V0_8_2_0 MaxVersion = V2_6_0_0 + DefaultVersion = V0_11_0_2 ) //ParseKafkaVersion parses and returns kafka version or error from a string func ParseKafkaVersion(s string) (KafkaVersion, error) { if len(s) < 5 { - return MinVersion, fmt.Errorf("invalid version `%s`", s) + return DefaultVersion, fmt.Errorf("invalid version `%s`", s) } var major, minor, veryMinor, patch uint var err error @@ -207,7 +208,7 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) { err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor}) } if err != nil { - return MinVersion, err + return DefaultVersion, err } return newKafkaVersion(major, minor, veryMinor, patch), nil }