Skip to content

Commit

Permalink
custom channel example change
Browse files Browse the repository at this point in the history
a channel for each Produce call
  • Loading branch information
emasab committed Jun 21, 2022
1 parent adbe802 commit 9b6f606
Showing 1 changed file with 26 additions and 24 deletions.
Expand Up @@ -28,16 +28,16 @@ import (
func main() {

if len(os.Args) != 3 {
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <topic>\n",
os.Args[0])
os.Exit(1)
}

broker := os.Args[1]
bootstrapServers := os.Args[1]
topic := os.Args[2]
totalMsgcnt := 3

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})

if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
Expand Down Expand Up @@ -66,38 +66,40 @@ func main() {
}
}()

// Optional delivery channel, if not specified the Producer object's
// .Events channel is used.
deliveryChan := make(chan kafka.Event)
go func() {
for e := range deliveryChan {
switch ev := e.(type) {
case *kafka.Message:
m := ev
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

default:
fmt.Printf("Ignored event: %s\n", ev)
}
}
}()

msgcnt := 0
for msgcnt < totalMsgcnt {
value := fmt.Sprintf("Producer example, message #%d", msgcnt)

deliveryChan := make(chan kafka.Event)
go func() {
for e := range deliveryChan {
switch ev := e.(type) {
case *kafka.Message:
m := ev
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

default:
fmt.Printf("Ignored event: %s\n", ev)
}
// in this case the caller knows that this channel is used only
// for one Produce call, so it can close it.
close(deliveryChan)
}
}()

err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
}, deliveryChan)

if err != nil {
close(deliveryChan)
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
// Producer queue is full, waits for it to be freed
time.Sleep(time.Second)
Expand Down

0 comments on commit 9b6f606

Please sign in to comment.