diff --git a/README.md b/README.md index f83f5c217..b102411c9 100644 --- a/README.md +++ b/README.md @@ -224,7 +224,8 @@ with `-tags dynamic`. API Strands =========== -The recommended API strand is the Function-Based one, the Channel-Based one documentation is in [examples/legacy](examples/legacy). +The recommended API strand is the Function-Based one, +the Channel-Based one is documented in [examples/legacy](examples/legacy). Function-Based Consumer ----------------------- diff --git a/examples/.gitignore b/examples/.gitignore index 6685993b6..53c9c6450 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -18,4 +18,4 @@ oauthbearer_example/oauthbearer_example producer_custom_channel_example/producer_custom_channel_example producer_example/producer_example stats_example/stats_example -transactions_example/transactions_example \ No newline at end of file +transactions_example/transactions_example diff --git a/examples/README.md b/examples/README.md index 080d70823..c0ad634e5 100644 --- a/examples/README.md +++ b/examples/README.md @@ -49,4 +49,3 @@ Usage example $ go build (or 'go install') $ ./consumer_example # see usage $ ./consumer_example mybroker mygroup mytopic - diff --git a/examples/producer_custom_channel_example/producer_custom_channel_example.go b/examples/producer_custom_channel_example/producer_custom_channel_example.go index 0560cc04b..b710a5fbd 100644 --- a/examples/producer_custom_channel_example/producer_custom_channel_example.go +++ b/examples/producer_custom_channel_example/producer_custom_channel_example.go @@ -70,6 +70,12 @@ func main() { for msgcnt < totalMsgcnt { value := fmt.Sprintf("Producer example, message #%d", msgcnt) + // A delivery channel for each message sent. + // This permits to receive delivery reports + // separately and to handle the use case + // of a server that has multiple concurrent + // produce requests and needs to deliver the replies + // to many different response channels. deliveryChan := make(chan kafka.Event) go func() { for e := range deliveryChan { @@ -101,23 +107,18 @@ func main() { if err != nil { close(deliveryChan) if err.(kafka.Error).Code() == kafka.ErrQueueFull { - // Producer queue is full, waits for it to be freed + // Producer queue is full, wait 1s for messages + // to be delivered then try again. time.Sleep(time.Second) continue } fmt.Printf("Failed to produce message: %v\n", err) - // Flush and close the producer and the events channel - for p.Flush(1000) > 0 { - fmt.Print("Still waiting to flush outstanding messages\n", err) - } - p.Close() - os.Exit(1) } msgcnt++ } // Flush and close the producer and the events channel - for p.Flush(1000) > 0 { + for p.Flush(10000) > 0 { fmt.Print("Still waiting to flush outstanding messages\n", err) } p.Close() diff --git a/examples/producer_example/producer_example.go b/examples/producer_example/producer_example.go index 939eaa6dd..2cda7d9e4 100644 --- a/examples/producer_example/producer_example.go +++ b/examples/producer_example/producer_example.go @@ -51,6 +51,10 @@ func main() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: + // The message delivery report, indicating success or + // permanent failure after retries have been exhausted. + // Application level retries won't help since the client + // is already configured to do that. m := ev if m.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error) @@ -85,17 +89,12 @@ func main() { if err != nil { if err.(kafka.Error).Code() == kafka.ErrQueueFull { - // Producer queue is full, waits for it to be freed + // Producer queue is full, wait 1s for messages + // to be delivered then try again. time.Sleep(time.Second) continue } fmt.Printf("Failed to produce message: %v\n", err) - // Flush and close the producer and the events channel - for p.Flush(1000) > 0 { - fmt.Print("Still waiting to flush outstanding messages\n", err) - } - p.Close() - os.Exit(1) } msgcnt++ }