New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
contrib/segmentio/kafka-go: add tracing for kafka writer and kafka reader #897
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution. Can you please take a moment to read our contribution guidelines. I think we ultimately want this change, but please go ahead and open an issue so we can follow the regular process...
Thanks for reviewing this PR, created #899 issue. |
@gbbr CircleCI metadata is failing still, not sure what I am missing, Can you help me fix the same? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution. And thanks for following our process. Can you please name the package accordingly as per our guidelines from here. Basically we'd have to call it the folder contrib/segmentio/kafka-go/kafka.go.v0
.
.circleci/config.yml
Outdated
@@ -119,6 +119,17 @@ jobs: | |||
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 | |||
KAFKA_CREATE_TOPICS: gotest:1:1 | |||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" | |||
- image: wurstmeister/zookeeper:3.4.6 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are already running Kafka above (confluentinc/cp-kafka:5.0.0
). Can we not use that instance instead? Why do we need a different one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried using that one, it didn't work as expected.
When using same topic, it reads the messages from confluent test producer.
Tried with different topic, added the new topic in circleci config, got the error message saying the topic doesn't exist though I see topic is created.
Due to librdkafka requirment, I couldn't test the changes in my local.
One advantage of using a separate will remove the dependency and the unexpected issues, So I added this image.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use just one of them for both, whichever works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using wurstmeister/zookeeper
and wurstmeister/kafka
for both
} | ||
|
||
// NewMessageCarrier creates a new MessageCarrier. | ||
func NewMessageCarrier(msg *kafka.Message) MessageCarrier { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be part of the public API? Probably not. We can remove the function and just use contextCarrier(msg)
inline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That works for me, I added this method here because both confuent-kafka and sarama package has this.
The reason we may need when the clients want to Extract the span context from the header.
I have added ExtractSpanContextFromMessage
method todo that.
) | ||
|
||
// A MessageCarrier injects and extracts traces from a sarama.ProducerMessage. | ||
type MessageCarrier struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a struct? Probably not
type MessageCarrier struct { | |
// contextCarrier implements tracer.TextMapWriter and tracer.TextMapReader on top of a kafka.Message. | |
type contextCarrier *kafka.Message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I verified it, it doesn't update the header when we use contextCarrier
prev ddtrace.Span | ||
} | ||
|
||
func (r *Reader) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There seems to be a ton of duplication between here and our other Kafka integration. In order to keep the experience consistent I'd prefer it if we would factor out the duplication into another package, let's say contrib/internal/kafkautil
. WDYT? I mean both code and tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also thought the same, didn't get time to do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately we'll have to do that, it's too much duplication and I don't want to risk discrepancies and unilateral bugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will do it when I get time
Hey y'all, was curious if there were any plans to pick this back up any time soon? No worries if not, but figured I'd check! |
This is in our backlog now and we'll continue with it when we can schedule it. |
This was merged with #1152 |
Fixes #899
Currently datadog tracing wrapper is not available for segmentio/kafka-go, add segmentio/kafka-go wrapper for KafkaWriter and KafkaReader in github.com/segementio/kafka-go package to create traces when writing messages to kafka topic and reading message from kafka topic
This implemention is similar to Sarama and Confluentinc kafka.
Also added the integration test to test the Reader and Writer wrappers.