contrib/segmentio/kafka-go: add tracing for kafka writer and kafka reader (#1152)

This commit adds support for tracing the Reader and Writer from kafka-go.

Fixes #899
ajgajg1134 committed Feb 28, 2022
1 parent 4f0b6ac commit 41a7cd6
Showing 8 changed files with 512 additions and 8 deletions.
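For orientation before the diff: besides the NewReader/NewWriter constructors shown in example_test.go below, the package exposes WrapReader/WrapWriter for instrumenting an already-constructed kafka-go client. A minimal sketch (broker address and topic are placeholders, and it assumes the tracer is started):

package main

import (
	"context"
	"log"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	// wrap an existing writer so every WriteMessages call is traced
	kw := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // assumption: local broker
		Topic: "some-topic",                // hypothetical topic
	}
	w := kafkatrace.WrapWriter(kw)
	defer w.Close()

	msg := kafka.Message{Key: []byte("key"), Value: []byte("value")}
	if err := w.WriteMessages(context.Background(), msg); err != nil {
		log.Fatal(err)
	}
}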
19 changes: 11 additions & 8 deletions .circleci/config.yml
@@ -213,16 +213,19 @@ jobs:
           DD_API_KEY: invalid_key_but_this_is_fine
       - image: circleci/mongo:latest-ram
       - image: memcached:1.5.9
-      - image: confluentinc/cp-zookeeper:5.0.0
+      - image: bitnami/zookeeper:latest
         environment:
-          ZOOKEEPER_CLIENT_PORT: "2181"
-      - image: confluentinc/cp-kafka:5.0.0
+          ALLOW_ANONYMOUS_LOGIN: yes
+      - image: bitnami/kafka:2
         environment:
-          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
-          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
-          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
-          KAFKA_CREATE_TOPICS: gotest:1:1
-          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
+          KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+          KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092
+          ALLOW_PLAINTEXT_LISTENER: yes
+      - image: bitnami/kafka:2
+        environment:
+          KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+        command: [kafka-topics.sh, --create, --topic, gosegtest, --bootstrap-server, localhost:9092]

steps:
- checkout
57 changes: 57 additions & 0 deletions contrib/segmentio/kafka.go.v0/example_test.go
@@ -0,0 +1,57 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka_test

import (
"context"
"log"
"time"

kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

kafka "github.com/segmentio/kafka-go"
)

func ExampleWriter() {
w := kafkatrace.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "some-topic",
})

	// keep the messages in a slice and expand it with ... so the trace headers injected by the writer are visible on these messages afterwards
msgs := []kafka.Message{
{
Key: []byte("key1"),
Value: []byte("value1"),
},
}
if err := w.WriteMessages(context.Background(), msgs...); err != nil {
log.Fatal("Failed to write message", err)
}
}

func ExampleReader() {
r := kafkatrace.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "some-topic",
GroupID: "group-id",
SessionTimeout: 30 * time.Second,
})
msg, err := r.ReadMessage(context.Background())
if err != nil {
log.Fatal("Failed to read message", err)
}

	// create a child span using the span ID and trace ID found in the message headers
spanContext, err := kafkatrace.ExtractSpanContext(msg)
if err != nil {
log.Fatal("Failed to extract span context from carrier", err)
}
operationName := "child-span"
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
defer s.Finish()
}
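Note: both examples assume tracer.Start was called during application startup; until then the default no-op tracer is used and no spans are reported.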
54 changes: 54 additions & 0 deletions contrib/segmentio/kafka.go.v0/headers.go
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/segmentio/kafka-go"
)

// A messageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message.
type messageCarrier struct {
msg *kafka.Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*messageCarrier)(nil)

// ForeachKey conforms to the TextMapReader interface.
func (c messageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.Headers {
err := handler(h.Key, string(h.Value))
if err != nil {
return err
}
}
return nil
}

// Set implements the TextMapWriter interface.
func (c messageCarrier) Set(key, val string) {
// ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, kafka.Header{
Key: key,
Value: []byte(val),
})
}

// ExtractSpanContext retrieves the SpanContext from a kafka.Message.
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error) {
return tracer.Extract(messageCarrier{&msg})
}
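One behavior worth noting in Set above: an existing header with the same key is replaced rather than duplicated. A sketch of an in-package test (hypothetical name; it must live in package kafka because messageCarrier is unexported) that pins this down:

package kafka

import (
	"testing"

	"github.com/segmentio/kafka-go"
	"github.com/stretchr/testify/assert"
)

func TestMessageCarrierSetReplacesKey(t *testing.T) {
	msg := kafka.Message{Headers: []kafka.Header{
		{Key: "x-datadog-trace-id", Value: []byte("old")},
	}}
	c := messageCarrier{&msg}
	c.Set("x-datadog-trace-id", "new")
	// the existing header is replaced, not duplicated
	assert.Len(t, msg.Headers, 1)
	assert.Equal(t, "new", string(msg.Headers[0].Value))
}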
148 changes: 148 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka.go
@@ -0,0 +1,148 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"
"math"

"github.com/segmentio/kafka-go"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

// NewReader calls kafka.NewReader and wraps the result so consumed messages are traced.
func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader {
return WrapReader(kafka.NewReader(conf), opts...)
}

// NewWriter calls kafka.NewWriter and wraps the result so produced messages are traced.
func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer {
return WrapWriter(kafka.NewWriter(conf), opts...)
}

// WrapReader wraps a kafka.Reader so that any consumed events are traced.
func WrapReader(c *kafka.Reader, opts ...Option) *Reader {
wrapped := &Reader{
Reader: c,
cfg: newConfig(opts...),
}
log.Debug("contrib/confluentinc/confluent-kafka.go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg)
return wrapped
}

// A Reader wraps a kafka.Reader.
type Reader struct {
*kafka.Reader
cfg *config
prev ddtrace.Span
}

func (r *Reader) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(r.cfg.consumerServiceName),
tracer.ResourceName("Consume Topic " + msg.Topic),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("partition", msg.Partition),
tracer.Tag("offset", msg.Offset),
tracer.Measured(),
}
if !math.IsNaN(r.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, r.cfg.analyticsRate))
}
// kafka supports headers, so try to extract a span context
carrier := messageCarrier{msg}
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(ctx, "kafka.consume", opts...)
	// reinject the consume span's context so the application can extract it from the message
if err := tracer.Inject(span.Context(), carrier); err != nil {
log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err)
}
return span
}

// Close calls the underlying Reader.Close and finishes any span left open
// by a previous ReadMessage call.
func (r *Reader) Close() error {
err := r.Reader.Close()
if r.prev != nil {
r.prev.Finish()
r.prev = nil
}
return err
}

// ReadMessage reads the next message from the Reader and traces it. The span
// started for the previously read message, if any, is finished first.
func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) {
if r.prev != nil {
r.prev.Finish()
r.prev = nil
}
msg, err := r.Reader.ReadMessage(ctx)
if err != nil {
return kafka.Message{}, err
}
r.prev = r.startSpan(ctx, &msg)
return msg, nil
}

// WrapWriter wraps a kafka.Writer so requests are traced.
func WrapWriter(w *kafka.Writer, opts ...Option) *Writer {
writer := &Writer{
Writer: w,
cfg: newConfig(opts...),
}
log.Debug("contrib/segmentio/kafka.go.v0: Wrapping Writer: %#v", writer.cfg)
return writer
}

// Writer wraps a kafka.Writer with tracing config data
type Writer struct {
*kafka.Writer
cfg *config
}

func (w *Writer) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(w.cfg.producerServiceName),
tracer.ResourceName("Produce Topic " + w.Writer.Topic),
tracer.SpanType(ext.SpanTypeMessageProducer),
}
if !math.IsNaN(w.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate))
}
carrier := messageCarrier{msg}
span, _ := tracer.StartSpanFromContext(ctx, "kafka.produce", opts...)
	if err := tracer.Inject(span.Context(), carrier); err != nil {
		log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err)
	}
return span
}

func finishSpan(span ddtrace.Span, partition int, offset int64, err error) {
span.SetTag("partition", partition)
span.SetTag("offset", offset)
span.Finish(tracer.WithError(err))
}

// WriteMessages calls the underlying kafka.Writer.WriteMessages and traces the requests.
func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
	// although there's only one call made to the underlying Writer, the
	// messages are treated individually, so we create a span for each one
spans := make([]ddtrace.Span, len(msgs))
for i := range msgs {
spans[i] = w.startSpan(ctx, &msgs[i])
}
err := w.Writer.WriteMessages(ctx, msgs...)
for i, span := range spans {
finishSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
return err
}
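A note on the Reader's span lifecycle implemented above: the span started for a message is only finished by the next ReadMessage (or by Close), so it covers the time the application spends processing that message. A minimal consume-loop sketch (broker address, group, and topic are placeholders):

package main

import (
	"context"
	"log"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	r := kafkatrace.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // assumption: local broker
		GroupID: "my-group",                 // hypothetical consumer group
		Topic:   "some-topic",               // hypothetical topic
	})
	// Close finishes the span of the last message read
	defer r.Close()

	for {
		// each ReadMessage finishes the previous message's span, so the
		// consume span covers the time spent processing that message
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Printf("read: %v", err)
			return
		}
		log.Printf("processing %s", msg.Value)
	}
}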
92 changes: 92 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka_test.go
@@ -0,0 +1,92 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"
"os"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"

kafka "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

const (
testGroupID = "gosegtest"
testTopic = "gosegtest"
)

func skipIntegrationTest(t *testing.T) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
}
}

/*
to set up the integration test locally, run:
docker-compose -f local_testing.yaml up
*/

func TestConsumerFunctional(t *testing.T) {
skipIntegrationTest(t)
mt := mocktracer.Start()
defer mt.Stop()

kw := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: testTopic,
RequiredAcks: kafka.RequireOne,
}

w := WrapWriter(kw, WithAnalyticsRate(0.1))
msg1 := []kafka.Message{
{
Key: []byte("key1"),
Value: []byte("value1"),
},
}
err := w.WriteMessages(context.Background(), msg1...)
assert.NoError(t, err, "Expected to write message to topic")
err = w.Close()
assert.NoError(t, err)

	tctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
r := NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: testGroupID,
Topic: testTopic,
})
msg2, err := r.ReadMessage(tctx)
assert.NoError(t, err, "Expected to consume message")
assert.Equal(t, msg1[0].Value, msg2.Value, "Values should be equal")
	assert.NoError(t, r.Close())

// now verify the spans
spans := mt.FinishedSpans()
assert.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID(), "Trace IDs should match")

s0 := spans[0] // produce
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "Produce Topic "+testTopic, s0.Tag(ext.ResourceName))
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, 0, s0.Tag("partition"))

s1 := spans[1] // consume
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic "+testTopic, s1.Tag(ext.ResourceName))
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, 0, s1.Tag("partition"))
}
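To run this test locally, presumably one starts the broker stack with docker-compose -f local_testing.yaml up and then runs the package tests with the INTEGRATION environment variable set, e.g. INTEGRATION=true go test ./contrib/segmentio/kafka.go.v0/... (the exact value does not matter, since skipIntegrationTest only checks that the variable exists).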
