Fixed variable names that are named the same as some standard library package names #1738

Merged
merged 3 commits on Jul 15, 2020
Changes from 2 commits
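
This PR renames local variables that collide with Go standard-library package names (`errors`, `sync`) and the builtin `copy`, so those identifiers remain usable inside the affected scopes. As a hedged illustration of the kind of problem these renames avoid (the `errBase` and `mightFail` names below are hypothetical and not from this repository):

```go
package main

import (
	"errors"
	"fmt"
)

// At package scope the errors package is usable as normal.
var errBase = errors.New("base failure")

func mightFail() error { return errBase }

func main() {
	// A local variable named errors shadows the imported package
	// for the rest of this function.
	errors := []error{}
	if err := mightFail(); err != nil {
		errors = append(errors, err)
	}
	// From here on, errors.New cannot be referenced in this scope,
	// because errors now means the local []error, not the package:
	//
	//	e := errors.New("boom") // would fail to compile here
	fmt.Println("collected:", len(errors))
}
```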
1 change: 0 additions & 1 deletion .gitignore
@@ -27,4 +27,3 @@ coverage.txt
profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

16 changes: 8 additions & 8 deletions admin.go
@@ -802,7 +802,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
// Query brokers in parallel, since we have to query *all* brokers
brokers := ca.client.Brokers()
groupMaps := make(chan map[string]string, len(brokers))
- errors := make(chan error, len(brokers))
+ errChan := make(chan error, len(brokers))
wg := sync.WaitGroup{}

for _, b := range brokers {
@@ -813,7 +813,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

response, err := b.ListGroups(&ListGroupsRequest{})
if err != nil {
- errors <- err
+ errChan <- err
return
}

@@ -828,7 +828,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

wg.Wait()
close(groupMaps)
- close(errors)
+ close(errChan)

for groupMap := range groupMaps {
for group, protocolType := range groupMap {
@@ -837,7 +837,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
}

// Intentionally return only the first error for simplicity
- err = <-errors
+ err = <-errChan
return
}

@@ -893,7 +893,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

// Query brokers in parallel, since we may have to query multiple brokers
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
- errors := make(chan error, len(brokerIds))
+ errChan := make(chan error, len(brokerIds))
wg := sync.WaitGroup{}

for _, b := range brokerIds {
@@ -909,7 +909,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
if err != nil {
- errors <- err
+ errChan <- err
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
@@ -920,7 +920,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

wg.Wait()
close(logDirsMaps)
- close(errors)
+ close(errChan)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
@@ -929,6 +929,6 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
}

// Intentionally return only the first error for simplicity
- err = <-errors
+ err = <-errChan
return
}
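
Both hunks above follow the same fan-out pattern: each broker is queried in its own goroutine, results and errors land on buffered channels sized to the number of brokers, and after the WaitGroup completes only the first error (if any) is surfaced. A minimal, self-contained sketch of that pattern under assumed names (`queryAll` and `jobs` are illustrative, not Sarama API):

```go
package main

import (
	"fmt"
	"sync"
)

// queryAll fans out one goroutine per job, gathers results and errors on
// buffered channels, and returns only the first error, mirroring the
// renamed errChan usage in admin.go.
func queryAll(jobs []int) (results []int, err error) {
	resultChan := make(chan int, len(jobs))
	errChan := make(chan error, len(jobs)) // buffered so goroutines never block
	wg := sync.WaitGroup{}

	for _, j := range jobs {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			if j < 0 {
				errChan <- fmt.Errorf("job %d failed", j)
				return
			}
			resultChan <- j * 2
		}(j)
	}

	wg.Wait()
	close(resultChan)
	close(errChan)

	for r := range resultChan {
		results = append(results, r)
	}

	// Intentionally return only the first error for simplicity; receiving
	// from a closed, empty channel yields nil here.
	err = <-errChan
	return
}

func main() {
	res, err := queryAll([]int{1, 2, -3})
	fmt.Println(res, err)
}
```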
14 changes: 7 additions & 7 deletions async_producer_test.go
@@ -1248,21 +1248,21 @@ func ExampleAsyncProducer_select() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

- var enqueued, errors int
+ var enqueued, producerErrors int
ProducerLoop:
for {
select {
case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
enqueued++
case err := <-producer.Errors():
log.Println("Failed to produce message", err)
- errors++
+ producerErrors++
case <-signals:
break ProducerLoop
}
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)
}

// This example shows how to use the producer with separate goroutines
@@ -1282,8 +1282,8 @@ func ExampleAsyncProducer_goroutines() {
signal.Notify(signals, os.Interrupt)

var (
- wg sync.WaitGroup
- enqueued, successes, errors int
+ wg sync.WaitGroup
+ enqueued, successes, producerErrors int
)

wg.Add(1)
@@ -1299,7 +1299,7 @@ func ExampleAsyncProducer_goroutines() {
defer wg.Done()
for err := range producer.Errors() {
log.Println(err)
- errors++
+ producerErrors++
}
}()

@@ -1318,5 +1318,5 @@ ProducerLoop:

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
}
6 changes: 3 additions & 3 deletions balance_strategy.go
@@ -690,11 +690,11 @@ func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumer
}

func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
- copy := make(map[string][]topicPartitionAssignment, len(assignment))
+ m := make(map[string][]topicPartitionAssignment, len(assignment))
for memberID, subscriptions := range assignment {
- copy[memberID] = append(subscriptions[:0:0], subscriptions...)
+ m[memberID] = append(subscriptions[:0:0], subscriptions...)
}
- return copy
+ return m
}

func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
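In `deepCopyAssignment` above, the local variable previously named `copy` hid Go's builtin `copy` function for the rest of that scope; renaming it to `m` keeps the builtin reachable. The `append(s[:0:0], s...)` expression it uses is a common idiom for cloning a slice into fresh backing storage. A small self-contained sketch of both points (function and variable names here are illustrative):

```go
package main

import "fmt"

// cloneInts clones a slice with the append idiom used by deepCopyAssignment:
// s[:0:0] is an empty slice with zero capacity, so append is forced to
// allocate new backing storage instead of aliasing the original.
func cloneInts(s []int) []int {
	return append(s[:0:0], s...)
}

func main() {
	src := []int{1, 2, 3}
	dst := cloneInts(src)
	dst[0] = 99
	fmt.Println(src, dst) // [1 2 3] [99 2 3]; src is untouched

	// The builtin copy is still available because no local shadows it.
	buf := make([]int, len(src))
	copy(buf, src)
	fmt.Println(buf) // [1 2 3]
}
```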
8 changes: 4 additions & 4 deletions consumer.go
@@ -422,13 +422,13 @@ func (child *partitionConsumer) AsyncClose() {
func (child *partitionConsumer) Close() error {
child.AsyncClose()

- var errors ConsumerErrors
+ var consumerErrors ConsumerErrors
for err := range child.errors {
- errors = append(errors, err)
+ consumerErrors = append(consumerErrors, err)
}

- if len(errors) > 0 {
- return errors
+ if len(consumerErrors) > 0 {
+ return consumerErrors
}
return nil
}
14 changes: 7 additions & 7 deletions consumer_group.go
@@ -255,36 +255,36 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}

// Sync consumer group
- sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
+ groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
if err != nil {
_ = coordinator.Close()
return nil, err
}
- switch sync.Err {
+ switch groupRequest.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
- return nil, sync.Err
+ return nil, groupRequest.Err
}

return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
- return nil, sync.Err
+ return nil, groupRequest.Err
}

return c.retryNewSession(ctx, topics, handler, retries, false)
default:
- return nil, sync.Err
+ return nil, groupRequest.Err
}

// Retrieve and sort claims
var claims map[string][]int32
- if len(sync.MemberAssignment) > 0 {
- members, err := sync.GetMemberAssignment()
+ if len(groupRequest.MemberAssignment) > 0 {
+ members, err := groupRequest.GetMemberAssignment()
if err != nil {
return nil, err
}