contrib/segmentio/kafka-go: add tracing for kafka writer and kafka reader #1152

Merged
merged 26 commits into v1 from segmentio/kafka-go on Feb 28, 2022

Changes from all commits
Commits (26)
7ffd45f
Add tracing for segmentio/kafka-go
Apr 9, 2021
f3c0482
Update import as per guidelines
Apr 9, 2021
d956bf2
Update circleci config to add topic and use new topic in test
Apr 11, 2021
60ae8d5
Use the same topic with last offset
Apr 11, 2021
88ed7bc
Update log level
Apr 11, 2021
418180d
Use apache kafka image to test segmentio kafka-go changes
Apr 11, 2021
4a7ac09
Update apache kafka configuration to expose different port
Apr 12, 2021
ddfae80
Fix `Socket server failed to bind to kafka:9092: Unresolved address` …
Apr 12, 2021
bb04e82
Use container name to fix port conflict issue
Apr 12, 2021
044b55c
Fix kafka listener host name
Apr 12, 2021
84fbf18
Use kafka listener and advertised listener to fix connection issue
Apr 12, 2021
388658f
Add asserts to validate consumer span
Apr 12, 2021
f1454cd
Move Extract function to headers.go file
Apr 13, 2021
96ff7ba
Use same container for confluentinc kafka and segmentio kafka client
Apr 21, 2021
adcbe33
rename to include .v0
ajgajg1134 Jan 28, 2022
37eba0e
contrib/segmentio/kafka.go.v0: add copyright headers
ajgajg1134 Jan 28, 2022
2cbfdee
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Jan 28, 2022
ceaf249
PR Comments, add docker-compose file for easier local integration tes…
ajgajg1134 Feb 11, 2022
947d5df
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Feb 11, 2022
8de6e46
fix goimports, pin kafka to latest 2.x.x version
ajgajg1134 Feb 11, 2022
2d9aa54
try creating kafka topic before running tests
ajgajg1134 Feb 11, 2022
3edfca6
try creating kafka topic before running tests (attempt2)
ajgajg1134 Feb 11, 2022
bb0ab40
create kafka topic for segmentio
ajgajg1134 Feb 14, 2022
e6c72d3
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Feb 15, 2022
967d212
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Feb 24, 2022
75551e4
Merge branch 'v1' into segmentio/kafka-go
knusbaum Feb 28, 2022
19 changes: 11 additions & 8 deletions .circleci/config.yml
@@ -213,16 +213,19 @@ jobs:
           DD_API_KEY: invalid_key_but_this_is_fine
       - image: circleci/mongo:latest-ram
       - image: memcached:1.5.9
-      - image: confluentinc/cp-zookeeper:5.0.0
+      - image: bitnami/zookeeper:latest
         environment:
-          ZOOKEEPER_CLIENT_PORT: "2181"
-      - image: confluentinc/cp-kafka:5.0.0
+          ALLOW_ANONYMOUS_LOGIN: yes
+      - image: bitnami/kafka:2
         environment:
-          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
-          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
-          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
-          KAFKA_CREATE_TOPICS: gotest:1:1
-          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
+          KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+          KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092
+          ALLOW_PLAINTEXT_LISTENER: yes
+      - image: bitnami/kafka:2
+        environment:
+          KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+        command: [kafka-topics.sh, --create, --topic, gosegtest, --bootstrap-server, localhost:9092]
 
     steps:
       - checkout
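Note: the second bitnami/kafka container above exists only to create the gosegtest topic via kafka-topics.sh before the tests run. As a hedged sketch (assuming a single reachable broker on localhost:9092, as in the config), the same topic can be created programmatically with kafka-go's Conn API:

```go
package main

import (
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Dial the broker exposed on localhost:9092.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Equivalent of the sidecar's kafka-topics.sh --create command.
	// With a single broker, the dialed broker is also the controller,
	// which is what CreateTopics must be issued against.
	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             "gosegtest",
		NumPartitions:     1,
		ReplicationFactor: 1,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```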
57 changes: 57 additions & 0 deletions contrib/segmentio/kafka.go.v0/example_test.go
@@ -0,0 +1,57 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka_test

import (
"context"
"log"
"time"

kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

kafka "github.com/segmentio/kafka-go"
)

func ExampleWriter() {
w := kafkatrace.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "some-topic",
})

// use a slice so the messages are passed by reference and kafkatrace can update their headers in place
msgs := []kafka.Message{
{
Key: []byte("key1"),
Value: []byte("value1"),
},
}
if err := w.WriteMessages(context.Background(), msgs...); err != nil {
log.Fatal("Failed to write message", err)
}
}

func ExampleReader() {
r := kafkatrace.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "some-topic",
GroupID: "group-id",
SessionTimeout: 30 * time.Second,
})
msg, err := r.ReadMessage(context.Background())
if err != nil {
log.Fatal("Failed to read message", err)
}

// create a child span using span id and trace id in message header
spanContext, err := kafkatrace.ExtractSpanContext(msg)
if err != nil {
log.Fatal("Failed to extract span context from carrier", err)
}
operationName := "child-span"
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
defer s.Finish()
}
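Note: a hedged sketch (the parent span name web.request is illustrative, not from this PR) of how the ctx passed to WriteMessages ties the producer into an existing trace — WriteMessages starts its kafka.produce span from ctx, so it becomes a child of any span already active there:

```go
func ExampleWriter_withParentSpan() {
	w := kafkatrace.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "some-topic",
	})

	// The kafka.produce span is started via StartSpanFromContext, so it
	// becomes a child of the span carried in ctx.
	span, ctx := tracer.StartSpanFromContext(context.Background(), "web.request")
	defer span.Finish()

	msgs := []kafka.Message{
		{Key: []byte("key1"), Value: []byte("value1")},
	}
	if err := w.WriteMessages(ctx, msgs...); err != nil {
		log.Fatal("Failed to write message", err)
	}
}
```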
54 changes: 54 additions & 0 deletions contrib/segmentio/kafka.go.v0/headers.go
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/segmentio/kafka-go"
)

// A messageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
type messageCarrier struct {
msg *kafka.Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*messageCarrier)(nil)

// ForeachKey conforms to the TextMapReader interface.
func (c messageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.Headers {
err := handler(h.Key, string(h.Value))
if err != nil {
return err
}
}
return nil
}

// Set implements TextMapWriter
func (c messageCarrier) Set(key, val string) {
// ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, kafka.Header{
Key: key,
Value: []byte(val),
})
}

// ExtractSpanContext retrieves the SpanContext from a kafka.Message
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error) {
return tracer.Extract(messageCarrier{&msg})
}
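Note: a hedged sketch of the round trip this carrier enables (the helper name propagationRoundTrip is hypothetical and assumes a started tracer; it would have to live in this package, since messageCarrier is unexported):

```go
func propagationRoundTrip() error {
	span := tracer.StartSpan("kafka.produce")
	defer span.Finish()

	msg := kafka.Message{Value: []byte("value1")}
	// Inject writes the span context into msg.Headers through messageCarrier.Set.
	if err := tracer.Inject(span.Context(), messageCarrier{&msg}); err != nil {
		return err
	}
	// ExtractSpanContext reads it back through messageCarrier.ForeachKey.
	spanctx, err := ExtractSpanContext(msg)
	if err != nil {
		return err
	}
	// The consumer-side span is linked to the producer's trace.
	child := tracer.StartSpan("kafka.consume", tracer.ChildOf(spanctx))
	child.Finish()
	return nil
}
```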
148 changes: 148 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka.go
@@ -0,0 +1,148 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"
"math"

"github.com/segmentio/kafka-go"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

// NewReader calls kafka.NewReader and wraps the resulting Consumer.
func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader {
return WrapReader(kafka.NewReader(conf), opts...)
}

// NewWriter calls kafka.NewWriter and wraps the resulting Producer.
func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer {
return WrapWriter(kafka.NewWriter(conf), opts...)
}

// WrapReader wraps a kafka.Reader so that any consumed events are traced.
func WrapReader(c *kafka.Reader, opts ...Option) *Reader {
wrapped := &Reader{
Reader: c,
cfg: newConfig(opts...),
}
log.Debug("contrib/confluentinc/confluent-kafka.go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg)
return wrapped
}

// A Reader wraps a kafka.Reader so that any consumed messages are traced,
// continuing any trace context found in the message headers.
Contributor (review comment): Same here, we should add just a bit more info for the documentation.
type Reader struct {
*kafka.Reader
cfg *config
prev ddtrace.Span
}

func (r *Reader) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(r.cfg.consumerServiceName),
tracer.ResourceName("Consume Topic " + msg.Topic),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("partition", msg.Partition),
tracer.Tag("offset", msg.Offset),
tracer.Measured(),
}
if !math.IsNaN(r.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, r.cfg.analyticsRate))
}
// kafka supports headers, so try to extract a span context
carrier := messageCarrier{msg}
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(ctx, "kafka.consume", opts...)
// reinject the span context so consumers can pick it up
if err := tracer.Inject(span.Context(), carrier); err != nil {
log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err)
}
return span
}

// Close calls the underlying Reader.Close and finishes the span of the last
// consumed message, if one is still open.
func (r *Reader) Close() error {
err := r.Reader.Close()
if r.prev != nil {
r.prev.Finish()
r.prev = nil
}
return err
}

// ReadMessage reads the next message and traces it. The message's span is
// finished on the next call to ReadMessage, or on Close.
func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) {
if r.prev != nil {
r.prev.Finish()
r.prev = nil
}
msg, err := r.Reader.ReadMessage(ctx)
if err != nil {
return kafka.Message{}, err
}
r.prev = r.startSpan(ctx, &msg)
return msg, nil
}
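Note: under this lifecycle each consumer span stays open while its message is handled; the next ReadMessage call finishes it, and Close finishes the last one. A hedged sketch of a consume loop (broker address, topic, and the process callback are illustrative):

```go
func consumeLoop(ctx context.Context, process func(kafka.Message)) error {
	r := NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "group-id",
		Topic:   "some-topic",
	})
	defer r.Close() // finishes the span of the last message read

	for {
		// ReadMessage finishes the previous message's span and starts a new one.
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			return err
		}
		process(msg) // runs while the message's span is still open
	}
}
```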

// WrapWriter wraps a kafka.Writer so requests are traced.
func WrapWriter(w *kafka.Writer, opts ...Option) *Writer {
writer := &Writer{
Writer: w,
cfg: newConfig(opts...),
}
log.Debug("contrib/segmentio/kafka.go.v0: Wrapping Writer: %#v", writer.cfg)
return writer
}

// A Writer wraps a kafka.Writer with tracing configuration so that produced messages are traced.
type Writer struct {
*kafka.Writer
cfg *config
}

func (w *Writer) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(w.cfg.producerServiceName),
tracer.ResourceName("Produce Topic " + w.Writer.Topic),
tracer.SpanType(ext.SpanTypeMessageProducer),
}
if !math.IsNaN(w.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate))
}
carrier := messageCarrier{msg}
span, _ := tracer.StartSpanFromContext(ctx, "kafka.produce", opts...)
if err := tracer.Inject(span.Context(), carrier); err != nil {
log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err)
}
return span
}

func finishSpan(span ddtrace.Span, partition int, offset int64, err error) {
span.SetTag("partition", partition)
span.SetTag("offset", offset)
span.Finish(tracer.WithError(err))
}

// WriteMessages calls the underlying kafka.Writer.WriteMessages and traces the requests.
func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
// although we make only one call to the underlying Writer, the messages are
// treated individually, so we create a span for each one
spans := make([]ddtrace.Span, len(msgs))
for i := range msgs {
spans[i] = w.startSpan(ctx, &msgs[i])
}
err := w.Writer.WriteMessages(ctx, msgs...)
for i, span := range spans {
finishSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
return err
}
92 changes: 92 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka_test.go
@@ -0,0 +1,92 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"
"os"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"

kafka "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

const (
testGroupID = "gosegtest"
testTopic = "gosegtest"
)

func skipIntegrationTest(t *testing.T) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
}
}

/*
to set up the integration test locally, run:
docker-compose -f local_testing.yaml up
*/

func TestConsumerFunctional(t *testing.T) {
skipIntegrationTest(t)
mt := mocktracer.Start()
defer mt.Stop()

kw := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: testTopic,
RequiredAcks: kafka.RequireOne,
}

w := WrapWriter(kw, WithAnalyticsRate(0.1))
msg1 := []kafka.Message{
{
Key: []byte("key1"),
Value: []byte("value1"),
},
}
err := w.WriteMessages(context.Background(), msg1...)
assert.NoError(t, err, "Expected to write message to topic")
err = w.Close()
assert.NoError(t, err)

tctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
r := NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: testGroupID,
Topic: testTopic,
})
msg2, err := r.ReadMessage(tctx)
assert.NoError(t, err, "Expected to consume message")
assert.Equal(t, msg1[0].Value, msg2.Value, "Values should be equal")
r.Close()

// now verify the spans
spans := mt.FinishedSpans()
assert.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID(), "Trace IDs should match")

s0 := spans[0] // produce
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "Produce Topic "+testTopic, s0.Tag(ext.ResourceName))
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, 0, s0.Tag("partition"))

s1 := spans[1] // consume
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic "+testTopic, s1.Tag(ext.ResourceName))
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, 0, s1.Tag("partition"))
}