Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to Go client examples #801

Merged
merged 3 commits into from Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 5 additions & 58 deletions README.md
Expand Up @@ -224,81 +224,28 @@ with `-tags dynamic`.
API Strands
===========

There are two main API strands: function and channel-based.
The recommended API strand is the Function-Based one,
the Channel-Based one is documented in [examples/legacy](examples/legacy).

Function-Based Consumer
-----------------------

Messages, errors and events are polled through the `consumer.Poll()` function.

Pros:

* More direct mapping to underlying librdkafka functionality.

Cons:

* Makes it harder to read from multiple channels, but a go-routine easily
solves that (see Cons in channel-based consumer below about outdated events).
* Slower than the channel consumer.
It has direct mapping to underlying librdkafka functionality.

See [examples/consumer_example](examples/consumer_example)

Channel-Based Consumer (deprecated)
-----------------------------------

*Deprecated*: The channel-based consumer is deprecated due to the channel issues
mentioned below. Use the function-based consumer.

Messages, errors and events are posted on the `consumer.Events()` channel
for the application to read.

Pros:

* Possibly more Golang:ish
* Makes reading from multiple channels easy
* Fast

Cons:

* Outdated events and messages may be consumed due to the buffering nature
of channels. The extent is limited, but not remedied, by the Events channel
buffer size (`go.events.channel.size`).

See [examples/consumer_channel_example](examples/consumer_channel_example)

Channel-Based Producer
----------------------

Application writes messages to the `producer.ProducerChannel()`.
Delivery reports are emitted on the `producer.Events()` or specified private channel.

Pros:

* Go:ish
* Proper channel backpressure if librdkafka internal queue is full.

Cons:

* Double queueing: messages are first queued in the channel (size is configurable)
and then inside librdkafka.

See [examples/producer_channel_example](examples/producer_channel_example)

Function-Based Producer
-----------------------

Application calls `producer.Produce()` to produce messages.
Delivery reports are emitted on the `producer.Events()` or specified private channel.

Pros:

* Go:ish

Cons:
_Warnings_

* `Produce()` is a non-blocking call, if the internal librdkafka queue is full
the call will fail.
* Somewhat slower than the channel producer.
the call will fail and can be retried.

See [examples/producer_example](examples/producer_example)

Expand Down
26 changes: 17 additions & 9 deletions examples/.gitignore
@@ -1,13 +1,21 @@
consumer_channel_example/consumer_channel_example
admin_create_acls/admin_create_acls
admin_create_topic/admin_create_topic
admin_delete_acls/admin_delete_acls
admin_delete_topics/admin_delete_topics
admin_describe_acls/admin_describe_acls
admin_describe_config/admin_describe_config
confluent_cloud_example/confluent_cloud_example
consumer_example/consumer_example
consumer_offset_metadata/consumer_offset_metadata
producer_channel_example/producer_channel_example
producer_example/producer_example
cooperative_consumer_example/cooperative_consumer_example
go-kafkacat/go-kafkacat
admin_describe_config/admin_describe_config
admin_delete_topics/admin_delete_topics
admin_create_topic/admin_create_topic
admin_create_acls/admin_create_acls
admin_describe_acls/admin_describe_acls
admin_delete_acls/admin_delete_acls
idempotent_producer_example/idempotent_producer_example
legacy/consumer_channel_example/consumer_channel_example
legacy/producer_channel_example/producer_channel_example
library-version/library-version
mockcluster_example/mockcluster_example
oauthbearer_example/oauthbearer_example
producer_custom_channel_example/producer_custom_channel_example
producer_example/producer_example
stats_example/stats_example
transactions_example/transactions_example
52 changes: 38 additions & 14 deletions examples/README.md
@@ -1,27 +1,51 @@

Examples:
Examples
--------

admin_create_acls - Create Access Control Lists
admin_describe_acls - Find Access Control Lists using a filter
admin_delete_acls - Delete Access Control Lists using different filters
consumer_channel_example - Channel based consumer
consumer_example - Function & callback based consumer
consumer_offset_metadata - Commit offset with metadata
[admin_create_acls](admin_create_acls) - Create Access Control Lists

[admin_create_topic](admin_create_topic) - Create a topic

producer_channel_example - Channel based producer
producer_example - Function based producer
[admin_delete_acls](admin_delete_acls) - Delete Access Control Lists using different filters

[admin_delete_topics](admin_delete_topics) - Delete some topics

[admin_describe_acls](admin_describe_acls) - Find Access Control Lists using a filter

[admin_describe_config](admin_describe_config) - Describe broker, topic or group configs

[consumer_example](consumer_example) - Function & callback based consumer

[consumer_offset_metadata](consumer_offset_metadata) - Commit offset with metadata

[cooperative_consumer_example](cooperative_consumer_example) - Using the cooperative incremental rebalancing protocol

transactions_example - Showcasing a transactional consume-process-produce application
[confluent_cloud_example](confluent_cloud_example) - Usage example with Confluent Cloud

go-kafkacat - Channel based kafkacat Go clone
[go-kafkacat](go-kafkacat) - Channel based kafkacat Go clone

oauthbearer_example - Provides unsecured SASL/OAUTHBEARER example
[idempotent_producer_example](idempotent_producer_example) - Idempotent producer

[legacy](legacy) - Legacy examples

[library-version](library-version) - Show the library version

[mockcluster_example](mockcluster_example) - Use a mock cluster for testing

Usage example:
[oauthbearer_example](oauthbearer_example) - Provides unsecured SASL/OAUTHBEARER example

[producer_custom_channel_example](producer_custom_channel_example) - Function based producer with a custom delivery channel

[producer_example](producer_example) - Function based producer

[stats_example](stats_example) - Receiving stats events

[transactions_example](transactions_example) - Showcasing a transactional consume-process-produce application

Usage example
-------------

$ cd consumer_example
$ go build (or 'go install')
$ ./consumer_example # see usage
$ ./consumer_example mybroker mygroup mytopic

15 changes: 11 additions & 4 deletions examples/consumer_example/consumer_example.go
Expand Up @@ -50,10 +50,12 @@ func main() {
// when using localhost brokers on OSX, since the OSX resolver
// will return the IPv6 addresses first.
// You typically don't need to specify this configuration property.
"broker.address.family": "v4",
"group.id": group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest"})
"broker.address.family": "v4",
"group.id": group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest",
"enable.auto.offset.store": false,
})

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
Expand Down Expand Up @@ -84,6 +86,11 @@ func main() {
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
_, err := c.StoreMessage(e)
if err != nil {
fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
e.TopicPartition)
}
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
Expand Down
53 changes: 53 additions & 0 deletions examples/legacy/README.md
@@ -0,0 +1,53 @@
Legacy examples
===============

This directory contains examples for no longer recommended functionality

Channel-Based Consumer (deprecated)
-----------------------------------

*Deprecated*: The channel-based consumer is deprecated due to the channel issues
mentioned below. Use the function-based consumer.

Messages, errors and events are posted on the `consumer.Events()` channel
for the application to read.

Pros:

* Possibly more Golang:ish
* Makes reading from multiple channels easy
* Fast

Cons:

* Outdated events and messages may be consumed due to the buffering nature
of channels. The extent is limited, but not remedied, by the Events channel
buffer size (`go.events.channel.size`).

See [consumer_channel_example](consumer_channel_example)

Channel-Based Producer
----------------------

Application writes messages to the `producer.ProducerChannel()`.
Delivery reports are emitted on the `producer.Events()` or specified private channel.

Pros:

* Go:ish
* Proper channel backpressure if librdkafka internal queue is full.

Cons:

* Double queueing: messages are first queued in the channel (size is configurable)
and then inside librdkafka.

See [producer_channel_example](producer_channel_example)

Usage example
-------------

$ cd consumer_channel_example
$ go build (or 'go install')
$ ./consumer_channel_example # see usage
$ ./consumer_channel_example mybroker mygroup mytopic
@@ -0,0 +1,129 @@
// Example function-based Apache Kafka producer with a custom delivery channel
package main

/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import (
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

if len(os.Args) != 3 {
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <topic>\n",
os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]
topic := os.Args[2]
totalMsgcnt := 3

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})

if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Producer %v\n", p)

// Listen to all the client instance-level errors.
// It's important to read these errors too otherwise the events channel will eventually fill up
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case kafka.Error:
// Generic client instance-level errors, such as
// broker connection failures, authentication issues, etc.
//
// These errors should generally be considered informational
// as the underlying client will automatically try to
// recover from any errors encountered, the application
// does not need to take action on them.
fmt.Printf("Error: %v\n", ev)
default:
fmt.Printf("Ignored event: %s\n", ev)
}
}
}()

msgcnt := 0
for msgcnt < totalMsgcnt {
value := fmt.Sprintf("Producer example, message #%d", msgcnt)

// A delivery channel for each message sent.
// This permits to receive delivery reports
// separately and to handle the use case
// of a server that has multiple concurrent
// produce requests and needs to deliver the replies
// to many different response channels.
deliveryChan := make(chan kafka.Event)
go func() {
for e := range deliveryChan {
switch ev := e.(type) {
case *kafka.Message:
// The message delivery report, indicating success or
// permanent failure after retries have been exhausted.
// Application level retries won't help since the client
// is already configured to do that.
m := ev
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

default:
fmt.Printf("Ignored event: %s\n", ev)
}
// in this case the caller knows that this channel is used only
// for one Produce call, so it can close it.
close(deliveryChan)
}
}()

err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
}, deliveryChan)

if err != nil {
close(deliveryChan)
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
// Producer queue is full, wait 1s for messages
// to be delivered then try again.
time.Sleep(time.Second)
continue
}
fmt.Printf("Failed to produce message: %v\n", err)
}
msgcnt++
}

// Flush and close the producer and the events channel
for p.Flush(10000) > 0 {
fmt.Print("Still waiting to flush outstanding messages\n", err)
}
p.Close()
}