Skip to content

Commit

Permalink
KIP-42 Add producer and consumer interceptors
Browse files Browse the repository at this point in the history
This PR includes:
Producer: `onSend` but it doesn't implement `onAcknowledgement`
Consumer: `onConsume` but it doesn't implement `onCommit`

I'm not sure if I need to add the `onClose` method. Maybe in another
iteration ¯\_(ツ)_/¯
  • Loading branch information
d1egoaz committed Jun 22, 2020
1 parent 6523153 commit 5664afe
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 0 deletions.
4 changes: 4 additions & 0 deletions async_producer.go
Expand Up @@ -348,6 +348,10 @@ func (p *asyncProducer) dispatcher() {
p.inFlight.Add(1)
}

for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}

version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
Expand Down
121 changes: 121 additions & 0 deletions async_producer_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1230,6 +1231,126 @@ func TestBrokerProducerShutdown(t *testing.T) {
mockBroker.Close()
}

type appendInterceptor struct {
i int
}

func (b *appendInterceptor) onSend(msg *ProducerMessage) {
if b.i < 0 {
panic("hey, the interceptor have failed")
}
v, _ := msg.Value.Encode()
msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
b.i++
}

func (b *appendInterceptor) onConsume(msg *ConsumerMessage) {
if b.i < 0 {
panic("hey, the interceptor have failed")
}
msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
b.i++
}

func testProducerInterceptor(
t *testing.T,
interceptors []ProducerInterceptor,
expectationFn func(*testing.T, int, *ProducerMessage),
) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Interceptors = interceptors
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case msg := <-producer.Successes():
expectationFn(t, i, msg)
}
}

closeProducer(t, producer)
leader.Close()
seedBroker.Close()
}

func TestAsyncProducerInterceptors(t *testing.T) {
tests := []struct {
name string
interceptors []ProducerInterceptor
expectationFn func(*testing.T, int, *ProducerMessage)
}{
{
name: "intercept messages",
interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain",
interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+1000)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with one interceptor failing",
interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i+1000)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with all interceptors failing",
interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage
if string(v) != expected {
t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { testProducerInterceptor(t, tt.interceptors, tt.expectationFn) })
}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down
16 changes: 16 additions & 0 deletions config.go
Expand Up @@ -229,6 +229,14 @@ type Config struct {
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}

// Interceptors to be called when the producer dispatcher reads the
// message for the first time. Interceptors allows to intercept and
// possible mutate the message before they are published to Kafka
// cluster. *ProducerMessage modified by the first interceptor's
// onSend() is passed to the second interceptor onSend(), and so on in
// the interceptor chain.
Interceptors []ProducerInterceptor
}

// Consumer is the namespace for configuration related to consuming messages,
Expand Down Expand Up @@ -391,6 +399,14 @@ type Config struct {
// - use `ReadUncommitted` (default) to consume and return all messages in message channel
// - use `ReadCommitted` to hide messages that are part of an aborted transaction
IsolationLevel IsolationLevel

// Interceptors to be called just before the record is sent to the
// messages channel. Interceptors allows to intercept and possible
// mutate the message before they are returned to the client.
// *ConsumerMessage modified by the first interceptor's onConsume() is
// passed to the second interceptor onConsume(), and so on in the
// interceptor chain.
Interceptors []ConsumerInterceptor
}

// A user-provided string sent with every request to the brokers for logging,
Expand Down
3 changes: 3 additions & 0 deletions consumer.go
Expand Up @@ -451,6 +451,9 @@ feederLoop:
}

for i, msg := range msgs {
for _, interceptor := range child.conf.Consumer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
messageSelect:
select {
case <-child.dying:
Expand Down
110 changes: 110 additions & 0 deletions consumer_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1342,3 +1343,112 @@ func Test_partitionConsumer_parseResponse(t *testing.T) {
})
}
}

func testConsumerInterceptor(
t *testing.T,
interceptors []ConsumerInterceptor,
expectationFn func(*testing.T, int, *ConsumerMessage),
) {
// Given
broker0 := NewMockBroker(t, 0)

mockFetchResponse := NewMockFetchResponse(t, 1)
for i := 0; i < 10; i++ {
mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
}

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, 0),
"FetchRequest": mockFetchResponse,
})
config := NewConfig()
config.Consumer.Interceptors = interceptors
// When
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
select {
case msg := <-consumer.Messages():
expectationFn(t, i, msg)
case err := <-consumer.Errors():
t.Error(err)
}
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func TestConsumerInterceptors(t *testing.T) {
tests := []struct {
name string
interceptors []ConsumerInterceptor
expectationFn func(*testing.T, int, *ConsumerMessage)
}{
{
name: "intercept messages",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i)
v := string(msg.Value)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
v := string(msg.Value)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with one interceptor failing",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i+1000)
v := string(msg.Value)
if string(v) != expected {
t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with all interceptors failing",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev)
v := string(msg.Value)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) })
}
}

0 comments on commit 5664afe

Please sign in to comment.