
Commit

Merge branch 'master' into cd/1577/consumer-group-mocks
Kranti Deep committed Jul 12, 2020
2 parents e81f001 + 5933302 commit ac5ca79
Showing 139 changed files with 3,374 additions and 810 deletions.
20 changes: 4 additions & 16 deletions .github/workflows/ci.yml
@@ -9,18 +9,13 @@ jobs:
strategy:
fail-fast: false
matrix:
go-version: [1.12.x, 1.13.x, 1.14.x]
kafka-version: [2.3.1, 2.4.0]
go-version: [1.14.x]
kafka-version: [2.4.1, 2.5.0]
platform: [ubuntu-latest]

env:
KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095
TOXIPROXY_ADDR: http://localhost:8474
KAFKA_INSTALL_ROOT: /home/runner/kafka
KAFKA_HOSTNAME: localhost
DEBUG: true
KAFKA_VERSION: ${{ matrix.kafka-version }}
KAFKA_SCALA_VERSION: 2.12

steps:
- uses: actions/checkout@v1
@@ -46,18 +41,11 @@ jobs:

- name: Install dependencies
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.23.6
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
export REPOSITORY_ROOT=${GITHUB_WORKSPACE}
vagrant/install_cluster.sh
vagrant/boot_cluster.sh
vagrant/create_topics.sh
vagrant/run_java_producer.sh
- name: Run test suite
run: make test
run: make test_functional

- name: Run linter
run: make lint

- name: Teardown
run: vagrant/halt_cluster.sh
3 changes: 3 additions & 0 deletions .gitignore
@@ -25,3 +25,6 @@ _testmain.go

coverage.txt
profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

1 change: 1 addition & 0 deletions .golangci.yml
@@ -72,3 +72,4 @@ issues:
exclude:
- consider giving a name to these results
- include an explanation for nolint directive
- Potential Integer overflow made by strconv.Atoi result conversion to int16/32
6 changes: 5 additions & 1 deletion Makefile
@@ -21,7 +21,11 @@ fmt:
gofmt -s -l -w $(FILES) $(TESTS)

lint:
golangci-lint run
GOFLAGS="-tags=functional" golangci-lint run

test:
$(GOTEST) ./...

.PHONY: test_functional
test_functional:
$(GOTEST) -tags=functional ./...
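
The new `test_functional` target compiles the suite with the `functional` build tag, so the broker-backed tests only run when explicitly requested. A minimal sketch of how a `_test.go` file can opt in to that tag (file contents and test name are hypothetical, not part of this commit):

```go
// +build functional

package sarama

import "testing"

// TestFunctionalSmoke is only compiled when `go test -tags=functional` is used,
// matching the new `make test_functional` target above.
func TestFunctionalSmoke(t *testing.T) {
	t.Log("runs only against a real Kafka cluster via the functional build tag")
}
```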
2 changes: 1 addition & 1 deletion README.md
@@ -20,7 +20,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support
Go 1.12 through 1.14, and Kafka 2.1 through 2.4, although older releases are
Go 1.12 through 1.14, and Kafka 2.3 through 2.5, although older releases are
still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service.
4 changes: 4 additions & 0 deletions acl_create_request.go
@@ -47,6 +47,10 @@ func (c *CreateAclsRequest) version() int16 {
return c.Version
}

func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}

func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
6 changes: 5 additions & 1 deletion acl_create_response.go
@@ -2,7 +2,7 @@ package sarama

import "time"

//CreateAclsResponse is a an acl reponse creation type
//CreateAclsResponse is a an acl response creation type
type CreateAclsResponse struct {
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
@@ -55,6 +55,10 @@ func (c *CreateAclsResponse) version() int16 {
return 0
}

func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}

func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions acl_delete_request.go
@@ -48,6 +48,10 @@ func (d *DeleteAclsRequest) version() int16 {
return int16(d.Version)
}

func (c *DeleteAclsRequest) headerVersion() int16 {
return 1
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
4 changes: 4 additions & 0 deletions acl_delete_response.go
@@ -56,6 +56,10 @@ func (d *DeleteAclsResponse) version() int16 {
return d.Version
}

func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions acl_describe_request.go
@@ -25,6 +25,10 @@ func (d *DescribeAclsRequest) version() int16 {
return int16(d.Version)
}

func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
4 changes: 4 additions & 0 deletions acl_describe_response.go
@@ -77,6 +77,10 @@ func (d *DescribeAclsResponse) version() int16 {
return d.Version
}

func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}

func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
4 changes: 4 additions & 0 deletions add_offsets_to_txn_request.go
@@ -48,6 +48,10 @@ func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
}

func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_offsets_to_txn_response.go
@@ -40,6 +40,10 @@ func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_request.go
@@ -72,6 +72,10 @@ func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
}

func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_response.go
@@ -79,6 +79,10 @@ func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
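
The repeated `headerVersion()` additions across these request and response types give every protocol message a fixed header-schema version (requests here report 1, responses 0), which the connection code can consult when encoding and decoding message headers. A rough illustration of the pattern, with hypothetical names that are not part of this diff:

```go
package main

import "fmt"

// protocolMessage mirrors the pattern in this commit: each Kafka protocol
// message reports which header schema version it uses on the wire.
type protocolMessage interface {
	version() int16
	headerVersion() int16
}

type createAclsRequest struct{}

func (createAclsRequest) version() int16       { return 0 }
func (createAclsRequest) headerVersion() int16 { return 1 } // request header v1 also carries a client ID

func describeHeader(m protocolMessage) string {
	// A real encoder would switch on headerVersion() to decide which header
	// fields (correlation ID, client ID, tagged fields) to read or write.
	return fmt.Sprintf("encode with header schema v%d", m.headerVersion())
}

func main() {
	fmt.Println(describeHeader(createAclsRequest{}))
}
```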
138 changes: 138 additions & 0 deletions admin.go
@@ -42,6 +42,14 @@ type ClusterAdmin interface {
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

// Alter the replica assignment for partitions.
// This operation is supported by brokers with version 2.4.0.0 or higher.
AlterPartitionReassignments(topic string, assignment [][]int32) error

// Provides info on ongoing partitions replica reassignments.
// This operation is supported by brokers with version 2.4.0.0 or higher.
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

// Delete records whose offset is smaller than the given offset of the corresponding partition.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
Expand Down Expand Up @@ -93,6 +101,9 @@ type ClusterAdmin interface {
// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)

// Get information about all log directories on the given set of brokers
DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
@@ -452,6 +463,82 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
})
}

func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
if topic == "" {
return ErrInvalidTopic
}

request := &AlterPartitionReassignmentsRequest{
TimeoutMs: int32(60000),
Version: int16(0),
}

for i := 0; i < len(assignment); i++ {
request.AddBlock(topic, int32(i), assignment[i])
}

return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

errs := make([]error, 0)

rsp, err := b.AlterPartitionReassignments(request)

if err != nil {
errs = append(errs, err)
} else {
if rsp.ErrorCode > 0 {
errs = append(errs, errors.New(rsp.ErrorCode.Error()))
}

for topic, topicErrors := range rsp.Errors {
for partition, partitionError := range topicErrors {
if partitionError.errorCode != ErrNoError {
errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
errs = append(errs, errors.New(errStr))
}
}
}
}

if len(errs) > 0 {
return ErrReassignPartitions{MultiError{&errs}}
}

return nil
})
}

func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
if topic == "" {
return nil, ErrInvalidTopic
}

request := &ListPartitionReassignmentsRequest{
TimeoutMs: int32(60000),
Version: int16(0),
}

request.AddBlock(topic, partitions)

b, err := ca.Controller()
if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())

rsp, err := b.ListPartitionReassignments(request)

if err == nil && rsp != nil {
return rsp.TopicStatus, nil
} else {
return nil, err
}
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
if topic == "" {
return ErrInvalidTopic
@@ -556,6 +643,9 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,
if rspResource.ErrorMsg != "" {
return nil, errors.New(rspResource.ErrorMsg)
}
if rspResource.ErrorCode != 0 {
return nil, KError(rspResource.ErrorCode)
}
for _, cfgEntry := range rspResource.Configs {
entries = append(entries, *cfgEntry)
}
@@ -604,6 +694,9 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string
if rspResource.ErrorMsg != "" {
return errors.New(rspResource.ErrorMsg)
}
if rspResource.ErrorCode != 0 {
return KError(rspResource.ErrorCode)
}
}
}
return nil
@@ -794,3 +887,48 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {

return nil
}

func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)

// Query brokers in parallel, since we may have to query multiple brokers
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
errors := make(chan error, len(brokerIds))
wg := sync.WaitGroup{}

for _, b := range brokerIds {
wg.Add(1)
broker, err := ca.findBroker(b)
if err != nil {
Logger.Printf("Unable to find broker with ID = %v\n", b)
continue
}
go func(b *Broker, conf *Config) {
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
if err != nil {
errors <- err
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
logDirs[b.ID()] = response.LogDirs
logDirsMaps <- logDirs
}(broker, ca.conf)
}

wg.Wait()
close(logDirsMaps)
close(errors)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
allLogDirs[id] = logDirs
}
}

// Intentionally return only the first error for simplicity
err = <-errors
return
}
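
Taken together, the admin.go changes expose partition reassignment and log-directory inspection through ClusterAdmin. A hedged usage sketch built only from the signatures added in this diff; the broker address, topic name, and replica assignment are placeholder values:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_4_0_0 // the reassignment APIs need brokers on 2.4.0 or newer

	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	// Move partition 0 of "my-topic" onto brokers 1 and 2 (placeholder assignment).
	if err := admin.AlterPartitionReassignments("my-topic", [][]int32{{1, 2}}); err != nil {
		log.Fatal(err)
	}

	// Poll the status of the ongoing reassignment for partition 0.
	status, err := admin.ListPartitionReassignments("my-topic", []int32{0})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("reassignment status: %+v", status)

	// Inspect the log directories on brokers 1 and 2.
	dirs, err := admin.DescribeLogDirs([]int32{1, 2})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("log dirs: %+v", dirs)
}
```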
