diff --git a/README.md b/README.md
index 2b83a7045..b102411c9 100644
--- a/README.md
+++ b/README.md
@@ -224,81 +224,28 @@ with `-tags dynamic`.
 API Strands
 ===========
 
-There are two main API strands: function and channel-based.
+The recommended API strand is the Function-Based one; the
+Channel-Based one is documented in [examples/legacy](examples/legacy).
 
 Function-Based Consumer
 -----------------------
 
 Messages, errors and events are polled through the `consumer.Poll()` function.
 
-Pros:
-
- * More direct mapping to underlying librdkafka functionality.
-
-Cons:
-
- * Makes it harder to read from multiple channels, but a go-routine easily
-   solves that (see Cons in channel-based consumer below about outdated events).
- * Slower than the channel consumer.
+It maps directly to the underlying librdkafka functionality.
 
 See [examples/consumer_example](examples/consumer_example)
 
-Channel-Based Consumer (deprecated)
------------------------------------
-
-*Deprecated*: The channel-based consumer is deprecated due to the channel issues
-mentioned below. Use the function-based consumer.
-
-Messages, errors and events are posted on the `consumer.Events()` channel
-for the application to read.
-
-Pros:
-
- * Possibly more Golang:ish
- * Makes reading from multiple channels easy
- * Fast
-
-Cons:
-
- * Outdated events and messages may be consumed due to the buffering nature
-   of channels. The extent is limited, but not remedied, by the Events channel
-   buffer size (`go.events.channel.size`).
-
-See [examples/consumer_channel_example](examples/consumer_channel_example)
-
-Channel-Based Producer
-----------------------
-
-Application writes messages to the `producer.ProducerChannel()`.
-Delivery reports are emitted on the `producer.Events()` or specified private channel.
-
-Pros:
-
- * Go:ish
- * Proper channel backpressure if librdkafka internal queue is full.
-
-Cons:
-
- * Double queueing: messages are first queued in the channel (size is configurable)
-   and then inside librdkafka.
-
-See [examples/producer_channel_example](examples/producer_channel_example)
-
 Function-Based Producer
 -----------------------
 
 Application calls `producer.Produce()` to produce messages.
 Delivery reports are emitted on the `producer.Events()` or specified private channel.
 
-Pros:
-
- * Go:ish
-
-Cons:
+_Warnings_
 
  * `Produce()` is a non-blocking call, if the internal librdkafka queue is full
-   the call will fail.
- * Somewhat slower than the channel producer.
+   the call will fail and can be retried.
 
 See [examples/producer_example](examples/producer_example)
diff --git a/examples/.gitignore b/examples/.gitignore
index 001a47ba1..53c9c6450 100644
--- a/examples/.gitignore
+++ b/examples/.gitignore
@@ -1,13 +1,21 @@
-consumer_channel_example/consumer_channel_example
+admin_create_acls/admin_create_acls
+admin_create_topic/admin_create_topic
+admin_delete_acls/admin_delete_acls
+admin_delete_topics/admin_delete_topics
+admin_describe_acls/admin_describe_acls
+admin_describe_config/admin_describe_config
+confluent_cloud_example/confluent_cloud_example
 consumer_example/consumer_example
 consumer_offset_metadata/consumer_offset_metadata
-producer_channel_example/producer_channel_example
-producer_example/producer_example
+cooperative_consumer_example/cooperative_consumer_example
 go-kafkacat/go-kafkacat
-admin_describe_config/admin_describe_config
-admin_delete_topics/admin_delete_topics
-admin_create_topic/admin_create_topic
-admin_create_acls/admin_create_acls
-admin_describe_acls/admin_describe_acls
-admin_delete_acls/admin_delete_acls
+idempotent_producer_example/idempotent_producer_example
+legacy/consumer_channel_example/consumer_channel_example
+legacy/producer_channel_example/producer_channel_example
+library-version/library-version
+mockcluster_example/mockcluster_example
+oauthbearer_example/oauthbearer_example
+producer_custom_channel_example/producer_custom_channel_example
+producer_example/producer_example
 stats_example/stats_example
+transactions_example/transactions_example
diff --git a/examples/README.md b/examples/README.md
index 49b973636..c0ad634e5 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,27 +1,51 @@
-Examples:
+Examples
+--------
 
- admin_create_acls - Create Access Control Lists
- admin_describe_acls - Find Access Control Lists using a filter
- admin_delete_acls - Delete Access Control Lists using different filters
- consumer_channel_example - Channel based consumer
- consumer_example - Function & callback based consumer
- consumer_offset_metadata - Commit offset with metadata
+ [admin_create_acls](admin_create_acls) - Create Access Control Lists
+
+ [admin_create_topic](admin_create_topic) - Create a topic
 
- producer_channel_example - Channel based producer
- producer_example - Function based producer
+ [admin_delete_acls](admin_delete_acls) - Delete Access Control Lists using different filters
+
+ [admin_delete_topics](admin_delete_topics) - Delete some topics
+
+ [admin_describe_acls](admin_describe_acls) - Find Access Control Lists using a filter
+
+ [admin_describe_config](admin_describe_config) - Describe broker, topic or group configs
+
+ [consumer_example](consumer_example) - Function & callback based consumer
+
+ [consumer_offset_metadata](consumer_offset_metadata) - Commit offset with metadata
+
+ [cooperative_consumer_example](cooperative_consumer_example) - Using the cooperative incremental rebalancing protocol
 
- transactions_example - Showcasing a transactional consume-process-produce application
+ [confluent_cloud_example](confluent_cloud_example) - Usage example with Confluent Cloud
 
- go-kafkacat - Channel based kafkacat Go clone
+ [go-kafkacat](go-kafkacat) - Channel based kafkacat Go clone
 
- oauthbearer_example - Provides unsecured SASL/OAUTHBEARER example
+ [idempotent_producer_example](idempotent_producer_example) - Idempotent producer
+
+ [legacy](legacy) - Legacy examples
+
+ [library-version](library-version) - Show the library version
 
+ [mockcluster_example](mockcluster_example) - Use a mock cluster for testing
 
-Usage example:
+ [oauthbearer_example](oauthbearer_example) - Provides unsecured SASL/OAUTHBEARER example
+
+ [producer_custom_channel_example](producer_custom_channel_example) - Function based producer with a custom delivery channel
+
+ [producer_example](producer_example) - Function based producer
+
+ [stats_example](stats_example) - Receiving stats events
+
+ [transactions_example](transactions_example) - Showcasing a transactional consume-process-produce application
+
+Usage example
+-------------
 
     $ cd consumer_example
    $ go build      (or 'go install')
     $ ./consumer_example    # see usage
     $ ./consumer_example mybroker mygroup mytopic
-
diff --git a/examples/consumer_example/consumer_example.go b/examples/consumer_example/consumer_example.go
index ee3b2882a..b6f663afd 100644
--- a/examples/consumer_example/consumer_example.go
+++ b/examples/consumer_example/consumer_example.go
@@ -50,10 +50,12 @@ func main() {
 		// when using localhost brokers on OSX, since the OSX resolver
 		// will return the IPv6 addresses first.
 		// You typically don't need to specify this configuration property.
-		"broker.address.family": "v4",
-		"group.id":              group,
-		"session.timeout.ms":    6000,
-		"auto.offset.reset":     "earliest"})
+		"broker.address.family":    "v4",
+		"group.id":                 group,
+		"session.timeout.ms":       6000,
+		"auto.offset.reset":        "earliest",
+		"enable.auto.offset.store": false,
+	})
 
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
@@ -84,6 +86,11 @@ func main() {
 			if e.Headers != nil {
 				fmt.Printf("%% Headers: %v\n", e.Headers)
 			}
+			_, err := c.StoreMessage(e)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
+					e.TopicPartition)
+			}
 		case kafka.Error:
 			// Errors should generally be considered
 			// informational, the client will try to
diff --git a/examples/legacy/README.md b/examples/legacy/README.md
new file mode 100644
index 000000000..33f72ed23
--- /dev/null
+++ b/examples/legacy/README.md
@@ -0,0 +1,53 @@
+Legacy examples
+===============
+
+This directory contains examples for functionality that is no longer recommended.
+
+Channel-Based Consumer (deprecated)
+-----------------------------------
+
+*Deprecated*: The channel-based consumer is deprecated due to the channel issues
+mentioned below. Use the function-based consumer.
+
+Messages, errors and events are posted on the `consumer.Events()` channel
+for the application to read.
+
+Pros:
+
+ * Possibly more Golang:ish
+ * Makes reading from multiple channels easy
+ * Fast
+
+Cons:
+
+ * Outdated events and messages may be consumed due to the buffering nature
+   of channels. The extent is limited, but not remedied, by the Events channel
+   buffer size (`go.events.channel.size`).
+
+See [consumer_channel_example](consumer_channel_example)
+
+Channel-Based Producer
+----------------------
+
+Application writes messages to the `producer.ProducerChannel()`.
+Delivery reports are emitted on the `producer.Events()` or specified private channel.
+
+Pros:
+
+ * Go:ish
+ * Proper channel backpressure if librdkafka internal queue is full.
+
+Cons:
+
+ * Double queueing: messages are first queued in the channel (size is configurable)
+   and then inside librdkafka.
+
+See [producer_channel_example](producer_channel_example)
+
+Usage example
+-------------
+
+    $ cd consumer_channel_example
+    $ go build      (or 'go install')
+    $ ./consumer_channel_example    # see usage
+    $ ./consumer_channel_example mybroker mygroup mytopic
diff --git a/examples/consumer_channel_example/consumer_channel_example.go b/examples/legacy/consumer_channel_example/consumer_channel_example.go
similarity index 100%
rename from examples/consumer_channel_example/consumer_channel_example.go
rename to examples/legacy/consumer_channel_example/consumer_channel_example.go
diff --git a/examples/producer_channel_example/producer_channel_example.go b/examples/legacy/producer_channel_example/producer_channel_example.go
similarity index 100%
rename from examples/producer_channel_example/producer_channel_example.go
rename to examples/legacy/producer_channel_example/producer_channel_example.go
diff --git a/examples/producer_custom_channel_example/producer_custom_channel_example.go b/examples/producer_custom_channel_example/producer_custom_channel_example.go
new file mode 100644
index 000000000..d4df3efa0
--- /dev/null
+++ b/examples/producer_custom_channel_example/producer_custom_channel_example.go
@@ -0,0 +1,129 @@
+// Example function-based Apache Kafka producer with a custom delivery channel
+package main
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+func main() {
+
+	if len(os.Args) != 3 {
+		fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <topic>\n",
+			os.Args[0])
+		os.Exit(1)
+	}
+
+	bootstrapServers := os.Args[1]
+	topic := os.Args[2]
+	totalMsgcnt := 3
+
+	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
+
+	if err != nil {
+		fmt.Printf("Failed to create producer: %s\n", err)
+		os.Exit(1)
+	}
+
+	fmt.Printf("Created Producer %v\n", p)
+
+	// Listen to all the client instance-level errors.
+	// It's important to read these errors too, otherwise the events
+	// channel will eventually fill up.
+	go func() {
+		for e := range p.Events() {
+			switch ev := e.(type) {
+			case kafka.Error:
+				// Generic client instance-level errors, such as
+				// broker connection failures, authentication issues, etc.
+				//
+				// These errors should generally be considered
+				// informational as the underlying client will
+				// automatically try to recover from any errors
+				// encountered, the application does not need to
+				// take action on them.
+				fmt.Printf("Error: %v\n", ev)
+			default:
+				fmt.Printf("Ignored event: %s\n", ev)
+			}
+		}
+	}()
+
+	msgcnt := 0
+	for msgcnt < totalMsgcnt {
+		value := fmt.Sprintf("Producer example, message #%d", msgcnt)
+
+		// A delivery channel for each message sent.
+		// This makes it possible to receive delivery reports
+		// separately and to handle the use case
+		// of a server that has multiple concurrent
+		// produce requests and needs to deliver the replies
+		// to many different response channels.
+		deliveryChan := make(chan kafka.Event)
+		go func() {
+			for e := range deliveryChan {
+				switch ev := e.(type) {
+				case *kafka.Message:
+					// The message delivery report, indicating success or
+					// permanent failure after retries have been exhausted.
+					// Application level retries won't help since the client
+					// is already configured to do that.
+					m := ev
+					if m.TopicPartition.Error != nil {
+						fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
+					} else {
+						fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
+							*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
+					}
+
+				default:
+					fmt.Printf("Ignored event: %s\n", ev)
+				}
+				// In this case the caller knows that this channel is used
+				// only for one Produce call, so it can close it.
+				close(deliveryChan)
+			}
+		}()
+
+		err = p.Produce(&kafka.Message{
+			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+			Value:          []byte(value),
+			Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
+		}, deliveryChan)
+
+		if err != nil {
+			close(deliveryChan)
+			if err.(kafka.Error).Code() == kafka.ErrQueueFull {
+				// Producer queue is full, wait 1s for messages
+				// to be delivered then try again.
+				time.Sleep(time.Second)
+				continue
+			}
+			fmt.Printf("Failed to produce message: %v\n", err)
+		}
+		msgcnt++
+	}
+
+	// Flush and close the producer and the events channel
+	for p.Flush(10000) > 0 {
+		fmt.Print("Still waiting to flush outstanding messages\n")
+	}
+	p.Close()
+}
diff --git a/examples/producer_example/producer_example.go b/examples/producer_example/producer_example.go
index 6751e14aa..5ef124752 100644
--- a/examples/producer_example/producer_example.go
+++ b/examples/producer_example/producer_example.go
@@ -20,6 +20,7 @@ package main
 import (
 	"fmt"
 	"os"
+	"time"
 
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 )
@@ -34,6 +35,7 @@ func main() {
 
 	bootstrapServers := os.Args[1]
 	topic := os.Args[2]
+	totalMsgcnt := 3
 
 	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
 
@@ -44,26 +46,62 @@ func main() {
 
 	fmt.Printf("Created Producer %v\n", p)
 
-	// Optional delivery channel, if not specified the Producer object's
-	// .Events channel is used.
-	deliveryChan := make(chan kafka.Event)
+	// Listen to all the events on the default events channel
+	go func() {
+		for e := range p.Events() {
+			switch ev := e.(type) {
+			case *kafka.Message:
+				// The message delivery report, indicating success or
+				// permanent failure after retries have been exhausted.
+				// Application level retries won't help since the client
+				// is already configured to do that.
+				m := ev
+				if m.TopicPartition.Error != nil {
+					fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
+				} else {
+					fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
+						*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
+				}
+			case kafka.Error:
+				// Generic client instance-level errors, such as
+				// broker connection failures, authentication issues, etc.
+				//
+				// These errors should generally be considered
+				// informational as the underlying client will
+				// automatically try to recover from any errors
+				// encountered, the application does not need to
+				// take action on them.
+				fmt.Printf("Error: %v\n", ev)
+			default:
+				fmt.Printf("Ignored event: %s\n", ev)
+			}
+		}
+	}()
 
-	value := "Hello Go!"
-	err = p.Produce(&kafka.Message{
-		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
-		Value:          []byte(value),
-		Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
-	}, deliveryChan)
+	msgcnt := 0
+	for msgcnt < totalMsgcnt {
+		value := fmt.Sprintf("Producer example, message #%d", msgcnt)
 
-	e := <-deliveryChan
-	m := e.(*kafka.Message)
+		err = p.Produce(&kafka.Message{
+			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+			Value:          []byte(value),
+			Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
+		}, nil)
 
-	if m.TopicPartition.Error != nil {
-		fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
-	} else {
-		fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
-			*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
+		if err != nil {
+			if err.(kafka.Error).Code() == kafka.ErrQueueFull {
+				// Producer queue is full, wait 1s for messages
+				// to be delivered then try again.
+				time.Sleep(time.Second)
+				continue
+			}
+			fmt.Printf("Failed to produce message: %v\n", err)
+		}
+		msgcnt++
 	}
-	close(deliveryChan)
+
+	// Flush and close the producer and the events channel
+	for p.Flush(10000) > 0 {
+		fmt.Print("Still waiting to flush outstanding messages\n")
+	}
+	p.Close()
 }