Skip to content

Commit

Permalink
Kafka integration tests (#212)
Browse files Browse the repository at this point in the history
Signed-off-by: Sotirios Mantziaris <smantziaris@gmail.com>
  • Loading branch information
Sotirios Mantziaris committed May 25, 2020
1 parent 5bd08ad commit 7ef6482
Show file tree
Hide file tree
Showing 15 changed files with 957 additions and 664 deletions.
86 changes: 6 additions & 80 deletions client/kafka/builder_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafka

import (
"errors"
"testing"
"time"

Expand All @@ -14,7 +13,6 @@ import (
)

func TestVersion(t *testing.T) {
seed := createKafkaBroker(t, true)
type args struct {
version string
}
Expand All @@ -29,7 +27,7 @@ func TestVersion(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ab := NewBuilder([]string{seed.Addr()}).WithVersion(tt.args.version)
ab := NewBuilder([]string{"123"}).WithVersion(tt.args.version)
if tt.wantErr {
assert.NotEmpty(t, ab.errors)
} else {
Expand All @@ -43,7 +41,6 @@ func TestVersion(t *testing.T) {
}

func TestTimeouts(t *testing.T) {
seed := createKafkaBroker(t, true)
type args struct {
dial time.Duration
}
Expand All @@ -57,7 +54,7 @@ func TestTimeouts(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ab := NewBuilder([]string{seed.Addr()}).WithTimeout(tt.args.dial)
ab := NewBuilder([]string{"123"}).WithTimeout(tt.args.dial)
if tt.wantErr {
assert.NotEmpty(t, ab.errors)
} else {
Expand All @@ -69,7 +66,6 @@ func TestTimeouts(t *testing.T) {
}

func TestRequiredAcksPolicy(t *testing.T) {
seed := createKafkaBroker(t, true)
type args struct {
requiredAcks RequiredAcks
}
Expand All @@ -85,7 +81,7 @@ func TestRequiredAcksPolicy(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ab := NewBuilder([]string{seed.Addr()}).WithRequiredAcksPolicy(tt.args.requiredAcks)
ab := NewBuilder([]string{"123"}).WithRequiredAcksPolicy(tt.args.requiredAcks)
if tt.wantErr {
assert.NotEmpty(t, ab.errors)
} else {
Expand All @@ -97,7 +93,6 @@ func TestRequiredAcksPolicy(t *testing.T) {
}

func TestEncoder(t *testing.T) {
seed := createKafkaBroker(t, true)
type args struct {
enc encoding.EncodeFunc
contentType string
Expand All @@ -115,7 +110,7 @@ func TestEncoder(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ab := NewBuilder([]string{seed.Addr()}).WithEncoder(tt.args.enc, tt.args.contentType)
ab := NewBuilder([]string{"123"}).WithEncoder(tt.args.enc, tt.args.contentType)
if tt.wantErr {
assert.NotEmpty(t, ab.errors)
} else {
Expand All @@ -128,7 +123,6 @@ func TestEncoder(t *testing.T) {
}

func TestBrokers(t *testing.T) {
seed := createKafkaBroker(t, true)

type args struct {
brokers []string
Expand All @@ -138,8 +132,8 @@ func TestBrokers(t *testing.T) {
args args
wantErr bool
}{
{name: "single mock broker", args: args{brokers: []string{seed.Addr()}}, wantErr: false},
{name: "multiple mock brokers", args: args{brokers: []string{seed.Addr(), seed.Addr(), seed.Addr()}}, wantErr: false},
{name: "single mock broker", args: args{brokers: []string{"123"}}, wantErr: false},
{name: "multiple mock brokers", args: args{brokers: []string{"123", "123", "123"}}, wantErr: false},
{name: "empty brokers list", args: args{brokers: []string{}}, wantErr: true},
{name: "brokers list with an empty value", args: args{brokers: []string{" ", "value"}}, wantErr: true},
{name: "nil brokers list", args: args{brokers: nil}, wantErr: true},
Expand All @@ -155,71 +149,3 @@ func TestBrokers(t *testing.T) {
})
}
}

func Test_createAsyncProducerUsingBuilder(t *testing.T) {
seed := createKafkaBroker(t, true)

var builderNoErrors = []error{}
var builderAllErrors = []error{
errors.New("brokers list is empty"),
errors.New("encoder is nil"),
errors.New("content type is empty"),
errors.New("dial timeout has to be positive"),
errors.New("version is required"),
errors.New("invalid value for required acks policy provided"),
}

tests := map[string]struct {
brokers []string
version string
ack RequiredAcks
timeout time.Duration
enc encoding.EncodeFunc
contentType string
wantErrs []error
}{
"success": {
brokers: []string{seed.Addr()},
version: sarama.V0_8_2_0.String(),
ack: NoResponse,
timeout: 1 * time.Second,
enc: json.Encode,
contentType: json.Type,
wantErrs: builderNoErrors,
},
"error in all builder steps": {
brokers: []string{},
version: "",
ack: -5,
timeout: -1 * time.Second,
enc: nil,
contentType: "",
wantErrs: builderAllErrors,
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
gotAsyncProducer, chErr, gotErrs := NewBuilder(tt.brokers).
WithVersion(tt.version).
WithRequiredAcksPolicy(tt.ack).
WithTimeout(tt.timeout).
WithEncoder(tt.enc, tt.contentType).
CreateAsync()

v, _ := sarama.ParseKafkaVersion(tt.version)
if len(tt.wantErrs) > 0 {
assert.ObjectsAreEqual(tt.wantErrs, gotErrs)
assert.Nil(t, gotAsyncProducer)
} else {
assert.NotNil(t, gotAsyncProducer)
assert.NotNil(t, chErr)
assert.IsType(t, &AsyncProducer{}, gotAsyncProducer)
assert.EqualValues(t, v, gotAsyncProducer.cfg.Version)
assert.EqualValues(t, tt.ack, gotAsyncProducer.cfg.Producer.RequiredAcks)
assert.Equal(t, tt.timeout, gotAsyncProducer.cfg.Net.DialTimeout)
}
})
}

}

0 comments on commit 7ef6482

Please sign in to comment.