From b3de6dc0353b5fc77d81c641f8dba36297a566e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=87=E6=B6=9B?= Date: Thu, 24 Feb 2022 15:09:54 +0800 Subject: [PATCH] Kafka.NewWriter() will be removed and Writer value can be instantiated and configured directly --- kq/pusher.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/kq/pusher.go b/kq/pusher.go index bb62171..90789a6 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -6,7 +6,6 @@ import ( "time" "github.com/segmentio/kafka-go" - "github.com/segmentio/kafka-go/snappy" "github.com/zeromicro/go-zero/core/executors" "github.com/zeromicro/go-zero/core/logx" ) @@ -27,13 +26,12 @@ type ( ) func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { - producer := kafka.NewWriter(kafka.WriterConfig{ - Brokers: addrs, - Topic: topic, - Balancer: &kafka.LeastBytes{}, - CompressionCodec: snappy.NewCompressionCodec(), - }) - + producer := &kafka.Writer{ + Addr: kafka.TCP(addrs...), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Compression: kafka.Snappy, + } pusher := &Pusher{ produer: producer, topic: topic,