Skip to content

Commit

Permalink
export interceptor methods
Browse files Browse the repository at this point in the history
  • Loading branch information
d1egoaz committed Jun 29, 2020
1 parent 76f08b8 commit a299948
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
4 changes: 2 additions & 2 deletions async_producer_test.go
Expand Up @@ -1235,7 +1235,7 @@ type appendInterceptor struct {
i int
}

func (b *appendInterceptor) onSend(msg *ProducerMessage) {
func (b *appendInterceptor) OnSend(msg *ProducerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
Expand All @@ -1244,7 +1244,7 @@ func (b *appendInterceptor) onSend(msg *ProducerMessage) {
b.i++
}

func (b *appendInterceptor) onConsume(msg *ConsumerMessage) {
func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
Expand Down
6 changes: 3 additions & 3 deletions config.go
Expand Up @@ -234,7 +234,7 @@ type Config struct {
// message for the first time. Interceptors allows to intercept and
// possible mutate the message before they are published to Kafka
// cluster. *ProducerMessage modified by the first interceptor's
// onSend() is passed to the second interceptor onSend(), and so on in
// OnSend() is passed to the second interceptor OnSend(), and so on in
// the interceptor chain.
Interceptors []ProducerInterceptor
}
Expand Down Expand Up @@ -403,8 +403,8 @@ type Config struct {
// Interceptors to be called just before the record is sent to the
// messages channel. Interceptors allows to intercept and possible
// mutate the message before they are returned to the client.
// *ConsumerMessage modified by the first interceptor's onConsume() is
// passed to the second interceptor onConsume(), and so on in the
// *ConsumerMessage modified by the first interceptor's OnConsume() is
// passed to the second interceptor OnConsume(), and so on in the
// interceptor chain.
Interceptors []ConsumerInterceptor
}
Expand Down
12 changes: 6 additions & 6 deletions interceptors.go
Expand Up @@ -5,21 +5,21 @@ package sarama
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
type ProducerInterceptor interface {

// onSend is called when the producer message is intercepted. Please avoid
// OnSend is called when the producer message is intercepted. Please avoid
// modifying the message until it's safe to do so, as this is _not_ a copy
// of the message.
onSend(*ProducerMessage)
OnSend(*ProducerMessage)
}

// ConsumerInterceptor allows you to intercept (and possibly mutate) the records
// received by the consumer before they are sent to the messages channel.
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
type ConsumerInterceptor interface {

// onConsume is called when the consumed message is intercepted. Please
// OnConsume is called when the consumed message is intercepted. Please
// avoid modifying the message until it's safe to do so, as this is _not_ a
// copy of the message.
onConsume(*ConsumerMessage)
OnConsume(*ConsumerMessage)
}

func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerInterceptor) {
Expand All @@ -29,7 +29,7 @@ func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerIntercept
}
}()

interceptor.onSend(msg)
interceptor.OnSend(msg)
}

func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerInterceptor) {
Expand All @@ -39,5 +39,5 @@ func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerIntercept
}
}()

interceptor.onConsume(msg)
interceptor.OnConsume(msg)
}

0 comments on commit a299948

Please sign in to comment.