Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-42 Add producer and consumer interceptors #1

Closed
wants to merge 19 commits into from

Conversation

d1egoaz
Copy link
Owner

@d1egoaz d1egoaz commented Jun 11, 2020

This PR includes:
Producer: onSend but it doesn't implement onAcknowledgement
Consumer: onConsume but it doesn't implement onCommit

I'm not sure if I need to add the onClose method. Maybe in another
iteration ¯_(ツ)_/¯

TODO: functional test, but it looks the way sarama functional test works is that they don't have this detail.

* Functional tests are skipped except when the `functional` build tag
  is passed to the go test command (i.e. go test -tags=functional)
* If TOXIPROXY_ADDR is not set when the functional tests are being run,
  it will use docker-compose to automatically spin up a
  zookeeper/kafka/toxiproxy environment suitab le for running the tests.
  This requies a working Docker and for the docker-compose command line
  tool to be installed.
* If TOXIPROXY_ADDR and KAFKA_VERSION are set, then the tests will not
  spin up any docker infrastructure and will instead rely on whatever
  kafka broker is behind the specified toxiproxy.
@vvuibert
Copy link

I'm not sure if I need to add the onClose

I think we should add the onClose since we use it to close the trace exporter
https://github.com/Shopify/shopify-docker-images/blob/master/kafka-mirrormaker/tracing/src/main/java/com/shopify/kafka/tracing/TracingConsumerInterceptor.java#L62-L63

interceptors.go Outdated Show resolved Hide resolved
@d1egoaz
Copy link
Owner Author

d1egoaz commented Jun 12, 2020

think we should add the onClose since we use it to close the trace exporter
https://github.com/Shopify/shopify-docker-images/blob/master/kafka-mirrormaker/tracing/src/main/java/com/shopify/kafka/tracing/TracingConsumerInterceptor.java#L62-L63

still unsure about this.
I understand that most of the java code needs something like this to allow this kind of plugins to be closed successfully as this might be things that are loaded dynamically, injected, etc.
In go there is not such thing, and you can always do some kind of defer myInterceptor.Close on the method that initializes the interceptor to close things if needed.

functional_producer_test.go Outdated Show resolved Hide resolved
// ProducerInterceptor allows you to intercept (and possibly mutate) the records
// received by the producer before they are published to the Kafka cluster.
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
type ProducerInterceptor interface {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a ProducerInterceptor and ConsumerInterceptor? could we just have an Interceptor with an onCall(message)?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we would need at least 2 methods, one that receives a ProducerMessage and other for ConsumerMessage, and that might be an issue, as if someone would like to only intercept a producer, it'll need to implement the onSend for consumer too.

So I think it's safe to leave it in 2 separated interfaces

@@ -25,6 +26,12 @@ func TestFuncProducingGzip(t *testing.T) {
testProducingMessages(t, config)
}

func TestFuncProducingZstd(t *testing.T) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rebase since IBM#1726 was merged

interceptors.go Outdated
Comment on lines 19 to 21
// onConsume is called when the consumed message is intercepted. Please
// avoid modifying the message until it's safe to do so, as this is _not_ a
// copy of the message.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit scary since monitoring should not affect the code. this doc implies that an error in monitoring could break the code

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that was my opinionated take on this.
I added some comments on the upstream PR about this, let's see if we can get some 👀 on this.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doc implies that an error in monitoring could break the code

Not break the code, but modify partially the message

@d1egoaz d1egoaz force-pushed the diego_interceptors branch 4 times, most recently from 5664afe to 4c63884 Compare June 22, 2020 19:09
KJTsanaktsidis and others added 4 commits June 24, 2020 13:51
…_tests

Use docker-compose to run the functional tests
- expose a `Commit()` sync method on ConsumerGroupSession
- don't create mainLoop in OffsetManager unless AutoCommit is enabled
Consumer group support for manually comitting offsets
This PR includes:
Producer: `onSend` but it doesn't implement `onAcknowledgement`
Consumer: `onConsume` but it doesn't implement `onCommit`

I'm not sure if I need to add the `onClose` method. Maybe in another
iteration ¯\_(ツ)_/¯
@d1egoaz d1egoaz closed this Jul 15, 2020
@d1egoaz d1egoaz deleted the diego_interceptors branch July 15, 2020 16:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
7 participants