
contrib/segmentio/kafka-go: add tracing for kafka writer and kafka reader #1152

Merged: 26 commits on Feb 28, 2022. Changes shown from 17 of 26 commits.

Commits
7ffd45f
Add tracing for segmentio/kafka-go
Apr 9, 2021
f3c0482
Update import as per guidelines
Apr 9, 2021
d956bf2
Update circleci config to add topic and use new topic in test
Apr 11, 2021
60ae8d5
Use the same topic with last offset
Apr 11, 2021
88ed7bc
Update log level
Apr 11, 2021
418180d
Use apache kafka image to test segmentio kafka-go changes
Apr 11, 2021
4a7ac09
Update apache kafka configuration to expose different port
Apr 12, 2021
ddfae80
Fix `Socket server failed to bind to kafka:9092: Unresolved address` …
Apr 12, 2021
bb04e82
Use container name to fix port conflict issue
Apr 12, 2021
044b55c
Fix kafka listener host name
Apr 12, 2021
84fbf18
Use kafka listener and advertised listener to fix connection issue
Apr 12, 2021
388658f
Add asserts to validate consumer span
Apr 12, 2021
f1454cd
Move Extract function to headers.go file
Apr 13, 2021
96ff7ba
Use same container for confluentinc kafka and segmentio kafka client
Apr 21, 2021
adcbe33
rename to include .v0
ajgajg1134 Jan 28, 2022
37eba0e
contrib/segmentio/kafka.go.v0: add copyright headers
ajgajg1134 Jan 28, 2022
2cbfdee
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Jan 28, 2022
ceaf249
PR Comments, add docker-compose file for easier local integration tes…
ajgajg1134 Feb 11, 2022
947d5df
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Feb 11, 2022
8de6e46
fix goimports, pin kafka to latest 2.x.x version
ajgajg1134 Feb 11, 2022
2d9aa54
try creating kafka topic before running tests
ajgajg1134 Feb 11, 2022
3edfca6
try creating kafka topic before running tests (attempt2)
ajgajg1134 Feb 11, 2022
bb0ab40
create kafka topic for segmentio
ajgajg1134 Feb 14, 2022
e6c72d3
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Feb 15, 2022
967d212
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 Feb 24, 2022
75551e4
Merge branch 'v1' into segmentio/kafka-go
knusbaum Feb 28, 2022
9 changes: 3 additions & 6 deletions .circleci/config.yml
@@ -201,16 +201,13 @@ jobs:
DD_API_KEY: invalid_key_but_this_is_fine
- image: circleci/mongo:latest-ram
- image: memcached:1.5.9
- image: confluentinc/cp-zookeeper:5.0.0
environment:
ZOOKEEPER_CLIENT_PORT: "2181"
- image: confluentinc/cp-kafka:5.0.0
- image: wurstmeister/zookeeper:3.4.6
- image: wurstmeister/kafka:2.13-2.7.0
environment:
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_CREATE_TOPICS: gotest:1:1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CREATE_TOPICS: gosegtest:1:1,gotest:1:1

steps:
- checkout
57 changes: 57 additions & 0 deletions contrib/segmentio/kafka.go.v0/example_test.go
@@ -0,0 +1,57 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka_test

import (
"context"
"log"
"time"

kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

kafka "github.com/segmentio/kafka-go"
)

func ExampleWriter() {
w := kafkatrace.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "some-topic",
})

// Pass the messages as a slice: the variadic expansion shares the slice's backing
// array, so the traced writer can update the message headers in place.
msgs := []kafka.Message{
{
Key: []byte("key1"),
Value: []byte("value1"),
},
}
if err := w.WriteMessages(context.Background(), msgs...); err != nil {
log.Fatal("Failed to write message", err)
}
}

func ExampleReader() {
r := kafkatrace.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "some-topic",
GroupID: "group-id",
SessionTimeout: 30 * time.Second,
})
msg, err := r.ReadMessage(context.Background())
if err != nil {
log.Fatal("Failed to read message", err)
}

// create a child span using the span and trace IDs found in the message headers
spanContext, err := kafkatrace.ExtractSpanContextFromMessage(msg)
if err != nil {
log.Fatal("Failed to extract span context from carrier", err)
}
operationName := "child-span"
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext))
defer s.Finish()
}
54 changes: 54 additions & 0 deletions contrib/segmentio/kafka.go.v0/headers.go
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/segmentio/kafka-go"
)

Review comment (Contributor, resolved): Does this type need to be exported? I don't think users need to interact directly with this, and an unexported type can still conform to an interface. Let's keep this unexported if possible.

// A MessageCarrier injects and extracts traces from a kafka.Message.
type MessageCarrier struct {
msg *kafka.Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*MessageCarrier)(nil)
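
For illustration only (a sketch, not part of this diff): the unexported variant the reviewer suggests could look like the following. The name messageCarrier is hypothetical; ForeachKey mirrors the method below, and Set is shown without the key de-duplication for brevity. The compile-time interface assertion works the same for an unexported type.

type messageCarrier struct {
	msg *kafka.Message
}

// ForeachKey conforms to the tracer.TextMapReader interface.
func (c messageCarrier) ForeachKey(handler func(key, val string) error) error {
	for _, h := range c.msg.Headers {
		if err := handler(h.Key, string(h.Value)); err != nil {
			return err
		}
	}
	return nil
}

// Set implements the tracer.TextMapWriter interface.
func (c messageCarrier) Set(key, val string) {
	c.msg.Headers = append(c.msg.Headers, kafka.Header{Key: key, Value: []byte(val)})
}

var _ interface {
	tracer.TextMapReader
	tracer.TextMapWriter
} = (*messageCarrier)(nil)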

// ForeachKey iterates over every header; it conforms to the tracer.TextMapReader interface.
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.Headers {
err := handler(h.Key, string(h.Value))
if err != nil {
return err
}
}
return nil
}

Review comment (Contributor): This comment is not useful. We should point out that this implements the ddtrace.TextMapWriter interface. Same for ForeachKey (only for the TextMapReader interface). We do this with TextMapCarrier. See:

// Set implements TextMapWriter.
func (c TextMapCarrier) Set(key, val string) {
c[key] = val
}

// ForeachKey conforms to the TextMapReader interface.
func (c TextMapCarrier) ForeachKey(handler func(key, val string) error) error {
for k, v := range c {
if err := handler(k, v); err != nil {
return err
}
}
return nil
}

// Set sets a header; it implements the tracer.TextMapWriter interface.
func (c MessageCarrier) Set(key, val string) {
// ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, kafka.Header{
Key: key,
Value: []byte(val),
})
}
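
For illustration (a sketch, not part of this diff; the header key is a placeholder): Set replaces an existing header rather than appending a duplicate.

msg := kafka.Message{Headers: []kafka.Header{
	{Key: "x-example-key", Value: []byte("old")},
}}
c := MessageCarrier{&msg}
c.Set("x-example-key", "new")
// msg.Headers now holds exactly one "x-example-key" header, with value "new".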

// ExtractSpanContextFromMessage retrieves the SpanContext from a message header.
func ExtractSpanContextFromMessage(msg kafka.Message) (ddtrace.SpanContext, error) {
return tracer.Extract(MessageCarrier{&msg})
}
149 changes: 149 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka.go
@@ -0,0 +1,149 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"
"math"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"github.com/segmentio/kafka-go"
)

// NewReader calls kafka.NewReader and wraps the resulting Reader.
func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader {
return WrapReader(kafka.NewReader(conf), opts...)
}

// NewWriter calls kafka.NewWriter and wraps the resulting Writer.
func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer {
return WrapWriter(kafka.NewWriter(conf), opts...)
}

// WrapReader wraps a kafka.Reader so that any consumed events are traced.
func WrapReader(c *kafka.Reader, opts ...Option) *Reader {
wrapped := &Reader{
Reader: c,
cfg: newConfig(opts...),
}
log.Debug("contrib/confluentinc/confluent-kafka.go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg)
return wrapped
}

Review comment (Contributor): Same here, we should add just a bit more info for the documentation.

// A Reader wraps a kafka.Reader so that consumed messages are traced.
type Reader struct {
*kafka.Reader
cfg *config
prev ddtrace.Span
}

func (r *Reader) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(r.cfg.consumerServiceName),
tracer.ResourceName("Consume Topic " + msg.Topic),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("partition", msg.Partition),
tracer.Tag("offset", msg.Offset),
tracer.Measured(),
}
if !math.IsNaN(r.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, r.cfg.analyticsRate))
}
// kafka supports headers, so try to extract a span context
carrier := MessageCarrier{msg}
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(ctx, "kafka.consume", opts...)
// reinject the span context so consumers can pick it up
if err := tracer.Inject(span.Context(), carrier); err != nil {
log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err)
}
return span
}

// Close calls the underlying Reader.Close and if polling is enabled, finishes
// any remaining span.
func (r *Reader) Close() error {
err := r.Reader.Close()
if r.prev != nil {
r.prev.Finish()
r.prev = nil
}
return err
}

// ReadMessage polls the consumer for a message. The message will be traced.
func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) {
if r.prev != nil {
r.prev.Finish()
r.prev = nil
}
msg, err := r.Reader.ReadMessage(ctx)
if err != nil {
return kafka.Message{}, err
}
r.prev = r.startSpan(ctx, &msg)
return msg, nil
}
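
A usage sketch (not part of this diff; broker address, topic, group ID, and the process helper are placeholders) of the span lifecycle: each ReadMessage call finishes the span started for the previous message, and Close finishes the last one.

r := NewReader(kafka.ReaderConfig{
	Brokers: []string{"localhost:9092"},
	GroupID: "group-id",
	Topic:   "some-topic",
})
defer r.Close() // finishes the span of the last message read
for {
	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		break
	}
	process(msg) // hypothetical handler; runs while the kafka.consume span is still open
}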

// WrapWriter wraps a kafka.Writer so requests are traced.
func WrapWriter(w *kafka.Writer, opts ...Option) *Writer {
writer := &Writer{
Writer: w,
cfg: newConfig(opts...),
}
log.Debug("contrib/segmentio/kafka.go.v0: Wrapping Writer: %#v", writer.cfg)
return writer
}

Review comment (Contributor): "// Writer wraps a kafka.Writer." — Why? What does wrapping kafka.Writer accomplish? We should add a little more to this documentation.

Reply (Contributor Author): How about something like: "Writer wraps a kafka.Writer with config data used for tracing"?

Reply (Contributor): That works.

// Writer wraps a kafka.Writer with config data used for tracing.
type Writer struct {
*kafka.Writer
cfg *config
}

func (w *Writer) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(w.cfg.producerServiceName),
tracer.ResourceName("Produce Topic " + w.Writer.Topic),
tracer.SpanType(ext.SpanTypeMessageProducer),
}
if !math.IsNaN(w.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate))
}
carrier := MessageCarrier{msg}
span, _ := tracer.StartSpanFromContext(ctx, "kafka.produce", opts...)
// inject the span context so consumers can pick it up
if err := tracer.Inject(span.Context(), carrier); err != nil {
log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err)
}
return span
}

func finishSpan(span ddtrace.Span, partition int, offset int64, err error) {
span.SetTag("partition", partition)
span.SetTag("offset", offset)
span.Finish(tracer.WithError(err))
}

// WriteMessages calls kafka.go.v0.Writer.WriteMessages and traces the requests.
func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
// although there's only one call made to the underlying writer, the messages
// are treated individually, so we create a span for each one
spans := make([]ddtrace.Span, len(msgs))
for i := range msgs {
spans[i] = w.startSpan(ctx, &msgs[i])
}
err := w.Writer.WriteMessages(ctx, msgs...)
for i, span := range spans {
finishSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
return err
}
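
A usage sketch (not part of this diff; the broker address, topic, and operation name are placeholders): writing a batch from a traced context produces one kafka.produce span per message, each a child of the caller's span, with that span's context injected into the corresponding message's headers.

parent, ctx := tracer.StartSpanFromContext(context.Background(), "web.request")
defer parent.Finish()

w := NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "some-topic",
})
defer w.Close()

msgs := []kafka.Message{
	{Value: []byte("a")},
	{Value: []byte("b")},
}
// two kafka.produce spans are started, both children of "web.request"
if err := w.WriteMessages(ctx, msgs...); err != nil {
	// handle the write error
}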
105 changes: 105 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka_test.go
@@ -0,0 +1,105 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka

import (
"context"
"os"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"

kafka "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

const (
testGroupID = "gosegtest"
testTopic = "gosegtest"
)

func skipIntegrationTest(t *testing.T) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
}
}

/*
To run the integration test locally, update the broker address in the tests to localhost:29092, then:

docker network create segmentio

docker run --rm \
--name zookeeper \
--network segmentio \
-p 2181:2181 \
wurstmeister/zookeeper:3.4.6

docker run --rm \
--name kafka \
--network segmentio \
-p 29092:29092 \
-e KAFKA_CREATE_TOPICS=gosegtest:1:1 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_LISTENERS=INSIDE://kafka:9092,OUTSIDE://kafka:29092 \
-e KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9092,OUTSIDE://localhost:29092 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE \
wurstmeister/kafka:2.13-2.7.0
*/
Review comment (Contributor): I'm not able to get these instructions to work, and anyway we don't want users to have to edit the tests in order to run them locally. Let's either fix these instructions to bring up a Kafka such that tests pass without modification (this should be possible, since 9092 is not a protected port), or remove these instructions.


func TestConsumerFunctional(t *testing.T) {
skipIntegrationTest(t)
mt := mocktracer.Start()
defer mt.Stop()

w := NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: testTopic,
}, WithAnalyticsRate(0.1))
msg1 := []kafka.Message{
{
Key: []byte("key1"),
Value: []byte("value1"),
},
}
err := w.WriteMessages(context.Background(), msg1...)
assert.NoError(t, err, "Expected to write message to topic")
w.Close()

r := NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: testGroupID,
Topic: testTopic,
})
msg2, err := r.ReadMessage(context.Background())
assert.NoError(t, err, "Expected to consume message")
assert.Equal(t, msg1[0].Value, msg2.Value, "Values should be equal")
r.Close()

// now verify the spans
spans := mt.FinishedSpans()
assert.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID(), "Trace IDs should match")

s0 := spans[0] // produce
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "Produce Topic "+testTopic, s0.Tag(ext.ResourceName))
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, 0, s0.Tag("partition"))

s1 := spans[1] // consume
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic "+testTopic, s1.Tag(ext.ResourceName))
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, 0, s1.Tag("partition"))
}