diff --git a/examples/mockcluster_example/mockcluster.go b/examples/mockcluster_example/mockcluster.go new file mode 100644 index 000000000..4732c98ea --- /dev/null +++ b/examples/mockcluster_example/mockcluster.go @@ -0,0 +1,95 @@ +package main + +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "fmt" + "github.com/confluentinc/confluent-kafka-go/kafka" + "os" +) + +func main() { + + mockCluster, err := kafka.NewMockCluster(1) + if err != nil { + fmt.Printf("Failed to create MockCluster: %s\n", err) + os.Exit(1) + } + defer mockCluster.Close() + + broker := mockCluster.BootstrapServers() + + p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) + + if err != nil { + fmt.Printf("Failed to create producer: %s\n", err) + os.Exit(1) + } + + fmt.Printf("Created Producer %v\n", p) + deliveryChan := make(chan kafka.Event) + + topic := "Test" + value := "Hello Go!" + err = p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, + Value: []byte(value), + Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, + }, deliveryChan) + + e := <-deliveryChan + m := e.(*kafka.Message) + + if m.TopicPartition.Error != nil { + fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error) + } else { + fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", + *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) + } + + close(deliveryChan) + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": broker, + "broker.address.family": "v4", + "group.id": "group", + "session.timeout.ms": 6000, + "auto.offset.reset": "earliest"}) + + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) + os.Exit(1) + } + defer c.Close() + + fmt.Printf("Created Consumer %v\n", c) + + err = c.SubscribeTopics([]string{topic}, nil) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to subscribe to consumer: %s\n", err) + os.Exit(1) + } + + msg, err := c.ReadMessage(-1) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to read message: %s\n", err) + os.Exit(1) + } + + fmt.Println("received message: ", string(msg.Value)) + +} diff --git a/kafka/adminapi.go b/kafka/adminapi.go index e12812969..ef3b31d55 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2018 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "context" "fmt" diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go index 842631b30..6d7022ad9 100644 --- a/kafka/adminoptions.go +++ b/kafka/adminoptions.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2018 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. 
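As an aside to the example above, a minimal sketch (not part of this patch) of the same delivery handling done through the producer's Events() channel instead of a dedicated delivery channel, reusing the p, topic and value variables from the example and adding the Produce() error check the example omits:

go func() {
	for e := range p.Events() {
		if m, ok := e.(*kafka.Message); ok {
			if m.TopicPartition.Error != nil {
				fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
			} else {
				fmt.Printf("Delivered message to %v\n", m.TopicPartition)
			}
		}
	}
}()

// With a nil delivery channel the delivery report is emitted on p.Events().
err = p.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
	Value:          []byte(value),
}, nil)
if err != nil {
	fmt.Printf("Produce failed: %v\n", err)
}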
*/ -package kafka - import ( "fmt" "time" diff --git a/kafka/config_test.go b/kafka/config_test.go index 60093e496..67ee724ac 100644 --- a/kafka/config_test.go +++ b/kafka/config_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "fmt" "testing" diff --git a/kafka/consumer_performance_test.go b/kafka/consumer_performance_test.go index 49edbaea5..e77cf22dc 100644 --- a/kafka/consumer_performance_test.go +++ b/kafka/consumer_performance_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "fmt" "math/rand" diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 90e0e8efc..caa23e5c2 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "fmt" "os" diff --git a/kafka/context.go b/kafka/context.go index 85709be0f..b0dfd0e44 100644 --- a/kafka/context.go +++ b/kafka/context.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2019 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "context" "time" diff --git a/kafka/error_test.go b/kafka/error_test.go index d405ba749..414fd59e8 100644 --- a/kafka/error_test.go +++ b/kafka/error_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2019 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "strings" "testing" diff --git a/kafka/event_test.go b/kafka/event_test.go index 0c172e0b8..355ae2145 100644 --- a/kafka/event_test.go +++ b/kafka/event_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/header_test.go b/kafka/header_test.go index f365cb912..6c0111941 100644 --- a/kafka/header_test.go +++ b/kafka/header_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index b0df51535..917f1a9df 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "context" "encoding/binary" diff --git a/kafka/kafka.go b/kafka/kafka.go index 20dc30a89..766cb24d7 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -20,7 +22,7 @@ // // High-level Consumer // -// * Decide if you want to read messages and events by calling `.Poll()` or +// * Decide if you want to read messages and events by calling `.Poll()` or // the deprecated option of using the `.Events()` channel. (If you want to use // `.Events()` channel then set `"go.events.channel.enable": true`). // @@ -249,7 +251,6 @@ // possible complications with blocking Poll() calls. // // Note: The Confluent Kafka Go client is safe for concurrent use. -package kafka import ( "fmt" diff --git a/kafka/kafka_test.go b/kafka/kafka_test.go index e268f7eb6..9659c812e 100644 --- a/kafka/kafka_test.go +++ b/kafka/kafka_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. 
* @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/librdkafka_vendor/rdkafka_mock.h b/kafka/librdkafka_vendor/rdkafka_mock.h new file mode 100644 index 000000000..099280078 --- /dev/null +++ b/kafka/librdkafka_vendor/rdkafka_mock.h @@ -0,0 +1,331 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RDKAFKA_MOCK_H_ +#define _RDKAFKA_MOCK_H_ + +#ifndef _RDKAFKA_H_ +#error "rdkafka_mock.h must be included after rdkafka.h" +#endif + +#ifdef __cplusplus +extern "C" { +#if 0 +} /* Restore indent */ +#endif +#endif + + +/** + * @name Mock cluster + * + * Provides a mock Kafka cluster with a configurable number of brokers + * that support a reasonable subset of Kafka protocol operations, + * error injection, etc. + * + * There are two ways to use the mock clusters, the most simple approach + * is to configure `test.mock.num.brokers` (to e.g. 3) on the rd_kafka_t + * in an existing application, which will replace the configured + * `bootstrap.servers` with the mock cluster brokers. + * This approach is convenient to easily test existing applications. + * + * The second approach is to explicitly create a mock cluster on an + * rd_kafka_t instance by using rd_kafka_mock_cluster_new(). + * + * Mock clusters provide localhost listeners that can be used as the bootstrap + * servers by multiple rd_kafka_t instances. + * + * Currently supported functionality: + * - Producer + * - Idempotent Producer + * - Transactional Producer + * - Low-level consumer + * - High-level balanced consumer groups with offset commits + * - Topic Metadata and auto creation + * + * @remark High-level consumers making use of the balanced consumer groups + * are not supported. + * + * @remark This is an experimental public API that is NOT covered by the + * librdkafka API or ABI stability guarantees. + * + * + * @warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL. + * + * @{ + */ + +typedef struct rd_kafka_mock_cluster_s rd_kafka_mock_cluster_t; + + +/** + * @brief Create new mock cluster with \p broker_cnt brokers. 
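The first approach described above, configuring test.mock.num.brokers, should also be reachable from the Go client, since ConfigMap properties other than the go.* ones are passed through to librdkafka. A minimal sketch, not part of this patch, with an arbitrarily chosen broker count:

// No bootstrap.servers needed; librdkafka replaces it with the mock cluster's listeners.
p, err := kafka.NewProducer(&kafka.ConfigMap{"test.mock.num.brokers": 3})
if err != nil {
	// handle the error
}
defer p.Close()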
+ * + * The broker ids will start at 1 up to and including \p broker_cnt. + * + * The \p rk instance is required for internal book keeping but continues + * to operate as usual. + */ +RD_EXPORT +rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new (rd_kafka_t *rk, + int broker_cnt); + + +/** + * @brief Destroy mock cluster. + */ +RD_EXPORT +void rd_kafka_mock_cluster_destroy (rd_kafka_mock_cluster_t *mcluster); + + + +/** + * @returns the rd_kafka_t instance for a cluster as passed to + * rd_kafka_mock_cluster_new(). + */ +RD_EXPORT rd_kafka_t * +rd_kafka_mock_cluster_handle (const rd_kafka_mock_cluster_t *mcluster); + + +/** + * @returns the rd_kafka_mock_cluster_t instance as created by + * setting the `test.mock.num.brokers` configuration property, + * or NULL if no such instance. + */ +RD_EXPORT rd_kafka_mock_cluster_t * +rd_kafka_handle_mock_cluster (const rd_kafka_t *rk); + + + +/** + * @returns the mock cluster's bootstrap.servers list + */ +RD_EXPORT const char * +rd_kafka_mock_cluster_bootstraps (const rd_kafka_mock_cluster_t *mcluster); + + +/** + * @brief Clear the cluster's error state for the given \p ApiKey. + */ +RD_EXPORT +void rd_kafka_mock_clear_request_errors (rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey); + + +/** + * @brief Push \p cnt errors in the \p ... va-arg list onto the cluster's + * error stack for the given \p ApiKey. + * + * \p ApiKey is the Kafka protocol request type, e.g., ProduceRequest (0). + * + * The following \p cnt protocol requests matching \p ApiKey will fail with the + * provided error code and removed from the stack, starting with + * the first error code, then the second, etc. + * + * Passing \c RD_KAFKA_RESP_ERR__TRANSPORT will make the mock broker + * disconnect the client which can be useful to trigger a disconnect on certain + * requests. + */ +RD_EXPORT +void rd_kafka_mock_push_request_errors (rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey, size_t cnt, ...); + + +/** + * @brief Same as rd_kafka_mock_push_request_errors() but takes + * an array of errors. + */ +RD_EXPORT void +rd_kafka_mock_push_request_errors_array (rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey, + size_t cnt, + const rd_kafka_resp_err_t *errors); + + +/** + * @brief Push \p cnt errors and RTT tuples in the \p ... va-arg list onto + * the broker's error stack for the given \p ApiKey. + * + * \p ApiKey is the Kafka protocol request type, e.g., ProduceRequest (0). + * + * Each entry is a tuple of: + * rd_kafka_resp_err_t err - error to return (or 0) + * int rtt_ms - response RTT/delay in milliseconds (or 0) + * + * The following \p cnt protocol requests matching \p ApiKey will fail with the + * provided error code and removed from the stack, starting with + * the first error code, then the second, etc. + * + * @remark The broker errors take precedence over the cluster errors. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_broker_push_request_error_rtts (rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id, + int16_t ApiKey, size_t cnt, ...); + + +/** + * @brief Set the topic error to return in protocol requests. + * + * Currently only used for TopicMetadataRequest and AddPartitionsToTxnRequest. + */ +RD_EXPORT +void rd_kafka_mock_topic_set_error (rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_resp_err_t err); + + +/** + * @brief Creates a topic. + * + * This is an alternative to automatic topic creation as performed by + * the client itself. + * + * @remark The Topic Admin API (CreateTopics) is not supported by the + * mock broker. 
+ */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_topic_create (rd_kafka_mock_cluster_t *mcluster, + const char *topic, int partition_cnt, + int replication_factor); + + +/** + * @brief Sets the partition leader. + * + * The topic will be created if it does not exist. + * + * \p broker_id needs to be an existing broker, or -1 to make the + * partition leader-less. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_partition_set_leader (rd_kafka_mock_cluster_t *mcluster, + const char *topic, int32_t partition, + int32_t broker_id); + +/** + * @brief Sets the partition's preferred replica / follower. + * + * The topic will be created if it does not exist. + * + * \p broker_id does not need to point to an existing broker. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_partition_set_follower (rd_kafka_mock_cluster_t *mcluster, + const char *topic, int32_t partition, + int32_t broker_id); + +/** + * @brief Sets the partition's preferred replica / follower low and high + * watermarks. + * + * The topic will be created if it does not exist. + * + * Setting an offset to -1 will revert back to the leader's corresponding + * watermark. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_partition_set_follower_wmarks (rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int32_t partition, + int64_t lo, int64_t hi); + + +/** + * @brief Disconnects the broker and disallows any new connections. + * This does NOT trigger leader change. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_broker_set_down (rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id); + +/** + * @brief Makes the broker accept connections again. + * This does NOT trigger leader change. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_broker_set_up (rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id); + + +/** + * @brief Set broker round-trip-time delay in milliseconds. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_broker_set_rtt (rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id, int rtt_ms); + +/** + * @brief Sets the broker's rack as reported in Metadata to the client. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_broker_set_rack (rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id, const char *rack); + + + +/** + * @brief Explicitly sets the coordinator. If this API is not a standard + * hashing scheme will be used. + * + * @param key_type "transaction" or "group" + * @param key The transactional.id or group.id + * @param broker_id The new coordinator, does not have to be a valid broker. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_coordinator_set (rd_kafka_mock_cluster_t *mcluster, + const char *key_type, const char *key, + int32_t broker_id); + + + +/** + * @brief Set the allowed ApiVersion range for \p ApiKey. + * + * Set \p MinVersion and \p MaxVersion to -1 to disable the API + * completely. + * + * \p MaxVersion MUST not exceed the maximum implemented value, + * see rdkafka_mock_handlers.c. + * + * @param ApiKey Protocol request type/key + * @param MinVersion Minimum version supported (or -1 to disable). + * @param MinVersion Maximum version supported (or -1 to disable). 
+ */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_set_apiversion (rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey, + int16_t MinVersion, int16_t MaxVersion); + + +/**@}*/ + +#ifdef __cplusplus +} +#endif +#endif /* _RDKAFKA_MOCK_H_ */ \ No newline at end of file diff --git a/kafka/message_test.go b/kafka/message_test.go index e868dcc7a..39b071849 100644 --- a/kafka/message_test.go +++ b/kafka/message_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/metadata_test.go b/kafka/metadata_test.go index 96e8a9141..de2f60ba8 100644 --- a/kafka/metadata_test.go +++ b/kafka/metadata_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/misc.go b/kafka/misc.go index 6d602ce77..e47a188c5 100644 --- a/kafka/misc.go +++ b/kafka/misc.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import "C" // bool2int converts a bool to a C.int (1 or 0) diff --git a/kafka/mockcluster.go b/kafka/mockcluster.go new file mode 100644 index 000000000..613589149 --- /dev/null +++ b/kafka/mockcluster.go @@ -0,0 +1,84 @@ +package kafka + +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import "unsafe" + +/* +#include <stdlib.h> +#include "select_rdkafka.h" +#include "glue_rdkafka.h" +*/ +import "C" + +// MockCluster represents a Kafka mock cluster instance which can be used +// for testing. +type MockCluster struct { + rk *C.rd_kafka_t + mcluster *C.rd_kafka_mock_cluster_t +} + +// NewMockCluster provides a mock Kafka cluster with a configurable +// number of brokers that support a reasonable subset of Kafka protocol +// operations, error injection, etc. +// +// Mock clusters provide localhost listeners that can be used as the bootstrap +// servers by multiple Kafka client instances. +// +// Currently supported functionality: +// - Producer +// - Idempotent Producer +// - Transactional Producer +// - Low-level consumer +// - High-level balanced consumer groups with offset commits +// - Topic Metadata and auto creation +// +// Warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
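A minimal sketch of how NewMockCluster might be driven from a Go test; the package name, test name and group.id below are illustrative and not part of this patch:

package kafka_test

import (
	"testing"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func TestWithMockCluster(t *testing.T) {
	// Spin up an in-process mock cluster with three brokers.
	mc, err := kafka.NewMockCluster(3)
	if err != nil {
		t.Fatalf("NewMockCluster: %v", err)
	}
	defer mc.Close()

	// Point a regular client at the mock cluster's listeners.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": mc.BootstrapServers(),
		"group.id":          "mock-cluster-test",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		t.Fatalf("NewConsumer: %v", err)
	}
	defer c.Close()
}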
+func NewMockCluster(brokerCount int) (*MockCluster, error) { + + mc := &MockCluster{} + + cErrstr := (*C.char)(C.malloc(C.size_t(512))) + defer C.free(unsafe.Pointer(cErrstr)) + + cConf := C.rd_kafka_conf_new() + + mc.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256) + if mc.rk == nil { + C.rd_kafka_conf_destroy(cConf) + return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr) + } + + mc.mcluster = C.rd_kafka_mock_cluster_new(mc.rk, C.int(brokerCount)) + if mc.mcluster == nil { + C.rd_kafka_destroy(mc.rk) + return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr) + } + + return mc, nil +} + +// BootstrapServers returns the bootstrap.servers property for this MockCluster +func (mc *MockCluster) BootstrapServers() string { + return C.GoString(C.rd_kafka_mock_cluster_bootstraps(mc.mcluster)) +} + +// Close and destroy the MockCluster +func (mc *MockCluster) Close() { + C.rd_kafka_mock_cluster_destroy(mc.mcluster) + C.rd_kafka_destroy(mc.rk) +} diff --git a/kafka/offset.go b/kafka/offset.go index 4cb1819c8..5d62b119d 100644 --- a/kafka/offset.go +++ b/kafka/offset.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2017 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "fmt" "strconv" diff --git a/kafka/producer.go b/kafka/producer.go index f68854a96..34ce2ece4 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "context" "fmt" diff --git a/kafka/producer_performance_test.go b/kafka/producer_performance_test.go index f9473ed70..01f7920c1 100644 --- a/kafka/producer_performance_test.go +++ b/kafka/producer_performance_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "fmt" "strings" @@ -70,7 +70,7 @@ func producerPerfTest(b *testing.B, testname string, msgcnt int, withDr bool, ba "queue.buffering.max.messages": msgcnt, "api.version.request": "true", "broker.version.fallback": "0.9.0.1", - "acks": 1} + "acks": 1} conf.updateFromTestconf() diff --git a/kafka/producer_test.go b/kafka/producer_test.go index 1192406f6..19c395a3d 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "bytes" "context" diff --git a/kafka/select_rdkafka.h b/kafka/select_rdkafka.h index 98fe330ae..3cfd095b2 100644 --- a/kafka/select_rdkafka.h +++ b/kafka/select_rdkafka.h @@ -24,6 +24,8 @@ #ifdef USE_VENDORED_LIBRDKAFKA #include "librdkafka_vendor/rdkafka.h" +#include "librdkafka_vendor/rdkafka_mock.h" #else #include <librdkafka/rdkafka.h> +#include <librdkafka/rdkafka_mock.h> #endif diff --git a/kafka/testhelpers_test.go b/kafka/testhelpers_test.go index 4fca59e70..a2997d301 100644 --- a/kafka/testhelpers_test.go +++ b/kafka/testhelpers_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" "time" diff --git a/kafka/time.go b/kafka/time.go index ff93f0ad2..62b2a11d8 100644 --- a/kafka/time.go +++ b/kafka/time.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2019 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License.
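Since the supported functionality listed above includes the transactional producer, a transactional round trip could in principle be smoke-tested against the mock cluster along these lines; this is a rough sketch, not part of this patch, assuming the usual context and time imports and an arbitrary transactional.id:

mc, err := kafka.NewMockCluster(1)
if err != nil {
	// handle the error
}
defer mc.Close()

p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": mc.BootstrapServers(),
	"transactional.id":  "mock-txn-example",
})
if err != nil {
	// handle the error
}
defer p.Close()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err = p.InitTransactions(ctx); err != nil {
	// handle the error
}
if err = p.BeginTransaction(); err != nil {
	// handle the error
}
// ... Produce() as in the example at the top of this patch ...
if err = p.CommitTransaction(ctx); err != nil {
	// handle the error
}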
*/ -package kafka - import "C" import ( diff --git a/kafka/txn_integration_test.go b/kafka/txn_integration_test.go index 4cd298465..daea77b4e 100644 --- a/kafka/txn_integration_test.go +++ b/kafka/txn_integration_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2020 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - // Integration tests for the transactional producer import (