Skip to content

Commit

Permalink
Merge pull request #4 from practo/kafka-go
Browse files Browse the repository at this point in the history
Support to switch to other kafka clients
  • Loading branch information
alok87 committed Aug 9, 2020
2 parents 75f979e + 5c3d0ac commit 5f95077
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 34 deletions.
46 changes: 29 additions & 17 deletions redshiftsink/cmd/redshiftbatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
pflag "github.com/spf13/pflag"
"log"
"os"
"os/signal"
Expand All @@ -11,18 +12,18 @@ import (

"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftbatcher/pkg/consumer"
pflag "github.com/spf13/pflag"
)

// Sarama configuration options
var (
brokers = ""
version = ""
group = ""
topicPrefixes = ""
assignor = ""
oldest = true
clientlog = false
brokers = ""
version = ""
group = ""
topicPrefixes = ""
kafkaClient = ""
saramaAssignor = ""
saramaOldest = true
saramaLog = false
)

func init() {
Expand All @@ -31,10 +32,14 @@ func init() {
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&group, "group", "", "Kafka consumer group definition")
flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
flag.StringVar(&topicPrefixes, "topicPrefixes", "", "Kafka topics to be consumed, as a comma separated list")
flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
flag.BoolVar(&clientlog, "clientlog", false, "client logging")
flag.StringVar(&topicPrefixes, "topic-prefixes", "", "Kafka topics to be consumed, as a comma separated list")
flag.StringVar(&kafkaClient, "kafka-client", "sarama", "Kafka client to use: kafka-go or sarama")

// sarama specifc flags
flag.StringVar(&saramaAssignor, "sarama-assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
flag.BoolVar(&saramaOldest, "sarama-oldest", true, "Kafka consumer consume initial offset from oldest")
flag.BoolVar(&saramaLog, "sarama-log", false, "Enable or disable sarama client logging")

flag.Parse()

if len(brokers) == 0 {
Expand All @@ -49,25 +54,32 @@ func init() {
klog.Fatal("no Kafka consumer group defined, please set the -group flag")
}

if kafkaClient != consumer.KafkaGo && kafkaClient != consumer.Sarama {
klog.Fatalf("supported kafka clients are: %s and %s\n",
consumer.KafkaGo, consumer.Sarama)
}

pflag.CommandLine.AddGoFlag(flag.CommandLine.Lookup("v"))
}

func main() {
klog.Info("Starting the redshift batcher")

ctx, cancel := context.WithCancel(context.Background())

client, err := consumer.NewClient(
brokers, group, clientlog, version, assignor, oldest)
kafkaClient, brokers, group, version,
saramaLog, saramaAssignor, saramaOldest,
)
if err != nil {
klog.Fatal("Error creating kafka consumer client")
klog.Fatalf("Error creating kafka consumer client: %v\n", err)
}

ctx, cancel := context.WithCancel(context.Background())
klog.Info("Succesfully created kafka client")

manager := consumer.NewManager(client, topicPrefixes)
wg := &sync.WaitGroup{}
wg.Add(1)
go manager.RefreshTopics(ctx, 15, wg)
go manager.SyncTopics(ctx, 15, wg)
wg.Add(1)
go manager.Consume(ctx, wg)

Expand Down
43 changes: 36 additions & 7 deletions redshiftsink/pkg/consumer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,50 @@ import (
"github.com/Shopify/sarama"
)

const (
KafkaGo = "kafka-go"
Sarama = "sarama"
)

type Client interface {
Topics() ([]string, error)
Consume(ctx context.Context, topics []string, ready chan bool) error
Close() error
}

func NewClient(
kafkaClient string,
brokerURLs string,
group string,
ver string,
saramaLog bool,
saramaAssignor string,
saramaOldest bool) (Client, error) {

switch kafkaClient {
case Sarama:
return NewSaramaClient(
brokerURLs, group, ver, saramaLog, saramaAssignor, saramaOldest,
)
case KafkaGo:
return nil, fmt.Errorf(
"not yet supported, waiting for: %s",
"https://github.com/segmentio/kafka-go/issues/131",
)
default:
return nil, fmt.Errorf("kafkaClient not supported: %v\n", kafkaClient)
}
}

func NewSaramaClient(
brokerURLs string,
group string,
clientlog bool,
ver string,
assignor string,
oldest bool) (Client, error) {
saramaLog bool,
saramaAssignor string,
saramaOldest bool) (Client, error) {

if clientlog {
if saramaLog {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}

Expand All @@ -36,7 +65,7 @@ func NewClient(
c := sarama.NewConfig()
c.Version = version

switch assignor {
switch saramaAssignor {
case "sticky":
c.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
case "roundrobin":
Expand All @@ -45,10 +74,10 @@ func NewClient(
c.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
default:
return nil, fmt.Errorf(
"Unknown group partition assignor: %s", assignor)
"Unknown group partition saramaAssignor: %s", saramaAssignor)
}

if oldest {
if saramaOldest {
c.Consumer.Offsets.Initial = sarama.OffsetOldest
}

Expand Down
3 changes: 3 additions & 0 deletions redshiftsink/pkg/consumer/kafkago.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package consumer

//
22 changes: 14 additions & 8 deletions redshiftsink/pkg/consumer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,27 @@ func (c *Manager) getDeepCopyTopics() []string {
return append(make([]string, 0, len(c.topics)), c.topics...)
}

func (c *Manager) RefreshTopics(
func (c *Manager) refreshTopics() {
topics, err := c.client.Topics()
if err != nil {
klog.Fatalf("Error getting topics, err=%v\n", err)
}
klog.V(4).Infof("%d topic(s) in the cluster\n", len(topics))
klog.V(5).Infof("Topics in the cluster=%v\n", topics)
c.updatetopics(topics)
}

func (c *Manager) SyncTopics(
ctx context.Context, seconds int, wg *sync.WaitGroup) {

defer wg.Done()
ticker := time.NewTicker(time.Second * time.Duration(seconds))
for {
c.refreshTopics()

select {
case <-ticker.C:
topics, err := c.client.Topics()
if err != nil {
klog.Fatalf("Error getting topics, err=%v\n", err)
}
klog.V(4).Infof("%d topic(s) in the cluster\n", len(topics))
klog.V(5).Infof("Topics in the cluster=%v\n", topics)
c.updatetopics(topics)
continue
case <-ctx.Done():
return
}
Expand Down
File renamed without changes.
2 changes: 0 additions & 2 deletions redshiftsink/vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,3 @@ gopkg.in/jcmturner/gokrb5.v7/types
# gopkg.in/jcmturner/rpc.v1 v1.1.0
gopkg.in/jcmturner/rpc.v1/mstypes
gopkg.in/jcmturner/rpc.v1/ndr
# k8s.io/klog v1.0.0
## explicit

0 comments on commit 5f95077

Please sign in to comment.