Skip to content

Commit

Permalink
Merge pull request #1738 from varun06/varun-fix-package-name-as-ident…
Browse files Browse the repository at this point in the history
…ifier

fixed variable names that are named same as some std lib package names
  • Loading branch information
d1egoaz committed Jul 15, 2020
2 parents bf41722 + 3f8f953 commit 4bc9b8f
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 30 deletions.
1 change: 0 additions & 1 deletion .gitignore
Expand Up @@ -27,4 +27,3 @@ coverage.txt
profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

16 changes: 8 additions & 8 deletions admin.go
Expand Up @@ -802,7 +802,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
// Query brokers in parallel, since we have to query *all* brokers
brokers := ca.client.Brokers()
groupMaps := make(chan map[string]string, len(brokers))
errors := make(chan error, len(brokers))
errChan := make(chan error, len(brokers))
wg := sync.WaitGroup{}

for _, b := range brokers {
Expand All @@ -813,7 +813,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

response, err := b.ListGroups(&ListGroupsRequest{})
if err != nil {
errors <- err
errChan <- err
return
}

Expand All @@ -828,7 +828,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

wg.Wait()
close(groupMaps)
close(errors)
close(errChan)

for groupMap := range groupMaps {
for group, protocolType := range groupMap {
Expand All @@ -837,7 +837,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
}

// Intentionally return only the first error for simplicity
err = <-errors
err = <-errChan
return
}

Expand Down Expand Up @@ -893,7 +893,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

// Query brokers in parallel, since we may have to query multiple brokers
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
errors := make(chan error, len(brokerIds))
errChan := make(chan error, len(brokerIds))
wg := sync.WaitGroup{}

for _, b := range brokerIds {
Expand All @@ -909,7 +909,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
if err != nil {
errors <- err
errChan <- err
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
Expand All @@ -920,7 +920,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

wg.Wait()
close(logDirsMaps)
close(errors)
close(errChan)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
Expand All @@ -929,6 +929,6 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
}

// Intentionally return only the first error for simplicity
err = <-errors
err = <-errChan
return
}
14 changes: 7 additions & 7 deletions async_producer_test.go
Expand Up @@ -1369,21 +1369,21 @@ func ExampleAsyncProducer_select() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var enqueued, errors int
var enqueued, producerErrors int
ProducerLoop:
for {
select {
case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
enqueued++
case err := <-producer.Errors():
log.Println("Failed to produce message", err)
errors++
producerErrors++
case <-signals:
break ProducerLoop
}
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)
}

// This example shows how to use the producer with separate goroutines
Expand All @@ -1403,8 +1403,8 @@ func ExampleAsyncProducer_goroutines() {
signal.Notify(signals, os.Interrupt)

var (
wg sync.WaitGroup
enqueued, successes, errors int
wg sync.WaitGroup
enqueued, successes, producerErrors int
)

wg.Add(1)
Expand All @@ -1420,7 +1420,7 @@ func ExampleAsyncProducer_goroutines() {
defer wg.Done()
for err := range producer.Errors() {
log.Println(err)
errors++
producerErrors++
}
}()

Expand All @@ -1439,5 +1439,5 @@ ProducerLoop:

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
}
6 changes: 3 additions & 3 deletions balance_strategy.go
Expand Up @@ -690,11 +690,11 @@ func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumer
}

func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
copy := make(map[string][]topicPartitionAssignment, len(assignment))
m := make(map[string][]topicPartitionAssignment, len(assignment))
for memberID, subscriptions := range assignment {
copy[memberID] = append(subscriptions[:0:0], subscriptions...)
m[memberID] = append(subscriptions[:0:0], subscriptions...)
}
return copy
return m
}

func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
Expand Down
8 changes: 4 additions & 4 deletions consumer.go
Expand Up @@ -422,13 +422,13 @@ func (child *partitionConsumer) AsyncClose() {
func (child *partitionConsumer) Close() error {
child.AsyncClose()

var errors ConsumerErrors
var consumerErrors ConsumerErrors
for err := range child.errors {
errors = append(errors, err)
consumerErrors = append(consumerErrors, err)
}

if len(errors) > 0 {
return errors
if len(consumerErrors) > 0 {
return consumerErrors
}
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions consumer_group.go
Expand Up @@ -255,36 +255,36 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}

// Sync consumer group
sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
if err != nil {
_ = coordinator.Close()
return nil, err
}
switch sync.Err {
switch groupRequest.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
return nil, sync.Err
return nil, groupRequest.Err
}

return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, sync.Err
return nil, groupRequest.Err
}

return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, sync.Err
return nil, groupRequest.Err
}

// Retrieve and sort claims
var claims map[string][]int32
if len(sync.MemberAssignment) > 0 {
members, err := sync.GetMemberAssignment()
if len(groupRequest.MemberAssignment) > 0 {
members, err := groupRequest.GetMemberAssignment()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 4bc9b8f

Please sign in to comment.