New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
contrib/segmentio/kafka-go: add implementation add tracing for kafka writer and kafka reader #1152
Merged
Merged
Changes from all commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
7ffd45f
Add tracing for segmentio/kafka-go
f3c0482
Update import as per guidelines
d956bf2
Update circleci config to add topic and use new topic in test
60ae8d5
Use the same topic with last offset
88ed7bc
Update log level
418180d
Use apache kafka image to test segmentio kafka-go changes
4a7ac09
Update apache kafka configuration to expose different port
ddfae80
Fix `Socket server failed to bind to kafka:9092: Unresolved address` …
bb04e82
Use container name to fix port conflict issue
044b55c
Fix kafka listener host name
84fbf18
Use kafka listener and advertised listener to fix connection issue
388658f
Add asserts to validate consumer span
f1454cd
Move Extract function to headers.go file
96ff7ba
Use same container for confluentinc kafka and segmentio kafka client
adcbe33
rename to include .v0
ajgajg1134 37eba0e
contrib/segmentio/kafka.go.v0: add copyright headers
ajgajg1134 2cbfdee
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 ceaf249
PR Comments, add docker-compose file for easier local integration tes…
ajgajg1134 947d5df
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 8de6e46
fix goimports, pin kafka to latest 2.x.x version
ajgajg1134 2d9aa54
try creating kafka topic before running tests
ajgajg1134 3edfca6
try creating kafka topic before running tests (attempt2)
ajgajg1134 bb0ab40
create kafka topic for segmentio
ajgajg1134 e6c72d3
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 967d212
Merge branch 'v1' into segmentio/kafka-go
ajgajg1134 75551e4
Merge branch 'v1' into segmentio/kafka-go
knusbaum File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2016 Datadog, Inc. | ||
|
||
package kafka_test | ||
|
||
import ( | ||
"context" | ||
"log" | ||
"time" | ||
|
||
kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
|
||
kafka "github.com/segmentio/kafka-go" | ||
) | ||
|
||
func ExampleWriter() { | ||
w := kafkatrace.NewWriter(kafka.WriterConfig{ | ||
Brokers: []string{"localhost:9092"}, | ||
Topic: "some-topic", | ||
}) | ||
|
||
// use slice as it passes the value by reference if you want message headers updated in kafkatrace | ||
msgs := []kafka.Message{ | ||
{ | ||
Key: []byte("key1"), | ||
Value: []byte("value1"), | ||
}, | ||
} | ||
if err := w.WriteMessages(context.Background(), msgs...); err != nil { | ||
log.Fatal("Failed to write message", err) | ||
} | ||
} | ||
|
||
func ExampleReader() { | ||
r := kafkatrace.NewReader(kafka.ReaderConfig{ | ||
Brokers: []string{"localhost:9092"}, | ||
Topic: "some-topic", | ||
GroupID: "group-id", | ||
SessionTimeout: 30 * time.Second, | ||
}) | ||
msg, err := r.ReadMessage(context.Background()) | ||
if err != nil { | ||
log.Fatal("Failed to read message", err) | ||
} | ||
|
||
// create a child span using span id and trace id in message header | ||
spanContext, err := kafkatrace.ExtractSpanContext(msg) | ||
if err != nil { | ||
log.Fatal("Failed to extract span context from carrier", err) | ||
} | ||
operationName := "child-span" | ||
s := tracer.StartSpan(operationName, tracer.ChildOf(spanContext)) | ||
defer s.Finish() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2016 Datadog, Inc. | ||
|
||
package kafka | ||
|
||
import ( | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
|
||
"github.com/segmentio/kafka-go" | ||
) | ||
|
||
// A messageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
type messageCarrier struct {
	msg *kafka.Message // held by pointer so Set can mutate the headers in place
}

// Compile-time assertion that messageCarrier satisfies both carrier interfaces.
var _ interface {
	tracer.TextMapReader
	tracer.TextMapWriter
} = (*messageCarrier)(nil)
|
||
// ForeachKey conforms to the TextMapReader interface. | ||
func (c messageCarrier) ForeachKey(handler func(key, val string) error) error { | ||
for _, h := range c.msg.Headers { | ||
err := handler(h.Key, string(h.Value)) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// Set implements TextMapWriter | ||
func (c messageCarrier) Set(key, val string) { | ||
// ensure uniqueness of keys | ||
for i := 0; i < len(c.msg.Headers); i++ { | ||
if string(c.msg.Headers[i].Key) == key { | ||
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) | ||
i-- | ||
} | ||
} | ||
c.msg.Headers = append(c.msg.Headers, kafka.Header{ | ||
Key: key, | ||
Value: []byte(val), | ||
}) | ||
} | ||
|
||
// ExtractSpanContext retrieves the SpanContext from a kafka.Message by
// reading the trace headers carried on it. It returns tracer.Extract's
// error when the headers contain no valid span context.
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error) {
	// msg is taken by value; the carrier only reads headers here, so the
	// copy is safe.
	return tracer.Extract(messageCarrier{&msg})
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2016 Datadog, Inc. | ||
|
||
package kafka | ||
|
||
import ( | ||
"context" | ||
"math" | ||
|
||
"github.com/segmentio/kafka-go" | ||
|
||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
"gopkg.in/DataDog/dd-trace-go.v1/internal/log" | ||
) | ||
|
||
// NewReader calls kafka.NewReader and wraps the resulting Consumer so that
// every message consumed through it is traced.
func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader {
	return WrapReader(kafka.NewReader(conf), opts...)
}
|
||
// NewWriter calls kafka.NewWriter and wraps the resulting Producer so that
// every message produced through it is traced.
func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer {
	return WrapWriter(kafka.NewWriter(conf), opts...)
}
|
||
// WrapReader wraps a kafka.Reader so that any consumed events are traced. | ||
func WrapReader(c *kafka.Reader, opts ...Option) *Reader { | ||
wrapped := &Reader{ | ||
Reader: c, | ||
cfg: newConfig(opts...), | ||
} | ||
log.Debug("contrib/confluentinc/confluent-kafka.go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg) | ||
return wrapped | ||
} | ||
|
||
// A Reader wraps a kafka.Reader, adding a consumer span per message.
type Reader struct {
	*kafka.Reader
	cfg  *config
	prev ddtrace.Span // span for the most recently read message; finished on the next read or on Close
}
|
||
func (r *Reader) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span { | ||
opts := []tracer.StartSpanOption{ | ||
tracer.ServiceName(r.cfg.consumerServiceName), | ||
tracer.ResourceName("Consume Topic " + msg.Topic), | ||
tracer.SpanType(ext.SpanTypeMessageConsumer), | ||
tracer.Tag("partition", msg.Partition), | ||
tracer.Tag("offset", msg.Offset), | ||
tracer.Measured(), | ||
} | ||
if !math.IsNaN(r.cfg.analyticsRate) { | ||
opts = append(opts, tracer.Tag(ext.EventSampleRate, r.cfg.analyticsRate)) | ||
} | ||
// kafka supports headers, so try to extract a span context | ||
carrier := messageCarrier{msg} | ||
if spanctx, err := tracer.Extract(carrier); err == nil { | ||
opts = append(opts, tracer.ChildOf(spanctx)) | ||
} | ||
span, _ := tracer.StartSpanFromContext(ctx, "kafka.consume", opts...) | ||
// reinject the span context so consumers can pick it up | ||
if err := tracer.Inject(span.Context(), carrier); err != nil { | ||
log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err) | ||
} | ||
return span | ||
} | ||
|
||
// Close calls the underlying Reader.Close and if polling is enabled, finishes | ||
// any remaining span. | ||
func (r *Reader) Close() error { | ||
err := r.Reader.Close() | ||
if r.prev != nil { | ||
r.prev.Finish() | ||
r.prev = nil | ||
} | ||
return err | ||
} | ||
|
||
// ReadMessage polls the consumer for a message. Message will be traced. | ||
func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) { | ||
if r.prev != nil { | ||
r.prev.Finish() | ||
r.prev = nil | ||
} | ||
msg, err := r.Reader.ReadMessage(ctx) | ||
if err != nil { | ||
return kafka.Message{}, err | ||
} | ||
r.prev = r.startSpan(ctx, &msg) | ||
return msg, nil | ||
} | ||
knusbaum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// WrapWriter wraps a kafka.Writer so requests are traced. | ||
func WrapWriter(w *kafka.Writer, opts ...Option) *Writer { | ||
writer := &Writer{ | ||
Writer: w, | ||
cfg: newConfig(opts...), | ||
} | ||
log.Debug("contrib/segmentio/kafka.go.v0: Wrapping Writer: %#v", writer.cfg) | ||
return writer | ||
} | ||
|
||
// Writer wraps a kafka.Writer with tracing config data, producing one span
// per message written.
type Writer struct {
	*kafka.Writer
	cfg *config
}
|
||
func (w *Writer) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span { | ||
opts := []tracer.StartSpanOption{ | ||
tracer.ServiceName(w.cfg.producerServiceName), | ||
tracer.ResourceName("Produce Topic " + w.Writer.Topic), | ||
tracer.SpanType(ext.SpanTypeMessageProducer), | ||
} | ||
if !math.IsNaN(w.cfg.analyticsRate) { | ||
opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate)) | ||
} | ||
carrier := messageCarrier{msg} | ||
span, _ := tracer.StartSpanFromContext(ctx, "kafka.produce", opts...) | ||
err := tracer.Inject(span.Context(), carrier) | ||
log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier, %v", err) | ||
return span | ||
} | ||
|
||
// finishSpan records the final partition/offset assigned to a produced
// message (only known after WriteMessages returns) and closes the span,
// attaching err if the write failed.
func finishSpan(span ddtrace.Span, partition int, offset int64, err error) {
	span.SetTag("partition", partition)
	span.SetTag("offset", offset)
	span.Finish(tracer.WithError(err))
}
|
||
// WriteMessages calls kafka.go.v0.Writer.WriteMessages and traces the requests. | ||
func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error { | ||
// although there's only one call made to the SyncProducer, the messages are | ||
// treated individually, so we create a span for each one | ||
spans := make([]ddtrace.Span, len(msgs)) | ||
for i := range msgs { | ||
spans[i] = w.startSpan(ctx, &msgs[i]) | ||
} | ||
err := w.Writer.WriteMessages(ctx, msgs...) | ||
for i, span := range spans { | ||
finishSpan(span, msgs[i].Partition, msgs[i].Offset, err) | ||
} | ||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2016 Datadog, Inc. | ||
|
||
package kafka | ||
|
||
import ( | ||
"context" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" | ||
|
||
kafka "github.com/segmentio/kafka-go" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
const ( | ||
testGroupID = "gosegtest" | ||
testTopic = "gosegtest" | ||
) | ||
|
||
// skipIntegrationTest skips the calling test unless the INTEGRATION
// environment variable is set (any value counts as set).
func skipIntegrationTest(t *testing.T) {
	_, integration := os.LookupEnv("INTEGRATION")
	if !integration {
		t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)")
	}
}
|
||
/* | ||
to setup the integration test locally run: | ||
docker-compose -f local_testing.yaml up | ||
*/ | ||
|
||
func TestConsumerFunctional(t *testing.T) { | ||
skipIntegrationTest(t) | ||
mt := mocktracer.Start() | ||
defer mt.Stop() | ||
|
||
kw := &kafka.Writer{ | ||
Addr: kafka.TCP("localhost:9092"), | ||
Topic: testTopic, | ||
RequiredAcks: kafka.RequireOne, | ||
} | ||
|
||
w := WrapWriter(kw, WithAnalyticsRate(0.1)) | ||
msg1 := []kafka.Message{ | ||
{ | ||
Key: []byte("key1"), | ||
Value: []byte("value1"), | ||
}, | ||
} | ||
err := w.WriteMessages(context.Background(), msg1...) | ||
assert.NoError(t, err, "Expected to write message to topic") | ||
err = w.Close() | ||
assert.NoError(t, err) | ||
|
||
tctx, _ := context.WithTimeout(context.Background(), 30*time.Second) | ||
r := NewReader(kafka.ReaderConfig{ | ||
Brokers: []string{"localhost:9092"}, | ||
GroupID: testGroupID, | ||
Topic: testTopic, | ||
}) | ||
msg2, err := r.ReadMessage(tctx) | ||
assert.NoError(t, err, "Expected to consume message") | ||
assert.Equal(t, msg1[0].Value, msg2.Value, "Values should be equal") | ||
r.Close() | ||
|
||
// now verify the spans | ||
spans := mt.FinishedSpans() | ||
assert.Len(t, spans, 2) | ||
// they should be linked via headers | ||
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID(), "Trace IDs should match") | ||
|
||
s0 := spans[0] // produce | ||
assert.Equal(t, "kafka.produce", s0.OperationName()) | ||
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName)) | ||
assert.Equal(t, "Produce Topic "+testTopic, s0.Tag(ext.ResourceName)) | ||
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate)) | ||
assert.Equal(t, "queue", s0.Tag(ext.SpanType)) | ||
assert.Equal(t, 0, s0.Tag("partition")) | ||
|
||
s1 := spans[1] // consume | ||
assert.Equal(t, "kafka.consume", s1.OperationName()) | ||
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName)) | ||
assert.Equal(t, "Consume Topic "+testTopic, s1.Tag(ext.ResourceName)) | ||
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate)) | ||
assert.Equal(t, "queue", s1.Tag(ext.SpanType)) | ||
assert.Equal(t, 0, s1.Tag("partition")) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, we should add just a bit more info for the documentation.