Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Jun 21, 2022
1 parent 9b6f606 commit 5f7a94e
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -224,7 +224,8 @@ with `-tags dynamic`.
API Strands
===========

The recommended API strand is the Function-Based one, the Channel-Based one documentation is in [examples/legacy](examples/legacy).
The recommended API strand is the Function-Based one,
the Channel-Based one is documented in [examples/legacy](examples/legacy).

Function-Based Consumer
-----------------------
Expand Down
2 changes: 1 addition & 1 deletion examples/.gitignore
Expand Up @@ -18,4 +18,4 @@ oauthbearer_example/oauthbearer_example
producer_custom_channel_example/producer_custom_channel_example
producer_example/producer_example
stats_example/stats_example
transactions_example/transactions_example
transactions_example/transactions_example
1 change: 0 additions & 1 deletion examples/README.md
Expand Up @@ -49,4 +49,3 @@ Usage example
$ go build (or 'go install')
$ ./consumer_example # see usage
$ ./consumer_example mybroker mygroup mytopic

Expand Up @@ -70,6 +70,12 @@ func main() {
for msgcnt < totalMsgcnt {
value := fmt.Sprintf("Producer example, message #%d", msgcnt)

// A delivery channel for each message sent.
// This permits to receive delivery reports
// separately and to handle the use case
// of a server that has multiple concurrent
// produce requests and needs to deliver the replies
// to many different response channels.
deliveryChan := make(chan kafka.Event)
go func() {
for e := range deliveryChan {
Expand Down Expand Up @@ -101,23 +107,18 @@ func main() {
if err != nil {
close(deliveryChan)
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
// Producer queue is full, waits for it to be freed
// Producer queue is full, wait 1s for messages
// to be delivered then try again.
time.Sleep(time.Second)
continue
}
fmt.Printf("Failed to produce message: %v\n", err)
// Flush and close the producer and the events channel
for p.Flush(1000) > 0 {
fmt.Print("Still waiting to flush outstanding messages\n", err)
}
p.Close()
os.Exit(1)
}
msgcnt++
}

// Flush and close the producer and the events channel
for p.Flush(1000) > 0 {
for p.Flush(10000) > 0 {
fmt.Print("Still waiting to flush outstanding messages\n", err)
}
p.Close()
Expand Down
13 changes: 6 additions & 7 deletions examples/producer_example/producer_example.go
Expand Up @@ -51,6 +51,10 @@ func main() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
// The message delivery report, indicating success or
// permanent failure after retries have been exhausted.
// Application level retries won't help since the client
// is already configured to do that.
m := ev
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
Expand Down Expand Up @@ -85,17 +89,12 @@ func main() {

if err != nil {
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
// Producer queue is full, waits for it to be freed
// Producer queue is full, wait 1s for messages
// to be delivered then try again.
time.Sleep(time.Second)
continue
}
fmt.Printf("Failed to produce message: %v\n", err)
// Flush and close the producer and the events channel
for p.Flush(1000) > 0 {
fmt.Print("Still waiting to flush outstanding messages\n", err)
}
p.Close()
os.Exit(1)
}
msgcnt++
}
Expand Down

0 comments on commit 5f7a94e

Please sign in to comment.