Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-42 Add producer and consumer interceptors #1730

Merged
merged 6 commits into from Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions async_producer.go
Expand Up @@ -348,6 +348,10 @@ func (p *asyncProducer) dispatcher() {
p.inFlight.Add(1)
}

for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}

version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
Expand Down
121 changes: 121 additions & 0 deletions async_producer_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1230,6 +1231,126 @@ func TestBrokerProducerShutdown(t *testing.T) {
mockBroker.Close()
}

type appendInterceptor struct {
i int
}

func (b *appendInterceptor) OnSend(msg *ProducerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
v, _ := msg.Value.Encode()
msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
b.i++
}

func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) {
if b.i < 0 {
panic("hey, the interceptor has failed")
}
msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
b.i++
}

func testProducerInterceptor(
t *testing.T,
interceptors []ProducerInterceptor,
expectationFn func(*testing.T, int, *ProducerMessage),
) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Interceptors = interceptors
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case msg := <-producer.Successes():
expectationFn(t, i, msg)
}
}

closeProducer(t, producer)
leader.Close()
seedBroker.Close()
}

func TestAsyncProducerInterceptors(t *testing.T) {
tests := []struct {
name string
interceptors []ProducerInterceptor
expectationFn func(*testing.T, int, *ProducerMessage)
}{
{
name: "intercept messages",
interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain",
interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+1000)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with one interceptor failing",
interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i+1000)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with all interceptors failing",
interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
v, _ := msg.Value.Encode()
expected := TestMessage
if string(v) != expected {
t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { testProducerInterceptor(t, tt.interceptors, tt.expectationFn) })
}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down
16 changes: 16 additions & 0 deletions config.go
Expand Up @@ -229,6 +229,14 @@ type Config struct {
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}

// Interceptors to be called when the producer dispatcher reads the
// message for the first time. Interceptors allows to intercept and
// possible mutate the message before they are published to Kafka
// cluster. *ProducerMessage modified by the first interceptor's
// OnSend() is passed to the second interceptor OnSend(), and so on in
// the interceptor chain.
Interceptors []ProducerInterceptor
}

// Consumer is the namespace for configuration related to consuming messages,
Expand Down Expand Up @@ -391,6 +399,14 @@ type Config struct {
// - use `ReadUncommitted` (default) to consume and return all messages in message channel
// - use `ReadCommitted` to hide messages that are part of an aborted transaction
IsolationLevel IsolationLevel

// Interceptors to be called just before the record is sent to the
// messages channel. Interceptors allows to intercept and possible
// mutate the message before they are returned to the client.
// *ConsumerMessage modified by the first interceptor's OnConsume() is
// passed to the second interceptor OnConsume(), and so on in the
// interceptor chain.
Interceptors []ConsumerInterceptor
}

// A user-provided string sent with every request to the brokers for logging,
Expand Down
3 changes: 3 additions & 0 deletions consumer.go
Expand Up @@ -451,6 +451,9 @@ feederLoop:
}

for i, msg := range msgs {
for _, interceptor := range child.conf.Consumer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
messageSelect:
select {
case <-child.dying:
Expand Down
110 changes: 110 additions & 0 deletions consumer_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1342,3 +1343,112 @@ func Test_partitionConsumer_parseResponse(t *testing.T) {
})
}
}

func testConsumerInterceptor(
t *testing.T,
interceptors []ConsumerInterceptor,
expectationFn func(*testing.T, int, *ConsumerMessage),
) {
// Given
broker0 := NewMockBroker(t, 0)

mockFetchResponse := NewMockFetchResponse(t, 1)
for i := 0; i < 10; i++ {
mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
}

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, 0),
"FetchRequest": mockFetchResponse,
})
config := NewConfig()
config.Consumer.Interceptors = interceptors
// When
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
select {
case msg := <-consumer.Messages():
expectationFn(t, i, msg)
case err := <-consumer.Errors():
t.Error(err)
}
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func TestConsumerInterceptors(t *testing.T) {
tests := []struct {
name string
interceptors []ConsumerInterceptor
expectationFn func(*testing.T, int, *ConsumerMessage)
}{
{
name: "intercept messages",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with one interceptor failing",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev) + strconv.Itoa(i+1000)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
}
},
},
{
name: "interceptor chain with all interceptors failing",
interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
ev, _ := testMsg.Encode()
expected := string(ev)
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) })
}
}
73 changes: 73 additions & 0 deletions functional_producer_test.go
Expand Up @@ -5,6 +5,7 @@ package sarama
import (
"fmt"
"os"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -183,6 +184,78 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
}
}

func TestInterceptors(t *testing.T) {
config := NewConfig()
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config.Producer.Return.Successes = true
config.Consumer.Return.Errors = true
config.Producer.Interceptors = []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 100}}
config.Consumer.Interceptors = []ConsumerInterceptor{&appendInterceptor{i: 20}}

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}

initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
if err != nil {
t.Fatal(err)
}

producer, err := NewAsyncProducerFromClient(client)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
}

for i := 0; i < 10; i++ {
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case msg := <-producer.Successes():
v, _ := msg.Value.Encode()
expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100)
if string(v) != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
}
}
safeClose(t, producer)

master, err := NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}
consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
select {
case <-time.After(10 * time.Second):
t.Fatal("Not received any more events in the last 10 seconds.")
case err := <-consumer.Errors():
t.Error(err)
case msg := <-consumer.Messages():
prodInteExpectation := strconv.Itoa(i) + strconv.Itoa(i+100)
consInteExpectation := strconv.Itoa(i + 20)
expected := TestMessage + prodInteExpectation + consInteExpectation
v := string(msg.Value)
if v != expected {
t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
}
}
}
safeClose(t, consumer)
safeClose(t, client)
}

func testProducingMessages(t *testing.T, config *Config) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
Expand Down