contrib/segmentio/kafka-go: add implementation add tracing for kafka writer and kafka reader #1152
drop NewMessageCarrier function, just use MessageCarrier{msg}
Looking good.
I have some comments.
	return writer
}

// Writer wraps a kafka.Writer.
// Writer wraps a kafka.Writer.
Why? What does wrapping kafka.Writer accomplish? We should add a little more to this documentation.
How about something like this?
Writer wraps a kafka.Writer with config data used for tracing
That works.
	return wrapped
}

// A Reader wraps a kafka.Reader.
Same here, we should add just a bit more info for the documentation.
/*
to run the integration test locally, update the broker name to localhost:29092:

docker network create segementio

docker run --rm \
    --name zookeeper \
    --network segementio \
    -p 2181:2181 \
    wurstmeister/zookeeper:3.4.6

docker run --rm \
    --name kafka \
    --network segementio \
    -p 29092:29092 \
    -e KAFKA_CREATE_TOPICS=gotest:1:1 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_LISTENERS=INSIDE://kafka:9092,OUTSIDE://kafka:29092 \
    -e KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9092,OUTSIDE://localhost:29092 \
    -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT \
    -e KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE \
    wurstmeister/kafka:2.13-2.7.0
*/
I'm not able to get these instructions to work, and anyway we don't want users to have to edit the tests in order to run them locally.
Let's either fix these instructions to bring up a kafka such that tests pass without modification (should be possible since 9092 is not a protected port) or remove these instructions.
	return nil
}

// Set sets a header.
This comment is not useful. We should point out that this implements the ddtrace.TextMapWriter interface.
Same for ForeachKey (only for the TextMapReader interface).
We do this with TextMapCarrier. See:
dd-trace-go/ddtrace/tracer/textmap.go
Lines 53 to 66 in 5b6e61b
// Set implements TextMapWriter.
func (c TextMapCarrier) Set(key, val string) {
	c[key] = val
}

// ForeachKey conforms to the TextMapReader interface.
func (c TextMapCarrier) ForeachKey(handler func(key, val string) error) error {
	for k, v := range c {
		if err := handler(k, v); err != nil {
			return err
		}
	}
	return nil
}
)

// A MessageCarrier injects and extracts traces from a kafka.Message.
type MessageCarrier struct {
Does this type need to be exported? I don't think users need to interact directly with this and an unexported type can still conform to an interface.
Let's keep this unexported if possible.
This library fails on brand-new topics while the leader is being elected; to get around this, create the topic in setup.
Looks good.
System tests look misconfigured. We can fix them elsewhere.
Fixes #899
This is a continuation of #897 with some of the comments addressed.
I attempted to pull out some of the duplication across the two, but since both pieces of code work with distinct kafka objects, it led to a large layer of abstraction that seemed worse than the duplication included before. Namely, the startSpan methods would need to interact using interface{}, and the result grew quite unreadable. I'm happy to consider other ideas or approaches to de-duplicating some of this with the confluentinc kafka package.