Skip to content

Commit

Permalink
Create a MockCluster within Go for testing (@SourceFellows, @kkoehler,
Browse files Browse the repository at this point in the history
…confluentinc#729)

* Added MockCluster implementation which can be used for testing

* adjust documentation for mock cluster

* Added example for MockCluster

* Fixed naming

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* fixed typo

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* removed

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* fixed comment

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* adjust documentation

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* adjustments regarding comments from Magnus

* fixed cleanup code for kafka configuration

* fixed destroy for C references

* fixed spaces

* removed call to `C.rd_kafka_conf_destroy(mc.cConf)` in close

* added docu for MockCluster type

* moved package statement into first line

* adjustments according to @edenhill comments

* Update kafka/mockcluster.go

Co-authored-by: Magnus Edenhill <magnus@edenhill.se>

* removed duplicate import

* removed the remark as suggested from @edelhill

* changed typedef from struct_rd_kafka_mock_cluster_s to rd_kafka_mock_cluster_t

Co-authored-by: Kristian Köhler <github@kkoehler.com>
Co-authored-by: Magnus Edenhill <magnus@edenhill.se>
Co-authored-by: Kristian Köhler <kkoehler@users.noreply.github.com>
  • Loading branch information
4 people authored and eran-levy committed Apr 9, 2022
1 parent 513845d commit b73de4b
Show file tree
Hide file tree
Showing 26 changed files with 558 additions and 45 deletions.
95 changes: 95 additions & 0 deletions examples/mockcluster_example/mockcluster.go
@@ -0,0 +1,95 @@
package main

/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
)

func main() {

mockCluster, err := kafka.NewMockCluster(1)
if err != nil {
fmt.Printf("Failed to create MockCluster: %s\n", err)
os.Exit(1)
}
defer mockCluster.Close()

broker := mockCluster.BootstrapServers()

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})

if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Producer %v\n", p)
deliveryChan := make(chan kafka.Event)

topic := "Test"
value := "Hello Go!"
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
}, deliveryChan)

e := <-deliveryChan
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

close(deliveryChan)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"broker.address.family": "v4",
"group.id": "group",
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest"})

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}
defer c.Close()

fmt.Printf("Created Consumer %v\n", c)

err = c.SubscribeTopics([]string{topic}, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to subscribe to consumer: %s\n", err)
os.Exit(1)
}

msg, err := c.ReadMessage(-1)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read message: %s\n", err)
os.Exit(1)
}

fmt.Println("received message: ", string(msg.Value))

}
4 changes: 2 additions & 2 deletions kafka/adminapi.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2018 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"context"
"fmt"
Expand Down
4 changes: 2 additions & 2 deletions kafka/adminoptions.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2018 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"fmt"
"time"
Expand Down
4 changes: 2 additions & 2 deletions kafka/config_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"fmt"
"testing"
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer_performance_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"fmt"
"math/rand"
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"fmt"
"os"
Expand Down
4 changes: 2 additions & 2 deletions kafka/context.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2019 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"context"
"time"
Expand Down
4 changes: 2 additions & 2 deletions kafka/error_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2019 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"strings"
"testing"
Expand Down
4 changes: 2 additions & 2 deletions kafka/event_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"testing"
)
Expand Down
4 changes: 2 additions & 2 deletions kafka/header_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"testing"
)
Expand Down
4 changes: 2 additions & 2 deletions kafka/integration_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"context"
"encoding/binary"
Expand Down
5 changes: 3 additions & 2 deletions kafka/kafka.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -20,7 +22,7 @@
//
// High-level Consumer
//
// * Decide if you want to read messages and events by calling `.Poll()` or
// * Decide if you want to read messages and events by calling `.Poll()` or
// the deprecated option of using the `.Events()` channel. (If you want to use
// `.Events()` channel then set `"go.events.channel.enable": true`).
//
Expand Down Expand Up @@ -249,7 +251,6 @@
// possible complications with blocking Poll() calls.
//
// Note: The Confluent Kafka Go client is safe for concurrent use.
package kafka

import (
"fmt"
Expand Down
4 changes: 2 additions & 2 deletions kafka/kafka_test.go
@@ -1,3 +1,5 @@
package kafka

/**
* Copyright 2016 Confluent Inc.
*
Expand All @@ -14,8 +16,6 @@
* limitations under the License.
*/

package kafka

import (
"testing"
)
Expand Down

0 comments on commit b73de4b

Please sign in to comment.