Skip to content

Commit

Permalink
Add more examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ryarnyah committed Aug 3, 2022
1 parent b66540b commit 6b5d3d0
Show file tree
Hide file tree
Showing 7 changed files with 1,000 additions and 213 deletions.
1 change: 1 addition & 0 deletions examples/exactly_once/.gitignore
@@ -0,0 +1 @@
exactly_once
9 changes: 9 additions & 0 deletions examples/exactly_once/README.md
@@ -0,0 +1,9 @@
# Consumergroup example

This example shows you how to use the Sarama transactional producer. The example simply starts consuming the given Kafka topics and logs the consumed messages to produce and commit in another topic.

```bash
$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
```

You can also toggle (pause/resume) the consumption by sending SIGUSR1
7 changes: 7 additions & 0 deletions examples/exactly_once/go.mod
@@ -0,0 +1,7 @@
module github.com/Shopify/sarama/examples/exactly_once

go 1.16

require github.com/Shopify/sarama v1.34.1

replace github.com/Shopify/sarama => ../../
528 changes: 528 additions & 0 deletions examples/exactly_once/go.sum

Large diffs are not rendered by default.

301 changes: 301 additions & 0 deletions examples/exactly_once/main.go
@@ -0,0 +1,301 @@
package main

// SIGUSR1 toggle the pause/resume consumption
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/Shopify/sarama"
)

// Sarama configuration options
var (
brokers = ""
version = ""
group = ""
topics = ""
destinationTopic = ""
assignor = ""
oldest = true
verbose = false
)

func init() {
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&group, "group", "", "Kafka consumer group definition")
flag.StringVar(&version, "version", "2.6.0", "Kafka cluster version")
flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
flag.StringVar(&destinationTopic, "destination-topics", "", "Kafka topics where records will be copied from topics.")
flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
flag.Parse()

if len(brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
}

if len(topics) == 0 {
panic("no topics given to be consumed, please set the -topics flag")
}

if len(destinationTopic) == 0 {
panic("no destination topics given to be consumed, please set the -destination-topics flag")
}

if len(group) == 0 {
panic("no Kafka consumer group defined, please set the -group flag")
}
}

func main() {
keepRunning := true
log.Println("Starting a new Sarama consumer")

if verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}

version, err := sarama.ParseKafkaVersion(version)
if err != nil {
log.Panicf("Error parsing Kafka version: %v", err)
}

/**
* Construct a new Sarama configuration.
* The Kafka cluster version has to be defined before the consumer/producer is initialized.
*/
config := sarama.NewConfig()
config.Version = version

config.Consumer.IsolationLevel = sarama.ReadCommitted
config.Consumer.Offsets.AutoCommit.Enable = false

switch assignor {
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "range":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
default:
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
}

if oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

/**
* Setup a new Sarama consumer group
*/
consumer := Consumer{
groupId: group,
brokers: strings.Split(brokers, ","),
configProvider: func(topic string, partition int32) *sarama.Config {
producerConfig := sarama.NewConfig()
producerConfig.Version = version

producerConfig.Net.MaxOpenRequests = 1
producerConfig.Producer.RequiredAcks = sarama.WaitForAll
producerConfig.Producer.Idempotent = true
producerConfig.Producer.Transaction.IDProvider = func() string {
return "sarama" + "-" + topic + "-" + fmt.Sprint(partition)
}
return producerConfig
},
ready: make(chan bool),
}

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}

consumptionIsPaused := false
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()

<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")

sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

for keepRunning {
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
case <-sigusr1:
toggleConsumptionFlow(client, &consumptionIsPaused)
}
}
cancel()
wg.Wait()

if err = client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}

func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
if *isPaused {
client.ResumeAll()
log.Println("Resuming consumption")
} else {
client.PauseAll()
log.Println("Pausing consumption")
}

*isPaused = !*isPaused
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
groupId string
brokers []string
configProvider func(topic string, partition int32) *sarama.Config
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L2
var producer sarama.AsyncProducer
defer consumer.closeProducer(producer)
for {
select {
case message := <-claim.Messages():
for {
var err error
if producer == nil {
producer, err = sarama.NewAsyncProducer(consumer.brokers, consumer.configProvider(claim.Topic(), claim.Partition()))
if err != nil {
continue
}
go func() {
for err := range producer.Errors() {
log.Println("Failed to write access log entry:", err)
}
}()
}
startTime := time.Now()
// BeginTxn must be called before any messages
err = producer.BeginTxn()
if err != nil {
log.Printf("Message consumer: unable to start transaction: %+v", err)
continue
}
producer.Input() <- &sarama.ProducerMessage{
Topic: destinationTopic,
Key: sarama.ByteEncoder(message.Key),
Value: sarama.ByteEncoder(message.Value),
}
// You can add current message to this transaction
err = producer.AddMessageToTxn(message, consumer.groupId, nil)
if err != nil {
log.Println("error on AddMessageToTxn")
producer = consumer.handleTxnError(producer, err, func() error {
return producer.AddMessageToTxn(message, consumer.groupId, nil)
})
continue
}

err = producer.CommitTxn()
if err != nil {
log.Println("error on CommitTxn")
producer = consumer.handleTxnError(producer, err, func() error {
return producer.CommitTxn()
})
continue
}
log.Printf("Message claimed [%s]: value = %s, timestamp = %v, topic = %s, partition = %d", time.Since(startTime), string(message.Value), message.Timestamp, message.Topic, message.Partition)
break
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}

func (consumer *Consumer) handleTxnError(producer sarama.AsyncProducer, err error, defaulthandler func() error) sarama.AsyncProducer {
log.Printf("Message consumer: unable to process transaction: %+v", err)
for {
if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 {
// fatal error. need to recreate producer.
log.Printf("Message consumer: producer is in a fatal state, need to recreate it")
consumer.closeProducer(producer)
return nil
}
if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 {
err = producer.AbortTxn()
if err != nil {
log.Printf("Message consumer: unable to abort transaction: %+v", err)
continue
}
return producer
}
// if not you can retry
err = defaulthandler()
if err == nil {
return producer
}
}
}

func (consumer *Consumer) closeProducer(producer sarama.AsyncProducer) {
if producer != nil {
log.Println("Closing producer")
_ = producer.Close()
}
}
5 changes: 4 additions & 1 deletion examples/txn_producer/go.mod
Expand Up @@ -2,6 +2,9 @@ module github.com/Shopify/sarama/examples/txn_producer

go 1.16

require github.com/Shopify/sarama v1.34.1
require (
github.com/Shopify/sarama v1.34.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
)

replace github.com/Shopify/sarama => ../../

0 comments on commit 6b5d3d0

Please sign in to comment.