diff --git a/.gitignore b/.gitignore index eb4b19509..2c9adc20b 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,3 @@ coverage.txt profile.out simplest-uncommitted-msg-0.1-jar-with-dependencies.jar - diff --git a/admin.go b/admin.go index 0430d9841..9dea0255f 100644 --- a/admin.go +++ b/admin.go @@ -802,7 +802,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e // Query brokers in parallel, since we have to query *all* brokers brokers := ca.client.Brokers() groupMaps := make(chan map[string]string, len(brokers)) - errors := make(chan error, len(brokers)) + errChan := make(chan error, len(brokers)) wg := sync.WaitGroup{} for _, b := range brokers { @@ -813,7 +813,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e response, err := b.ListGroups(&ListGroupsRequest{}) if err != nil { - errors <- err + errChan <- err return } @@ -828,7 +828,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e wg.Wait() close(groupMaps) - close(errors) + close(errChan) for groupMap := range groupMaps { for group, protocolType := range groupMap { @@ -837,7 +837,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e } // Intentionally return only the first error for simplicity - err = <-errors + err = <-errChan return } @@ -893,7 +893,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 // Query brokers in parallel, since we may have to query multiple brokers logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds)) - errors := make(chan error, len(brokerIds)) + errChan := make(chan error, len(brokerIds)) wg := sync.WaitGroup{} for _, b := range brokerIds { @@ -909,7 +909,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{}) if err != nil { - errors <- err + errChan <- err return } logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata) @@ -920,7 +920,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 wg.Wait() close(logDirsMaps) - close(errors) + close(errChan) for logDirsMap := range logDirsMaps { for id, logDirs := range logDirsMap { @@ -929,6 +929,6 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 } // Intentionally return only the first error for simplicity - err = <-errors + err = <-errChan return } diff --git a/async_producer_test.go b/async_producer_test.go index 46b97790a..d53cd3d15 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1248,7 +1248,7 @@ func ExampleAsyncProducer_select() { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) - var enqueued, errors int + var enqueued, producerErrors int ProducerLoop: for { select { @@ -1256,13 +1256,13 @@ ProducerLoop: enqueued++ case err := <-producer.Errors(): log.Println("Failed to produce message", err) - errors++ + producerErrors++ case <-signals: break ProducerLoop } } - log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors) + log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors) } // This example shows how to use the producer with separate goroutines @@ -1282,8 +1282,8 @@ func ExampleAsyncProducer_goroutines() { signal.Notify(signals, os.Interrupt) var ( - wg sync.WaitGroup - enqueued, successes, errors int + wg sync.WaitGroup + enqueued, successes, producerErrors int ) wg.Add(1) @@ -1299,7 +1299,7 @@ func ExampleAsyncProducer_goroutines() { defer wg.Done() for err := range producer.Errors() { log.Println(err) - errors++ + producerErrors++ } }() @@ -1318,5 +1318,5 @@ ProducerLoop: wg.Wait() - log.Printf("Successfully produced: %d; errors: %d\n", successes, errors) + log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors) } diff --git a/balance_strategy.go b/balance_strategy.go index d9789a026..a3bf22cb0 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -690,11 +690,11 @@ func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumer } func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment { - copy := make(map[string][]topicPartitionAssignment, len(assignment)) + m := make(map[string][]topicPartitionAssignment, len(assignment)) for memberID, subscriptions := range assignment { - copy[memberID] = append(subscriptions[:0:0], subscriptions...) + m[memberID] = append(subscriptions[:0:0], subscriptions...) } - return copy + return m } func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool { diff --git a/consumer.go b/consumer.go index e16d08aa9..7b703b0b3 100644 --- a/consumer.go +++ b/consumer.go @@ -422,13 +422,13 @@ func (child *partitionConsumer) AsyncClose() { func (child *partitionConsumer) Close() error { child.AsyncClose() - var errors ConsumerErrors + var consumerErrors ConsumerErrors for err := range child.errors { - errors = append(errors, err) + consumerErrors = append(consumerErrors, err) } - if len(errors) > 0 { - return errors + if len(consumerErrors) > 0 { + return consumerErrors } return nil } diff --git a/consumer_group.go b/consumer_group.go index aae6599ca..fcc5792ea 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -255,36 +255,36 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler } // Sync consumer group - sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId) + groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId) if err != nil { _ = coordinator.Close() return nil, err } - switch sync.Err { + switch groupRequest.Err { case ErrNoError: case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately c.memberID = "" return c.newSession(ctx, topics, handler, retries) case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh if retries <= 0 { - return nil, sync.Err + return nil, groupRequest.Err } return c.retryNewSession(ctx, topics, handler, retries, true) case ErrRebalanceInProgress: // retry after backoff if retries <= 0 { - return nil, sync.Err + return nil, groupRequest.Err } return c.retryNewSession(ctx, topics, handler, retries, false) default: - return nil, sync.Err + return nil, groupRequest.Err } // Retrieve and sort claims var claims map[string][]int32 - if len(sync.MemberAssignment) > 0 { - members, err := sync.GetMemberAssignment() + if len(groupRequest.MemberAssignment) > 0 { + members, err := groupRequest.GetMemberAssignment() if err != nil { return nil, err }