Skip to content

Commit

Permalink
Merge branch 'master' into cd/1577/consumer-group-mocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Kranti Deep committed Oct 11, 2020
2 parents 6dbd2ce + 65f0fec commit d80a202
Show file tree
Hide file tree
Showing 40 changed files with 851 additions and 233 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Expand Up @@ -9,8 +9,8 @@ jobs:
strategy:
fail-fast: false
matrix:
go-version: [1.14.x]
kafka-version: [2.4.1, 2.6.0]
go-version: [1.15.x]
kafka-version: [2.5.1, 2.6.0]
platform: [ubuntu-latest]

env:
Expand Down Expand Up @@ -41,7 +41,7 @@ jobs:

- name: Install dependencies
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.30.0
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.31.0
export REPOSITORY_ROOT=${GITHUB_WORKSPACE}
- name: Run test suite
Expand Down
16 changes: 16 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,22 @@

#### Unreleased

#### Version 1.27.1 (2020-10-07)

# Improvements

#1775 - @d1egoaz - Adds a Producer Interceptor example
#1781 - @justin-chen - Refresh brokers given list of seed brokers
#1784 - @justin-chen - Add randomize seed broker method
#1790 - @d1egoaz - remove example binary
#1798 - @bai - Test against Go 1.15
#1785 - @justin-chen - Add private method to Client interface to prevent implementation
#1802 - @uvw - Support Go 1.13 error unwrapping

# Fixes

#1791 - @stanislavkozlovski - bump default version to 1.0.0

#### Version 1.27.0 (2020-08-11)

# Improvements
Expand Down
72 changes: 36 additions & 36 deletions admin_test.go
Expand Up @@ -16,7 +16,7 @@ func TestClusterAdmin(t *testing.T) {
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand All @@ -38,7 +38,7 @@ func TestClusterAdminInvalidController(t *testing.T) {
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
_, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err == nil {
Expand All @@ -61,7 +61,7 @@ func TestClusterAdminCreateTopic(t *testing.T) {
"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_10_2_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_10_2_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_11_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestClusterAdminListTopics(t *testing.T) {
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_1_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestClusterAdminDeleteTopic(t *testing.T) {
"DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_10_2_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
"DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_10_2_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestClusterAdminCreatePartitions(t *testing.T) {
"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_10_2_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -388,7 +388,7 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) {
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestClusterAdminDeleteRecords(t *testing.T) {
"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -604,7 +604,7 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_10_2_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -660,7 +660,7 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
{V2_0_0_0, 2, true},
}
for _, tt := range tests {
config := NewConfig()
config := NewTestConfig()
config.Version = tt.saramaVersion
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -715,7 +715,7 @@ func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) {
"DescribeConfigsRequest": NewMockDescribeConfigsResponseWithErrorCode(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_1_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -760,7 +760,7 @@ func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin(
[]string{
Expand Down Expand Up @@ -800,7 +800,7 @@ func TestClusterAdminAlterConfig(t *testing.T) {
"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -833,7 +833,7 @@ func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) {
"AlterConfigsRequest": NewMockAlterConfigsResponseWithErrorCode(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -873,7 +873,7 @@ func TestClusterAdminAlterBrokerConfig(t *testing.T) {
"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin(
[]string{
Expand Down Expand Up @@ -918,7 +918,7 @@ func TestClusterAdminCreateAcl(t *testing.T) {
"CreateAclsRequest": NewMockCreateAclsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -951,7 +951,7 @@ func TestClusterAdminListAcls(t *testing.T) {
"CreateAclsRequest": NewMockCreateAclsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -997,7 +997,7 @@ func TestClusterAdminDeleteAcl(t *testing.T) {
"DeleteAclsRequest": NewMockDeleteAclsResponse(t),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -1033,7 +1033,7 @@ func TestDescribeTopic(t *testing.T) {
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -1071,7 +1071,7 @@ func TestDescribeTopicWithVersion0_11(t *testing.T) {
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V0_11_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -1114,7 +1114,7 @@ func TestDescribeConsumerGroup(t *testing.T) {
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func TestListConsumerGroups(t *testing.T) {
AddGroup("my-group", "consumer"),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -1215,7 +1215,7 @@ func TestListConsumerGroupsMultiBroker(t *testing.T) {
AddGroup(secondGroup, "consumer"),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -1267,7 +1267,7 @@ func TestListConsumerGroupOffsets(t *testing.T) {
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -1312,7 +1312,7 @@ func TestDeleteConsumerGroup(t *testing.T) {
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_1_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -1341,7 +1341,7 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_1_0_0

client, err := NewClient([]string{seedBroker1.Addr()}, config)
Expand Down Expand Up @@ -1386,7 +1386,7 @@ func TestDescribeLogDirs(t *testing.T) {
SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
})

config := NewConfig()
config := NewTestConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
Expand Down
4 changes: 4 additions & 0 deletions async_producer.go
Expand Up @@ -267,6 +267,10 @@ func (pe ProducerError) Error() string {
return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

func (pe ProducerError) Unwrap() error {
return pe.Err
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
Expand Down

0 comments on commit d80a202

Please sign in to comment.