Improvements to Go client examples (#801)
improvements to client examples:
- Changed the producer example to use the Events delivery channel.
- Added another producer example using a custom delivery channel
- Moved channel producer and consumer examples to the
  examples/legacy folder
- Added missing descriptions for other examples
- Consumer example with at-least-once guarantee by disabling
  enable.auto.offset.store and using StoreMessage
- Custom channel example with a channel for each Produce call
emasab committed Jun 21, 2022
1 parent 9c586e3 commit b89b9e4
Showing 9 changed files with 308 additions and 102 deletions.
63 changes: 5 additions & 58 deletions README.md
@@ -224,81 +224,28 @@ with `-tags dynamic`.
API Strands
===========

The recommended API strand is the Function-Based one;
the Channel-Based one is documented in [examples/legacy](examples/legacy).

Function-Based Consumer
-----------------------

Messages, errors and events are polled through the `consumer.Poll()` function.

It has a direct mapping to the underlying librdkafka functionality.

See [examples/consumer_example](examples/consumer_example)
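
The poll loop above can be sketched in plain Go without a broker. This is a stand-in, not the real API: `mockConsumer`, `message`, and `kafkaError` are hypothetical types playing the roles of `kafka.Consumer`, `*kafka.Message`, and `kafka.Error`:

```go
package main

import (
	"fmt"
	"time"
)

// mockConsumer is a stand-in for kafka.Consumer (not part of the
// library): Poll returns the next queued event, or nil on timeout.
type mockConsumer struct{ events chan interface{} }

func (c *mockConsumer) Poll(timeoutMs int) interface{} {
	select {
	case ev := <-c.events:
		return ev
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		return nil
	}
}

// message and kafkaError stand in for *kafka.Message and kafka.Error.
type message struct{ value string }
type kafkaError struct{ msg string }

// consume runs the poll loop until max messages have been received,
// switching on the event type as the real consumer example does.
func consume(c *mockConsumer, max int) []string {
	var got []string
	for len(got) < max {
		switch ev := c.Poll(100).(type) {
		case *message:
			got = append(got, ev.value) // process the message
		case kafkaError:
			fmt.Println("error:", ev.msg) // usually informational only
		case nil:
			// poll timed out with nothing to do
		}
	}
	return got
}

func main() {
	c := &mockConsumer{events: make(chan interface{}, 3)}
	c.events <- &message{value: "a"}
	c.events <- kafkaError{msg: "transient"}
	c.events <- &message{value: "b"}
	fmt.Println(consume(c, 2))
}
```

With the real client the type switch matches `*kafka.Message`, `kafka.Error`, and the other event types instead.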


Function-Based Producer
-----------------------

Application calls `producer.Produce()` to produce messages.
Delivery reports are emitted on the `producer.Events()` or specified private channel.

_Warnings_

* `Produce()` is a non-blocking call; if the internal librdkafka queue is full
  the call will fail and can be retried.

See [examples/producer_example](examples/producer_example)
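
The queue-full behaviour can be modeled without a broker, treating librdkafka's bounded internal queue as a buffered channel; `tryProduce` and `errQueueFull` here are illustrative stand-ins for `Produce()` and `kafka.ErrQueueFull`, not library code:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errQueueFull stands in for kafka.ErrQueueFull.
var errQueueFull = errors.New("queue full")

// tryProduce models the non-blocking Produce(): the send fails
// immediately when the bounded queue is full instead of blocking.
func tryProduce(queue chan string, msg string) error {
	select {
	case queue <- msg:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	queue := make(chan string, 2) // models librdkafka's bounded internal queue

	// Drain one message every 20ms, standing in for delivery to the broker.
	go func() {
		for range queue {
			time.Sleep(20 * time.Millisecond)
		}
	}()

	for i := 0; i < 5; i++ {
		msg := fmt.Sprintf("message #%d", i)
		for tryProduce(queue, msg) == errQueueFull {
			// Queue full: back off and retry, as the warning above suggests.
			time.Sleep(5 * time.Millisecond)
		}
	}
	fmt.Println("all messages enqueued")
}
```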

26 changes: 17 additions & 9 deletions examples/.gitignore
@@ -1,13 +1,21 @@
admin_create_acls/admin_create_acls
admin_create_topic/admin_create_topic
admin_delete_acls/admin_delete_acls
admin_delete_topics/admin_delete_topics
admin_describe_acls/admin_describe_acls
admin_describe_config/admin_describe_config
confluent_cloud_example/confluent_cloud_example
consumer_example/consumer_example
consumer_offset_metadata/consumer_offset_metadata
cooperative_consumer_example/cooperative_consumer_example
go-kafkacat/go-kafkacat
idempotent_producer_example/idempotent_producer_example
legacy/consumer_channel_example/consumer_channel_example
legacy/producer_channel_example/producer_channel_example
library-version/library-version
mockcluster_example/mockcluster_example
oauthbearer_example/oauthbearer_example
producer_custom_channel_example/producer_custom_channel_example
producer_example/producer_example
stats_example/stats_example
transactions_example/transactions_example
52 changes: 38 additions & 14 deletions examples/README.md
@@ -1,27 +1,51 @@

Examples
--------

[admin_create_acls](admin_create_acls) - Create Access Control Lists

[admin_create_topic](admin_create_topic) - Create a topic

[admin_delete_acls](admin_delete_acls) - Delete Access Control Lists using different filters

[admin_delete_topics](admin_delete_topics) - Delete some topics

[admin_describe_acls](admin_describe_acls) - Find Access Control Lists using a filter

[admin_describe_config](admin_describe_config) - Describe broker, topic or group configs

[consumer_example](consumer_example) - Function & callback based consumer

[consumer_offset_metadata](consumer_offset_metadata) - Commit offset with metadata

[cooperative_consumer_example](cooperative_consumer_example) - Using the cooperative incremental rebalancing protocol

[confluent_cloud_example](confluent_cloud_example) - Usage example with Confluent Cloud

[go-kafkacat](go-kafkacat) - Channel based kafkacat Go clone

[idempotent_producer_example](idempotent_producer_example) - Idempotent producer

[legacy](legacy) - Legacy examples

[library-version](library-version) - Show the library version

[mockcluster_example](mockcluster_example) - Use a mock cluster for testing

[oauthbearer_example](oauthbearer_example) - Provides unsecured SASL/OAUTHBEARER example

[producer_custom_channel_example](producer_custom_channel_example) - Function based producer with a custom delivery channel

[producer_example](producer_example) - Function based producer

[stats_example](stats_example) - Receiving stats events

[transactions_example](transactions_example) - Showcasing a transactional consume-process-produce application

Usage example
-------------

$ cd consumer_example
$ go build (or 'go install')
$ ./consumer_example # see usage
$ ./consumer_example mybroker mygroup mytopic

15 changes: 11 additions & 4 deletions examples/consumer_example/consumer_example.go
@@ -50,10 +50,12 @@ func main() {
// when using localhost brokers on OSX, since the OSX resolver
// will return the IPv6 addresses first.
// You typically don't need to specify this configuration property.
"broker.address.family": "v4",
"group.id": group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest"})
"broker.address.family": "v4",
"group.id": group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest",
"enable.auto.offset.store": false,
})

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
@@ -84,6 +86,11 @@ func main() {
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
_, err := c.StoreMessage(e)
if err != nil {
fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
e.TopicPartition)
}
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
53 changes: 53 additions & 0 deletions examples/legacy/README.md
@@ -0,0 +1,53 @@
Legacy examples
===============

This directory contains examples for functionality that is no longer recommended.

Channel-Based Consumer (deprecated)
-----------------------------------

*Deprecated*: The channel-based consumer is deprecated due to the channel issues
mentioned below. Use the function-based consumer.

Messages, errors and events are posted on the `consumer.Events()` channel
for the application to read.

Pros:

* Possibly more Golang:ish
* Makes reading from multiple channels easy
* Fast

Cons:

* Outdated events and messages may be consumed due to the buffering nature
of channels. The extent is limited, but not remedied, by the Events channel
buffer size (`go.events.channel.size`).

See [consumer_channel_example](consumer_channel_example)
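
The multi-channel reading this strand makes easy can be sketched in plain Go, with a buffered channel standing in for `consumer.Events()`; the `drain` helper is illustrative, not part of the library:

```go
package main

import "fmt"

// drain reads from an events channel (standing in for consumer.Events())
// and a quit channel in a single select, the multi-channel style the
// channel-based consumer makes easy. On quit it drains what is still
// buffered before returning, since buffered events would otherwise be lost.
func drain(events chan string, quit chan struct{}) []string {
	var got []string
	for {
		select {
		case ev := <-events:
			got = append(got, ev)
		case <-quit:
			for {
				select {
				case ev := <-events:
					got = append(got, ev)
				default:
					return got
				}
			}
		}
	}
}

func main() {
	events := make(chan string, 2)
	quit := make(chan struct{})
	events <- "message 1"
	events <- "message 2"
	close(quit)
	fmt.Println(drain(events, quit))
}
```

The inner drain loop also illustrates the Cons above: events buffered in the channel may be outdated by the time they are read.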

Channel-Based Producer
----------------------

Application writes messages to the `producer.ProducerChannel()`.
Delivery reports are emitted on the `producer.Events()` or specified private channel.

Pros:

* Go:ish
* Proper channel backpressure if librdkafka internal queue is full.

Cons:

* Double queueing: messages are first queued in the channel (size is configurable)
and then inside librdkafka.

See [producer_channel_example](producer_channel_example)
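
The double queueing and backpressure described above can be modeled in plain Go; `produceAll` and its two buffered channels are stand-ins for `ProducerChannel()` and librdkafka's internal queue, not library code:

```go
package main

import "fmt"

// produceAll models the double queueing the section describes: the
// application writes into a Go channel (the produce channel) and a
// library goroutine moves messages into librdkafka's own bounded queue.
// When both fill up, the channel send blocks: natural backpressure.
func produceAll(msgs []string) int {
	produceChannel := make(chan string, 2) // go.produce.channel.size
	internalQueue := make(chan string, 2)  // librdkafka's internal queue
	delivered := 0
	done := make(chan struct{})

	go func() { // the library goroutine bridging the two queues
		for m := range produceChannel {
			internalQueue <- m // blocks when the internal queue is full
		}
		close(internalQueue)
	}()
	go func() { // delivery to the broker
		for range internalQueue {
			delivered++
		}
		close(done)
	}()

	for _, m := range msgs {
		produceChannel <- m // blocks (backpressure) when both queues are full
	}
	close(produceChannel)
	<-done
	return delivered
}

func main() {
	fmt.Println("delivered:", produceAll([]string{"a", "b", "c", "d", "e"}))
}
```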

Usage example
-------------

$ cd consumer_channel_example
$ go build (or 'go install')
$ ./consumer_channel_example # see usage
$ ./consumer_channel_example mybroker mygroup mytopic

129 changes: 129 additions & 0 deletions examples/producer_custom_channel_example/producer_custom_channel_example.go
@@ -0,0 +1,129 @@
// Example function-based Apache Kafka producer with a custom delivery channel
package main

/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import (
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

if len(os.Args) != 3 {
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <topic>\n",
os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]
topic := os.Args[2]
totalMsgcnt := 3

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})

if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Producer %v\n", p)

// Listen to all the client instance-level errors.
// It's important to read these errors too, otherwise the events channel will eventually fill up.
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case kafka.Error:
// Generic client instance-level errors, such as
// broker connection failures, authentication issues, etc.
//
// These errors should generally be considered informational
// as the underlying client will automatically try to
// recover from any errors encountered, the application
// does not need to take action on them.
fmt.Printf("Error: %v\n", ev)
default:
fmt.Printf("Ignored event: %s\n", ev)
}
}
}()

msgcnt := 0
for msgcnt < totalMsgcnt {
value := fmt.Sprintf("Producer example, message #%d", msgcnt)

// A delivery channel for each message sent.
// This makes it possible to receive delivery reports
// separately, and handles the use case of a server
// that has multiple concurrent produce requests and
// needs to deliver the replies to many different
// response channels.
deliveryChan := make(chan kafka.Event)
go func() {
for e := range deliveryChan {
switch ev := e.(type) {
case *kafka.Message:
// The message delivery report, indicating success or
// permanent failure after retries have been exhausted.
// Application level retries won't help since the client
// is already configured to do that.
m := ev
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

default:
fmt.Printf("Ignored event: %s\n", ev)
}
// In this case the caller knows that this channel is used only
// for one Produce call, so it can close it.
close(deliveryChan)
}
}()

err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
}, deliveryChan)

if err != nil {
close(deliveryChan)
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
// Producer queue is full, wait 1s for messages
// to be delivered then try again.
time.Sleep(time.Second)
continue
}
fmt.Printf("Failed to produce message: %v\n", err)
}
msgcnt++
}

// Flush and close the producer and the events channel
for p.Flush(10000) > 0 {
fmt.Print("Still waiting to flush outstanding messages\n")
}
p.Close()
}
