From 9711ba1bee9c5a6397245fbb73bd3cf9cd79866f Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 17 Jun 2022 12:29:10 +0200 Subject: [PATCH] KIP-140: ACL operations (#796) Bindings for ACL operations: CreateAcls DescribeAcls DeleteAcls --- CHANGELOG.md | 1 + examples/.gitignore | 3 + examples/README.md | 3 + .../admin_create_acls/admin_create_acls.go | 147 ++++ .../admin_create_topic/admin_create_topic.go | 9 +- .../admin_delete_acls/admin_delete_acls.go | 147 ++++ .../admin_delete_topics.go | 9 +- .../admin_describe_acls.go | 145 ++++ .../admin_describe_config.go | 11 +- .../consumer_channel_example.go | 9 +- examples/consumer_example/consumer_example.go | 9 +- .../consumer_offset_metadata.go | 11 +- .../cooperative_consumer_example.go | 9 +- .../idempotent_producer_example.go | 9 +- .../oauthbearer_example.go | 9 +- .../producer_channel_example.go | 9 +- examples/producer_example/producer_example.go | 9 +- examples/stats_example/stats_example.go | 9 +- examples/transactions_example/generator.go | 5 +- examples/transactions_example/processor.go | 5 +- .../transactions_example.go | 11 +- examples/transactions_example/txnhelpers.go | 5 +- examples/transactions_example/visualizer.go | 7 +- kafka/adminapi.go | 587 +++++++++++++++ kafka/adminapi_test.go | 348 +++++++++ kafka/adminoptions.go | 33 + kafka/api.html | 680 ++++++++++++++++-- kafka/error.go | 13 +- kafka/integration_test.go | 195 +++++ 29 files changed, 2328 insertions(+), 119 deletions(-) create mode 100644 examples/admin_create_acls/admin_create_acls.go create mode 100644 examples/admin_delete_acls/admin_delete_acls.go create mode 100644 examples/admin_describe_acls/admin_describe_acls.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b93502f4..87c5fb1fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ This is a feature release: * OAUTHBEARER OIDC support + * KIP-140 Admin API ACL support * Added MockCluster for functional testing of applications without the need for a real Kafka cluster (by @SourceFellows and @kkoehler, #729). See [examples/mock_cluster](examples/mock_cluster). diff --git a/examples/.gitignore b/examples/.gitignore index 408c6ca7b..001a47ba1 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -7,4 +7,7 @@ go-kafkacat/go-kafkacat admin_describe_config/admin_describe_config admin_delete_topics/admin_delete_topics admin_create_topic/admin_create_topic +admin_create_acls/admin_create_acls +admin_describe_acls/admin_describe_acls +admin_delete_acls/admin_delete_acls stats_example/stats_example diff --git a/examples/README.md b/examples/README.md index 98776bc34..49b973636 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,6 +1,9 @@ Examples: + admin_create_acls - Create Access Control Lists + admin_describe_acls - Find Access Control Lists using a filter + admin_delete_acls - Delete Access Control Lists using different filters consumer_channel_example - Channel based consumer consumer_example - Function & callback based consumer consumer_offset_metadata - Commit offset with metadata diff --git a/examples/admin_create_acls/admin_create_acls.go b/examples/admin_create_acls/admin_create_acls.go new file mode 100644 index 000000000..71b8bbf82 --- /dev/null +++ b/examples/admin_create_acls/admin_create_acls.go @@ -0,0 +1,147 @@ +// Create ACLs +package main + +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +// Parses a list of 7n arguments to a slice of n ACLBinding +func parseACLBindings(args []string) (aclBindings kafka.ACLBindings, err error) { + nACLBindings := len(args) / 7 + parsedACLBindings := make(kafka.ACLBindings, nACLBindings) + + for i := 0; i < nACLBindings; i++ { + start := i * 7 + resourceTypeString := args[start] + name := args[start+1] + resourcePatternTypeString := args[start+2] + principal := args[start+3] + host := args[start+4] + operationString := args[start+5] + permissionTypeString := args[start+6] + + resourceType, errParse := kafka.ResourceTypeFromString(resourceTypeString) + if errParse != nil { + err = errParse + fmt.Printf("Invalid resource type: %s: %v\n", resourceTypeString, err) + return + } + + resourcePatternType, errParse := kafka.ResourcePatternTypeFromString(resourcePatternTypeString) + if errParse != nil { + err = errParse + fmt.Printf("Invalid resource pattern type: %s: %v\n", resourcePatternTypeString, err) + return + } + + operation, errParse := kafka.ACLOperationFromString(operationString) + if errParse != nil { + err = errParse + fmt.Printf("Invalid operation: %s: %v\n", operationString, err) + return + } + + permissionType, errParse := kafka.ACLPermissionTypeFromString(permissionTypeString) + if errParse != nil { + err = errParse + fmt.Printf("Invalid permission type: %s: %v\n", permissionTypeString, err) + return + } + + parsedACLBindings[i] = kafka.ACLBinding{ + Type: resourceType, + Name: name, + ResourcePatternType: resourcePatternType, + Principal: principal, + Host: host, + Operation: operation, + PermissionType: permissionType, + } + } + aclBindings = parsedACLBindings + return +} + +func main() { + + // 2 + 7n arguments to create n ACL bindings + nArgs := len(os.Args) + aclBindingArgs := nArgs - 2 + if aclBindingArgs <= 0 || aclBindingArgs%7 != 0 { + fmt.Fprintf(os.Stderr, + "Usage: %s "+ + " ...\n", + os.Args[0]) + os.Exit(1) + } + + bootstrapServers := os.Args[1] + aclBindings, err := parseACLBindings(os.Args[2:]) + if err != nil { + os.Exit(1) + } + + // Create a new AdminClient. + // AdminClient can also be instantiated using an existing + // Producer or Consumer instance, see NewAdminClientFromProducer and + // NewAdminClientFromConsumer. + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + + // Contexts are used to abort or limit the amount of time + // the Admin call blocks waiting for a result. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create ACLs on cluster. + // Set Admin options to wait for the request to finish (or at most 60s) + maxDur, err := time.ParseDuration("60s") + if err != nil { + panic("ParseDuration(60s)") + } + results, err := a.CreateACLs( + ctx, + aclBindings, + kafka.SetAdminRequestTimeout(maxDur), + ) + if err != nil { + fmt.Printf("Failed to create ACLs: %v\n", err) + os.Exit(1) + } + + // Print results + for i, result := range results { + if result.Error.Code() == kafka.ErrNoError { + fmt.Printf("CreateACLs %d successful\n", i) + } else { + fmt.Printf("CreateACLs %d failed, error code: %s, message: %s\n", + i, result.Error.Code(), result.Error.String()) + } + } + + a.Close() +} diff --git a/examples/admin_create_topic/admin_create_topic.go b/examples/admin_create_topic/admin_create_topic.go index 3acda269d..864e4c6be 100644 --- a/examples/admin_create_topic/admin_create_topic.go +++ b/examples/admin_create_topic/admin_create_topic.go @@ -20,22 +20,23 @@ package main import ( "context" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "strconv" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) != 5 { fmt.Fprintf(os.Stderr, - "Usage: %s \n", + "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] topic := os.Args[2] numParts, err := strconv.Atoi(os.Args[3]) if err != nil { @@ -52,7 +53,7 @@ func main() { // AdminClient can also be instantiated using an existing // Producer or Consumer instance, see NewAdminClientFromProducer and // NewAdminClientFromConsumer. - a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker}) + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) if err != nil { fmt.Printf("Failed to create Admin client: %s\n", err) os.Exit(1) diff --git a/examples/admin_delete_acls/admin_delete_acls.go b/examples/admin_delete_acls/admin_delete_acls.go new file mode 100644 index 000000000..11802090f --- /dev/null +++ b/examples/admin_delete_acls/admin_delete_acls.go @@ -0,0 +1,147 @@ +// Delete ACLs +package main + +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +// Parses a list of 7n arguments to a slice of n ACLBindingFilter +func parseACLBindingFilters(args []string) (aclBindingFilters kafka.ACLBindingFilters, err error) { + nACLBindingFilters := len(args) / 7 + parsedACLBindingFilters := make(kafka.ACLBindingFilters, nACLBindingFilters) + + for i := 0; i < nACLBindingFilters; i++ { + start := i * 7 + resourceTypeString := args[start] + name := args[start+1] + resourcePatternTypeString := args[start+2] + principal := args[start+3] + host := args[start+4] + operationString := args[start+5] + permissionTypeString := args[start+6] + + var resourceType kafka.ResourceType + var resourcePatternType kafka.ResourcePatternType + var operation kafka.ACLOperation + var permissionType kafka.ACLPermissionType + + resourceType, err = kafka.ResourceTypeFromString(resourceTypeString) + if err != nil { + fmt.Printf("Invalid resource type: %s: %v\n", resourceTypeString, err) + return + } + resourcePatternType, err = kafka.ResourcePatternTypeFromString(resourcePatternTypeString) + if err != nil { + fmt.Printf("Invalid resource pattern type: %s: %v\n", resourcePatternTypeString, err) + return + } + + operation, err = kafka.ACLOperationFromString(operationString) + if err != nil { + fmt.Printf("Invalid operation: %s: %v\n", operationString, err) + return + } + + permissionType, err = kafka.ACLPermissionTypeFromString(permissionTypeString) + if err != nil { + fmt.Printf("Invalid permission type: %s: %v\n", permissionTypeString, err) + return + } + + parsedACLBindingFilters[i] = kafka.ACLBindingFilter{ + Type: resourceType, + Name: name, + ResourcePatternType: resourcePatternType, + Principal: principal, + Host: host, + Operation: operation, + PermissionType: permissionType, + } + } + aclBindingFilters = parsedACLBindingFilters + return +} + +func main() { + + // 2 + 7n arguments to create n ACL binding filters + nArgs := len(os.Args) + aclBindingFilterArgs := nArgs - 2 + if aclBindingFilterArgs <= 0 || aclBindingFilterArgs%7 != 0 { + fmt.Fprintf(os.Stderr, + "Usage: %s "+ + " ...\n", + os.Args[0]) + os.Exit(1) + } + + bootstrapServers := os.Args[1] + aclBindingFilters, err := parseACLBindingFilters(os.Args[2:]) + if err != nil { + os.Exit(1) + } + + // Create a new AdminClient. + // AdminClient can also be instantiated using an existing + // Producer or Consumer instance, see NewAdminClientFromProducer and + // NewAdminClientFromConsumer. + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + + // Contexts are used to abort or limit the amount of time + // the Admin call blocks waiting for a result. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create ACLs on cluster. + // Set Admin options to wait for the request to finish (or at most 60s) + maxDur, err := time.ParseDuration("60s") + if err != nil { + panic("ParseDuration(60s)") + } + results, err := a.DeleteACLs( + ctx, + aclBindingFilters, + kafka.SetAdminRequestTimeout(maxDur), + ) + if err != nil { + fmt.Printf("Failed to delete ACLs: %v\n", err) + os.Exit(1) + } + + // Print results + for i, result := range results { + if result.Error.Code() == kafka.ErrNoError { + fmt.Printf("DeleteACLs %d successful, deleted: %+v\n", i, result.ACLBindings) + } else { + fmt.Printf("DeleteACLs %d failed, error code: %s, message: %s\n", + i, result.Error.Code(), result.Error.String()) + } + } + + a.Close() +} diff --git a/examples/admin_delete_topics/admin_delete_topics.go b/examples/admin_delete_topics/admin_delete_topics.go index ac8120694..e944fe8e0 100644 --- a/examples/admin_delete_topics/admin_delete_topics.go +++ b/examples/admin_delete_topics/admin_delete_topics.go @@ -20,28 +20,29 @@ package main import ( "context" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) < 3 { fmt.Fprintf(os.Stderr, - "Usage: %s ..\n", + "Usage: %s ..\n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] topics := os.Args[2:] // Create a new AdminClient. // AdminClient can also be instantiated using an existing // Producer or Consumer instance, see NewAdminClientFromProducer and // NewAdminClientFromConsumer. - a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker}) + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) if err != nil { fmt.Printf("Failed to create Admin client: %s\n", err) os.Exit(1) diff --git a/examples/admin_describe_acls/admin_describe_acls.go b/examples/admin_describe_acls/admin_describe_acls.go new file mode 100644 index 000000000..9a52bef4e --- /dev/null +++ b/examples/admin_describe_acls/admin_describe_acls.go @@ -0,0 +1,145 @@ +// Describe ACLs +package main + +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +// Parses a list of 7n arguments to a slice of n ACLBindingFilter +func parseACLBindingFilters(args []string) (aclBindingFilters kafka.ACLBindingFilters, err error) { + nACLBindingFilters := len(args) / 7 + parsedACLBindingFilters := make(kafka.ACLBindingFilters, nACLBindingFilters) + + for i := 0; i < nACLBindingFilters; i++ { + start := i * 7 + resourceTypeString := args[start] + name := args[start+1] + resourcePatternTypeString := args[start+2] + principal := args[start+3] + host := args[start+4] + operationString := args[start+5] + permissionTypeString := args[start+6] + + var resourceType kafka.ResourceType + var resourcePatternType kafka.ResourcePatternType + var operation kafka.ACLOperation + var permissionType kafka.ACLPermissionType + + resourceType, err = kafka.ResourceTypeFromString(resourceTypeString) + if err != nil { + fmt.Printf("Invalid resource type: %s: %v\n", resourceTypeString, err) + return + } + resourcePatternType, err = kafka.ResourcePatternTypeFromString(resourcePatternTypeString) + if err != nil { + fmt.Printf("Invalid resource pattern type: %s: %v\n", resourcePatternTypeString, err) + return + } + + operation, err = kafka.ACLOperationFromString(operationString) + if err != nil { + fmt.Printf("Invalid operation: %s: %v\n", operationString, err) + return + } + + permissionType, err = kafka.ACLPermissionTypeFromString(permissionTypeString) + if err != nil { + fmt.Printf("Invalid permission type: %s: %v\n", permissionTypeString, err) + return + } + + parsedACLBindingFilters[i] = kafka.ACLBindingFilter{ + Type: resourceType, + Name: name, + ResourcePatternType: resourcePatternType, + Principal: principal, + Host: host, + Operation: operation, + PermissionType: permissionType, + } + } + aclBindingFilters = parsedACLBindingFilters + return +} + +func main() { + + // 2 + 7 arguments to create an ACL binding filter + nArgs := len(os.Args) + aclBindingFilterArgs := nArgs - 2 + if aclBindingFilterArgs != 7 { + fmt.Fprintf(os.Stderr, + "Usage: %s "+ + " ...\n", + os.Args[0]) + os.Exit(1) + } + + bootstrapServers := os.Args[1] + aclBindingFilters, err := parseACLBindingFilters(os.Args[2:]) + if err != nil { + os.Exit(1) + } + + // Create a new AdminClient. + // AdminClient can also be instantiated using an existing + // Producer or Consumer instance, see NewAdminClientFromProducer and + // NewAdminClientFromConsumer. + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + + // Contexts are used to abort or limit the amount of time + // the Admin call blocks waiting for a result. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Describe ACLs on cluster. + // Set Admin options to wait for the request to finish (or at most 60s) + maxDur, err := time.ParseDuration("60s") + if err != nil { + panic("ParseDuration(60s)") + } + result, err := a.DescribeACLs( + ctx, + aclBindingFilters[0], + kafka.SetAdminRequestTimeout(maxDur), + ) + if err != nil { + fmt.Printf("Failed to describe ACLs: %v\n", err) + os.Exit(1) + } + + // Print results + if result.Error.Code() == kafka.ErrNoError { + fmt.Printf("DescribeACLs successful, result: %+v\n", result.ACLBindings) + } else { + fmt.Printf("DescribeACLs failed, error code: %s, message: %s\n", + result.Error.Code(), result.Error.String()) + } + + a.Close() +} diff --git a/examples/admin_describe_config/admin_describe_config.go b/examples/admin_describe_config/admin_describe_config.go index 3963584a3..f4e08f296 100644 --- a/examples/admin_describe_config/admin_describe_config.go +++ b/examples/admin_describe_config/admin_describe_config.go @@ -20,25 +20,26 @@ package main import ( "context" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) != 4 { fmt.Fprintf(os.Stderr, - "Usage: %s \n"+ + "Usage: %s \n"+ "\n"+ - " - CSV list of bootstrap brokers\n"+ + " - CSV list of bootstrap brokers\n"+ " - any, broker, topic, group\n"+ " - broker id or topic name\n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] resourceType, err := kafka.ResourceTypeFromString(os.Args[2]) if err != nil { fmt.Printf("Invalid resource type: %s\n", os.Args[2]) @@ -50,7 +51,7 @@ func main() { // AdminClient can also be instantiated using an existing // Producer or Consumer instance, see NewAdminClientFromProducer and // NewAdminClientFromConsumer. - a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker}) + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) if err != nil { fmt.Printf("Failed to create Admin client: %s\n", err) os.Exit(1) diff --git a/examples/consumer_channel_example/consumer_channel_example.go b/examples/consumer_channel_example/consumer_channel_example.go index a1b2c1ed4..c2304657f 100644 --- a/examples/consumer_channel_example/consumer_channel_example.go +++ b/examples/consumer_channel_example/consumer_channel_example.go @@ -19,21 +19,22 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "os/signal" "syscall" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) < 4 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] group := os.Args[2] topics := os.Args[3:] @@ -41,7 +42,7 @@ func main() { signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": broker, + "bootstrap.servers": bootstrapServers, "group.id": group, "session.timeout.ms": 6000, "go.events.channel.enable": true, diff --git a/examples/consumer_example/consumer_example.go b/examples/consumer_example/consumer_example.go index a0157b664..ee3b2882a 100644 --- a/examples/consumer_example/consumer_example.go +++ b/examples/consumer_example/consumer_example.go @@ -22,28 +22,29 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "os/signal" "syscall" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) < 4 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] group := os.Args[2] topics := os.Args[3:] sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": broker, + "bootstrap.servers": bootstrapServers, // Avoid connecting to IPv6 brokers: // This is needed for the ErrAllBrokersDown show-case below // when using localhost brokers on OSX, since the OSX resolver diff --git a/examples/consumer_offset_metadata/consumer_offset_metadata.go b/examples/consumer_offset_metadata/consumer_offset_metadata.go index 9f678a6a6..3f059ad0c 100644 --- a/examples/consumer_offset_metadata/consumer_offset_metadata.go +++ b/examples/consumer_offset_metadata/consumer_offset_metadata.go @@ -23,22 +23,23 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "strconv" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) != 7 && len(os.Args) != 5 { fmt.Fprintf(os.Stderr, `Usage: -- commit offset with metadata: %s "" -- show partition offset: %s `, +- commit offset with metadata: %s "" +- show partition offset: %s `, os.Args[0], os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] group := os.Args[2] topic := os.Args[3] partition, err := strconv.Atoi(os.Args[4]) @@ -48,7 +49,7 @@ func main() { } c, err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": broker, + "bootstrap.servers": bootstrapServers, "group.id": group, }) diff --git a/examples/cooperative_consumer_example/cooperative_consumer_example.go b/examples/cooperative_consumer_example/cooperative_consumer_example.go index d63be047e..2facabfa6 100644 --- a/examples/cooperative_consumer_example/cooperative_consumer_example.go +++ b/examples/cooperative_consumer_example/cooperative_consumer_example.go @@ -21,28 +21,29 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "os/signal" "syscall" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) < 4 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] group := os.Args[2] topics := os.Args[3:] sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": broker, + "bootstrap.servers": bootstrapServers, // Avoid connecting to IPv6 brokers: // This is needed for the ErrAllBrokersDown show-case below // when using localhost brokers on OSX, since the OSX resolver diff --git a/examples/idempotent_producer_example/idempotent_producer_example.go b/examples/idempotent_producer_example/idempotent_producer_example.go index a95f551f8..c20f2f969 100644 --- a/examples/idempotent_producer_example/idempotent_producer_example.go +++ b/examples/idempotent_producer_example/idempotent_producer_example.go @@ -29,9 +29,10 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) var run = true @@ -39,16 +40,16 @@ var run = true func main() { if len(os.Args) != 3 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] topic := os.Args[2] p, err := kafka.NewProducer(&kafka.ConfigMap{ - "bootstrap.servers": broker, + "bootstrap.servers": bootstrapServers, // Enable the Idempotent Producer "enable.idempotence": true}) diff --git a/examples/oauthbearer_example/oauthbearer_example.go b/examples/oauthbearer_example/oauthbearer_example.go index e0a708508..0db48d2a6 100644 --- a/examples/oauthbearer_example/oauthbearer_example.go +++ b/examples/oauthbearer_example/oauthbearer_example.go @@ -21,10 +21,11 @@ import ( "encoding/base64" "encoding/json" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "regexp" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( @@ -116,17 +117,17 @@ func retrieveUnsecuredToken(e kafka.OAuthBearerTokenRefresh) (kafka.OAuthBearerT func main() { if len(os.Args) != 3 { - fmt.Fprintf(os.Stderr, "Usage: %s \"[principalClaimName=] principal=\"\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage: %s \"[principalClaimName=] principal=\"\n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] oauthConf := os.Args[2] // You'll probably need to modify this configuration to // match your environment. config := kafka.ConfigMap{ - "bootstrap.servers": broker, + "bootstrap.servers": bootstrapServers, "security.protocol": "SASL_PLAINTEXT", "sasl.mechanisms": "OAUTHBEARER", "sasl.oauthbearer.config": oauthConf, diff --git a/examples/producer_channel_example/producer_channel_example.go b/examples/producer_channel_example/producer_channel_example.go index c06dc9251..5b7e18135 100644 --- a/examples/producer_channel_example/producer_channel_example.go +++ b/examples/producer_channel_example/producer_channel_example.go @@ -19,22 +19,23 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) != 3 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] topic := os.Args[2] - p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) + p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) if err != nil { fmt.Printf("Failed to create producer: %s\n", err) diff --git a/examples/producer_example/producer_example.go b/examples/producer_example/producer_example.go index 62c284144..6751e14aa 100644 --- a/examples/producer_example/producer_example.go +++ b/examples/producer_example/producer_example.go @@ -19,22 +19,23 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) != 3 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] topic := os.Args[2] - p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) + p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) if err != nil { fmt.Printf("Failed to create producer: %s\n", err) diff --git a/examples/stats_example/stats_example.go b/examples/stats_example/stats_example.go index cd795efe8..9d186a6d4 100644 --- a/examples/stats_example/stats_example.go +++ b/examples/stats_example/stats_example.go @@ -24,28 +24,29 @@ package main import ( "encoding/json" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "os" "os/signal" "syscall" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) < 4 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - broker := os.Args[1] + bootstrapServers := os.Args[1] group := os.Args[2] topics := os.Args[3:] sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": broker, + "bootstrap.servers": bootstrapServers, "group.id": group, "session.timeout.ms": 6000, "auto.offset.reset": "earliest", diff --git a/examples/transactions_example/generator.go b/examples/transactions_example/generator.go index 95296e3b5..d2184da6c 100644 --- a/examples/transactions_example/generator.go +++ b/examples/transactions_example/generator.go @@ -22,10 +22,11 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "math/rand" "sync" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) // Intersections this application will process. @@ -75,7 +76,7 @@ func generateInputMessages(wg *sync.WaitGroup, termChan chan bool) { config := &kafka.ConfigMap{ "client.id": "generator", - "bootstrap.servers": brokers, + "bootstrap.servers": bootstrapServers, "enable.idempotence": true, "go.logs.channel.enable": true, "go.logs.channel": logsChan, diff --git a/examples/transactions_example/processor.go b/examples/transactions_example/processor.go index 201c25745..495e01cfa 100644 --- a/examples/transactions_example/processor.go +++ b/examples/transactions_example/processor.go @@ -24,9 +24,10 @@ package main import ( "encoding/json" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "sync" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) // The processor's consumer group id. @@ -267,7 +268,7 @@ func trafficLightProcessor(wg *sync.WaitGroup, termChan chan bool) { consumerConfig := &kafka.ConfigMap{ "client.id": "processor", - "bootstrap.servers": brokers, + "bootstrap.servers": bootstrapServers, "group.id": processorGroupID, "auto.offset.reset": "earliest", // Consumer used for input to a transactional processor diff --git a/examples/transactions_example/transactions_example.go b/examples/transactions_example/transactions_example.go index 447fe9fb1..af1450cbe 100644 --- a/examples/transactions_example/transactions_example.go +++ b/examples/transactions_example/transactions_example.go @@ -20,13 +20,14 @@ package main import ( "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "math/rand" "os" "os/signal" "sync" "syscall" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) // Set to false to disable visualization, useful for troubleshooting. @@ -36,8 +37,8 @@ var withVisualizer = true var inputTopic = "go-transactions-example-ingress-cars" var outputTopic = "go-transactions-example-traffic-light-states" -// brokers holds the bootstrap servers -var brokers string +// bootstrapServers holds the bootstrap servers +var bootstrapServers string // logsChan is the common log channel for all Kafka client instances. var logsChan chan kafka.LogEvent @@ -65,11 +66,11 @@ func logReader(wg *sync.WaitGroup, termChan chan bool) { func main() { if len(os.Args) != 2 { - fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } - brokers = os.Args[1] + bootstrapServers = os.Args[1] rand.Seed(time.Now().Unix()) diff --git a/examples/transactions_example/txnhelpers.go b/examples/transactions_example/txnhelpers.go index 6a5055477..09de2c7f4 100644 --- a/examples/transactions_example/txnhelpers.go +++ b/examples/transactions_example/txnhelpers.go @@ -21,8 +21,9 @@ package main import ( "context" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" ) // createTransactionalProducer creates a transactional producer for the given @@ -30,7 +31,7 @@ import ( func createTransactionalProducer(toppar kafka.TopicPartition) error { producerConfig := &kafka.ConfigMap{ "client.id": fmt.Sprintf("txn-p%d", toppar.Partition), - "bootstrap.servers": brokers, + "bootstrap.servers": bootstrapServers, "transactional.id": fmt.Sprintf("go-transactions-example-p%d", int(toppar.Partition)), "go.logs.channel.enable": true, "go.logs.channel": logsChan, diff --git a/examples/transactions_example/visualizer.go b/examples/transactions_example/visualizer.go index 260f02d79..04c8610ac 100644 --- a/examples/transactions_example/visualizer.go +++ b/examples/transactions_example/visualizer.go @@ -23,12 +23,13 @@ package main import ( "encoding/json" "fmt" - "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/gdamore/tcell" "os" "sort" "sync" "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/gdamore/tcell" ) // Height and width (terminal characters) per intersection frame. @@ -331,7 +332,7 @@ func trafficLightVisualizer(wg *sync.WaitGroup, termChan chan bool) { consumerConfig := &kafka.ConfigMap{ "client.id": "visualizer", - "bootstrap.servers": brokers, + "bootstrap.servers": bootstrapServers, "group.id": processorGroupID + "_visualizer", "auto.offset.reset": "earliest", "go.logs.channel.enable": true, diff --git a/kafka/adminapi.go b/kafka/adminapi.go index ef3b31d55..e4371eed5 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -48,6 +48,27 @@ ConfigEntry_by_idx (const rd_kafka_ConfigEntry_t **entries, size_t cnt, size_t i return NULL; return entries[idx]; } + +static const rd_kafka_acl_result_t * +acl_result_by_idx (const rd_kafka_acl_result_t **acl_results, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return acl_results[idx]; +} + +static const rd_kafka_DeleteAcls_result_response_t * +DeleteAcls_result_response_by_idx (const rd_kafka_DeleteAcls_result_response_t **delete_acls_result_responses, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return delete_acls_result_responses[idx]; +} + +static const rd_kafka_AclBinding_t * +AclBinding_by_idx (const rd_kafka_AclBinding_t **acl_bindings, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return acl_bindings[idx]; +} */ import "C" @@ -312,6 +333,225 @@ func (c ConfigResourceResult) String() string { return fmt.Sprintf("ResourceResult(%s, %s, %d config(s))", c.Type, c.Name, len(c.Config)) } +// ResourcePatternType enumerates the different types of Kafka resource patterns. +type ResourcePatternType int + +const ( + // ResourcePatternTypeUnknown is a resource pattern type not known or not set. + ResourcePatternTypeUnknown = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) + // ResourcePatternTypeAny matches any resource, used for lookups. + ResourcePatternTypeAny = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_ANY) + // ResourcePatternTypeMatch will perform pattern matching + ResourcePatternTypeMatch = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_MATCH) + // ResourcePatternTypeLiteral matches a literal resource name + ResourcePatternTypeLiteral = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_LITERAL) + // ResourcePatternTypePrefixed matches a prefixed resource name + ResourcePatternTypePrefixed = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_PREFIXED) +) + +// String returns the human-readable representation of a ResourcePatternType +func (t ResourcePatternType) String() string { + return C.GoString(C.rd_kafka_ResourcePatternType_name(C.rd_kafka_ResourcePatternType_t(t))) +} + +// ResourcePatternTypeFromString translates a resource pattern type name to +// a ResourcePatternType value. +func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error) { + switch strings.ToUpper(patternTypeString) { + case "ANY": + return ResourcePatternTypeAny, nil + case "MATCH": + return ResourcePatternTypeMatch, nil + case "LITERAL": + return ResourcePatternTypeLiteral, nil + case "PREFIXED": + return ResourcePatternTypePrefixed, nil + default: + return ResourcePatternTypeUnknown, NewError(ErrInvalidArg, "Unknown resource pattern type", false) + } +} + +// ACLOperation enumerates the different types of ACL operation. +type ACLOperation int + +const ( + // ACLOperationUnknown represents an unknown or unset operation + ACLOperationUnknown = ACLOperation(C.RD_KAFKA_ACL_OPERATION_UNKNOWN) + // ACLOperationAny in a filter, matches any ACLOperation + ACLOperationAny = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ANY) + // ACLOperationAll represents all the operations + ACLOperationAll = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALL) + // ACLOperationRead a read operation + ACLOperationRead = ACLOperation(C.RD_KAFKA_ACL_OPERATION_READ) + // ACLOperationWrite represents a write operation + ACLOperationWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_WRITE) + // ACLOperationCreate represents a create operation + ACLOperationCreate = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CREATE) + // ACLOperationDelete represents a delete operation + ACLOperationDelete = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DELETE) + // ACLOperationAlter represents an alter operation + ACLOperationAlter = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER) + // ACLOperationDescribe represents a describe operation + ACLOperationDescribe = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE) + // ACLOperationClusterAction represents a cluster action operation + ACLOperationClusterAction = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION) + // ACLOperationDescribeConfigs represents a describe configs operation + ACLOperationDescribeConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS) + // ACLOperationAlterConfigs represents an alter configs operation + ACLOperationAlterConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS) + // ACLOperationIdempotentWrite represents an idempotent write operation + ACLOperationIdempotentWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE) +) + +// String returns the human-readable representation of an ACLOperation +func (o ACLOperation) String() string { + return C.GoString(C.rd_kafka_AclOperation_name(C.rd_kafka_AclOperation_t(o))) +} + +// ACLOperationFromString translates a ACL operation name to +// a ACLOperation value. +func ACLOperationFromString(aclOperationString string) (ACLOperation, error) { + switch strings.ToUpper(aclOperationString) { + case "ANY": + return ACLOperationAny, nil + case "ALL": + return ACLOperationAll, nil + case "READ": + return ACLOperationRead, nil + case "WRITE": + return ACLOperationWrite, nil + case "CREATE": + return ACLOperationCreate, nil + case "DELETE": + return ACLOperationDelete, nil + case "ALTER": + return ACLOperationAlter, nil + case "DESCRIBE": + return ACLOperationDescribe, nil + case "CLUSTER_ACTION": + return ACLOperationClusterAction, nil + case "DESCRIBE_CONFIGS": + return ACLOperationDescribeConfigs, nil + case "ALTER_CONFIGS": + return ACLOperationAlterConfigs, nil + case "IDEMPOTENT_WRITE": + return ACLOperationIdempotentWrite, nil + default: + return ACLOperationUnknown, NewError(ErrInvalidArg, "Unknown ACL operation", false) + } +} + +// ACLPermissionType enumerates the different types of ACL permission types. +type ACLPermissionType int + +const ( + // ACLPermissionTypeUnknown represents an unknown ACLPermissionType + ACLPermissionTypeUnknown = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN) + // ACLPermissionTypeAny in a filter, matches any ACLPermissionType + ACLPermissionTypeAny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ANY) + // ACLPermissionTypeDeny disallows access + ACLPermissionTypeDeny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_DENY) + // ACLPermissionTypeAllow grants access + ACLPermissionTypeAllow = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW) +) + +// String returns the human-readable representation of an ACLPermissionType +func (o ACLPermissionType) String() string { + return C.GoString(C.rd_kafka_AclPermissionType_name(C.rd_kafka_AclPermissionType_t(o))) +} + +// ACLPermissionTypeFromString translates a ACL permission type name to +// a ACLPermissionType value. +func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error) { + switch strings.ToUpper(aclPermissionTypeString) { + case "ANY": + return ACLPermissionTypeAny, nil + case "DENY": + return ACLPermissionTypeDeny, nil + case "ALLOW": + return ACLPermissionTypeAllow, nil + default: + return ACLPermissionTypeUnknown, NewError(ErrInvalidArg, "Unknown ACL permission type", false) + } +} + +// ACLBinding specifies the operation and permission type for a specific principal +// over one or more resources of the same type. Used by `AdminClient.CreateACLs`, +// returned by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. +type ACLBinding struct { + Type ResourceType // The resource type. + // The resource name, which depends on the resource type. + // For ResourceBroker the resource name is the broker id. + Name string + ResourcePatternType ResourcePatternType // The resource pattern, relative to the name. + Principal string // The principal this ACLBinding refers to. + Host string // The host that the call is allowed to come from. + Operation ACLOperation // The operation/s specified by this binding. + PermissionType ACLPermissionType // The permission type for the specified operation. +} + +// ACLBindingFilter specifies a filter used to return a list of ACL bindings matching some or all of its attributes. +// Used by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. +type ACLBindingFilter = ACLBinding + +// ACLBindings is a slice of ACLBinding that also implements +// the sort interface +type ACLBindings []ACLBinding + +// ACLBindingFilters is a slice of ACLBindingFilter that also implements +// the sort interface +type ACLBindingFilters []ACLBindingFilter + +func (a ACLBindings) Len() int { + return len(a) +} + +func (a ACLBindings) Less(i, j int) bool { + if a[i].Type != a[j].Type { + return a[i].Type < a[j].Type + } + if a[i].Name != a[j].Name { + return a[i].Name < a[j].Name + } + if a[i].ResourcePatternType != a[j].ResourcePatternType { + return a[i].ResourcePatternType < a[j].ResourcePatternType + } + if a[i].Principal != a[j].Principal { + return a[i].Principal < a[j].Principal + } + if a[i].Host != a[j].Host { + return a[i].Host < a[j].Host + } + if a[i].Operation != a[j].Operation { + return a[i].Operation < a[j].Operation + } + if a[i].PermissionType != a[j].PermissionType { + return a[i].PermissionType < a[j].PermissionType + } + return true +} + +func (a ACLBindings) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +// CreateACLResult provides create ACL error information. +type CreateACLResult struct { + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error +} + +// DescribeACLsResult provides describe ACLs result or error information. +type DescribeACLsResult struct { + // Slice of ACL bindings matching the provided filter + ACLBindings ACLBindings + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error +} + +// DeleteACLsResult provides delete ACLs result or error information. +type DeleteACLsResult = DescribeACLsResult + // waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens // first. // The returned result event is checked for errors its error is returned if set. @@ -950,6 +1190,353 @@ func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error { return a.handle.setOAuthBearerTokenFailure(errstr) } +// aclBindingToC converts a Go ACLBinding struct to a C rd_kafka_AclBinding_t +func (a *AdminClient) aclBindingToC(aclBinding *ACLBinding, cErrstr *C.char, cErrstrSize C.size_t) (result *C.rd_kafka_AclBinding_t, err error) { + var cName, cPrincipal, cHost *C.char + cName, cPrincipal, cHost = nil, nil, nil + if len(aclBinding.Name) > 0 { + cName = C.CString(aclBinding.Name) + defer C.free(unsafe.Pointer(cName)) + } + if len(aclBinding.Principal) > 0 { + cPrincipal = C.CString(aclBinding.Principal) + defer C.free(unsafe.Pointer(cPrincipal)) + } + if len(aclBinding.Host) > 0 { + cHost = C.CString(aclBinding.Host) + defer C.free(unsafe.Pointer(cHost)) + } + + result = C.rd_kafka_AclBinding_new( + C.rd_kafka_ResourceType_t(aclBinding.Type), + cName, + C.rd_kafka_ResourcePatternType_t(aclBinding.ResourcePatternType), + cPrincipal, + cHost, + C.rd_kafka_AclOperation_t(aclBinding.Operation), + C.rd_kafka_AclPermissionType_t(aclBinding.PermissionType), + cErrstr, + cErrstrSize, + ) + if result == nil { + err = newErrorFromString(ErrInvalidArg, + fmt.Sprintf("Invalid arguments for ACL binding %v: %v", aclBinding, C.GoString(cErrstr))) + } + return +} + +// aclBindingFilterToC converts a Go ACLBindingFilter struct to a C rd_kafka_AclBindingFilter_t +func (a *AdminClient) aclBindingFilterToC(aclBindingFilter *ACLBindingFilter, cErrstr *C.char, cErrstrSize C.size_t) (result *C.rd_kafka_AclBindingFilter_t, err error) { + var cName, cPrincipal, cHost *C.char + cName, cPrincipal, cHost = nil, nil, nil + if len(aclBindingFilter.Name) > 0 { + cName = C.CString(aclBindingFilter.Name) + defer C.free(unsafe.Pointer(cName)) + } + if len(aclBindingFilter.Principal) > 0 { + cPrincipal = C.CString(aclBindingFilter.Principal) + defer C.free(unsafe.Pointer(cPrincipal)) + } + if len(aclBindingFilter.Host) > 0 { + cHost = C.CString(aclBindingFilter.Host) + defer C.free(unsafe.Pointer(cHost)) + } + + result = C.rd_kafka_AclBindingFilter_new( + C.rd_kafka_ResourceType_t(aclBindingFilter.Type), + cName, + C.rd_kafka_ResourcePatternType_t(aclBindingFilter.ResourcePatternType), + cPrincipal, + cHost, + C.rd_kafka_AclOperation_t(aclBindingFilter.Operation), + C.rd_kafka_AclPermissionType_t(aclBindingFilter.PermissionType), + cErrstr, + cErrstrSize, + ) + if result == nil { + err = newErrorFromString(ErrInvalidArg, + fmt.Sprintf("Invalid arguments for ACL binding filter %v: %v", aclBindingFilter, C.GoString(cErrstr))) + } + return +} + +// cToACLBinding converts a C rd_kafka_AclBinding_t to Go ACLBinding +func (a *AdminClient) cToACLBinding(cACLBinding *C.rd_kafka_AclBinding_t) ACLBinding { + return ACLBinding{ + ResourceType(C.rd_kafka_AclBinding_restype(cACLBinding)), + C.GoString(C.rd_kafka_AclBinding_name(cACLBinding)), + ResourcePatternType(C.rd_kafka_AclBinding_resource_pattern_type(cACLBinding)), + C.GoString(C.rd_kafka_AclBinding_principal(cACLBinding)), + C.GoString(C.rd_kafka_AclBinding_host(cACLBinding)), + ACLOperation(C.rd_kafka_AclBinding_operation(cACLBinding)), + ACLPermissionType(C.rd_kafka_AclBinding_permission_type(cACLBinding)), + } +} + +// cToACLBindings converts a C rd_kafka_AclBinding_t list to Go ACLBindings +func (a *AdminClient) cToACLBindings(cACLBindings **C.rd_kafka_AclBinding_t, aclCnt C.size_t) (result ACLBindings) { + result = make(ACLBindings, aclCnt) + for i := uint(0); i < uint(aclCnt); i++ { + cACLBinding := C.AclBinding_by_idx(cACLBindings, aclCnt, C.size_t(i)) + if cACLBinding == nil { + panic("AclBinding_by_idx must not return nil") + } + result[i] = a.cToACLBinding(cACLBinding) + } + return +} + +// cToCreateACLResults converts a C acl_result_t array to Go CreateACLResult list. +func (a *AdminClient) cToCreateACLResults(cCreateAclsRes **C.rd_kafka_acl_result_t, aclCnt C.size_t) (result []CreateACLResult, err error) { + result = make([]CreateACLResult, uint(aclCnt)) + + for i := uint(0); i < uint(aclCnt); i++ { + cCreateACLRes := C.acl_result_by_idx(cCreateAclsRes, aclCnt, C.size_t(i)) + if cCreateACLRes != nil { + cCreateACLError := C.rd_kafka_acl_result_error(cCreateACLRes) + result[i].Error = newErrorFromCError(cCreateACLError) + } + } + + return result, nil +} + +// cToDescribeACLsResult converts a C rd_kafka_event_t to a Go DescribeAclsResult struct. +func (a *AdminClient) cToDescribeACLsResult(rkev *C.rd_kafka_event_t) (result *DescribeACLsResult) { + result = &DescribeACLsResult{} + err := C.rd_kafka_event_error(rkev) + errCode := ErrorCode(err) + errStr := C.rd_kafka_event_error_string(rkev) + + var cResultACLsCount C.size_t + cResult := C.rd_kafka_event_DescribeAcls_result(rkev) + cResultACLs := C.rd_kafka_DescribeAcls_result_acls(cResult, &cResultACLsCount) + if errCode != ErrNoError { + result.Error = newErrorFromCString(err, errStr) + } + result.ACLBindings = a.cToACLBindings(cResultACLs, cResultACLsCount) + return +} + +// cToDeleteACLsResults converts a C rd_kafka_DeleteAcls_result_response_t array to Go DeleteAclsResult slice. +func (a *AdminClient) cToDeleteACLsResults(cDeleteACLsResResponse **C.rd_kafka_DeleteAcls_result_response_t, resResponseCnt C.size_t) (result []DeleteACLsResult) { + result = make([]DeleteACLsResult, uint(resResponseCnt)) + + for i := uint(0); i < uint(resResponseCnt); i++ { + cDeleteACLsResResponse := C.DeleteAcls_result_response_by_idx(cDeleteACLsResResponse, resResponseCnt, C.size_t(i)) + if cDeleteACLsResResponse == nil { + panic("DeleteAcls_result_response_by_idx must not return nil") + } + + cDeleteACLsError := C.rd_kafka_DeleteAcls_result_response_error(cDeleteACLsResResponse) + result[i].Error = newErrorFromCError(cDeleteACLsError) + + var cMatchingACLsCount C.size_t + cMatchingACLs := C.rd_kafka_DeleteAcls_result_response_matching_acls( + cDeleteACLsResResponse, &cMatchingACLsCount) + + result[i].ACLBindings = a.cToACLBindings(cMatchingACLs, cMatchingACLsCount) + } + return +} + +// CreateACLs creates one or more ACL bindings. +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for indefinite. +// * `aclBindings` - A slice of ACL binding specifications to create. +// * `options` - Create ACLs options +// +// Returns a slice of CreateACLResult with a ErrNoError ErrorCode when the operation was successful +// plus an error that is not nil for client level errors +func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error) { + if aclBindings == nil { + return nil, newErrorFromString(ErrInvalidArg, + "Expected non-nil slice of ACLBinding structs") + } + if len(aclBindings) == 0 { + return nil, newErrorFromString(ErrInvalidArg, + "Expected non-empty slice of ACLBinding structs") + } + + cErrstrSize := C.size_t(512) + cErrstr := (*C.char)(C.malloc(cErrstrSize)) + defer C.free(unsafe.Pointer(cErrstr)) + + cACLBindings := make([]*C.rd_kafka_AclBinding_t, len(aclBindings)) + + for i, aclBinding := range aclBindings { + cACLBindings[i], err = a.aclBindingToC(&aclBinding, cErrstr, cErrstrSize) + if err != nil { + return + } + defer C.rd_kafka_AclBinding_destroy(cACLBindings[i]) + } + + // Convert Go AdminOptions (if any) to C AdminOptions + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEACLS, genericOptions) + if err != nil { + return nil, err + } + + // Create temporary queue for async operation + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Asynchronous call + C.rd_kafka_CreateAcls( + a.handle.rk, + (**C.rd_kafka_AclBinding_t)(&cACLBindings[0]), + C.size_t(len(cACLBindings)), + cOptions, + cQueue) + + // Wait for result, error or context timeout + rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEACLS_RESULT) + if err != nil { + return nil, err + } + defer C.rd_kafka_event_destroy(rkev) + + var cResultCnt C.size_t + cResult := C.rd_kafka_event_CreateAcls_result(rkev) + aclResults := C.rd_kafka_CreateAcls_result_acls(cResult, &cResultCnt) + result, err = a.cToCreateACLResults(aclResults, cResultCnt) + return +} + +// DescribeACLs matches ACL bindings by filter. +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for indefinite. +// * `aclBindingFilter` - A filter with attributes that must match. +// string attributes match exact values or any string if set to empty string. +// Enum attributes match exact values or any value if ending with `Any`. +// If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all +// the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard` +// or `ResourcePatternTypePrefixed` pattern type that match the resource name. +// * `options` - Describe ACLs options +// +// Returns a slice of ACLBindings when the operation was successful +// plus an error that is not `nil` for client level errors +func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error) { + + cErrstrSize := C.size_t(512) + cErrstr := (*C.char)(C.malloc(cErrstrSize)) + defer C.free(unsafe.Pointer(cErrstr)) + + cACLBindingFilter, err := a.aclBindingFilterToC(&aclBindingFilter, cErrstr, cErrstrSize) + if err != nil { + return + } + + // Convert Go AdminOptions (if any) to C AdminOptions + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBEACLS, genericOptions) + if err != nil { + return nil, err + } + // Create temporary queue for async operation + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Asynchronous call + C.rd_kafka_DescribeAcls( + a.handle.rk, + cACLBindingFilter, + cOptions, + cQueue) + + // Wait for result, error or context timeout + rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEACLS_RESULT) + if err != nil { + return nil, err + } + defer C.rd_kafka_event_destroy(rkev) + result = a.cToDescribeACLsResult(rkev) + return +} + +// DeleteACLs deletes ACL bindings matching one or more ACL binding filters. +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for indefinite. +// * `aclBindingFilters` - a slice of ACL binding filters to match ACLs to delete. +// string attributes match exact values or any string if set to empty string. +// Enum attributes match exact values or any value if ending with `Any`. +// If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all +// the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard` +// or `ResourcePatternTypePrefixed` pattern type that match the resource name. +// * `options` - Delete ACLs options +// +// Returns a slice of ACLBinding for each filter when the operation was successful +// plus an error that is not `nil` for client level errors +func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error) { + if aclBindingFilters == nil { + return nil, newErrorFromString(ErrInvalidArg, + "Expected non-nil slice of ACLBindingFilter structs") + } + if len(aclBindingFilters) == 0 { + return nil, newErrorFromString(ErrInvalidArg, + "Expected non-empty slice of ACLBindingFilter structs") + } + + cErrstrSize := C.size_t(512) + cErrstr := (*C.char)(C.malloc(cErrstrSize)) + defer C.free(unsafe.Pointer(cErrstr)) + + cACLBindingFilters := make([]*C.rd_kafka_AclBindingFilter_t, len(aclBindingFilters)) + + for i, aclBindingFilter := range aclBindingFilters { + cACLBindingFilters[i], err = a.aclBindingFilterToC(&aclBindingFilter, cErrstr, cErrstrSize) + if err != nil { + return + } + defer C.rd_kafka_AclBinding_destroy(cACLBindingFilters[i]) + } + + // Convert Go AdminOptions (if any) to C AdminOptions + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETEACLS, genericOptions) + if err != nil { + return nil, err + } + // Create temporary queue for async operation + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Asynchronous call + C.rd_kafka_DeleteAcls( + a.handle.rk, + (**C.rd_kafka_AclBindingFilter_t)(&cACLBindingFilters[0]), + C.size_t(len(cACLBindingFilters)), + cOptions, + cQueue) + + // Wait for result, error or context timeout + rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETEACLS_RESULT) + if err != nil { + return nil, err + } + defer C.rd_kafka_event_destroy(rkev) + + var cResultResponsesCount C.size_t + cResult := C.rd_kafka_event_DeleteAcls_result(rkev) + cResultResponses := C.rd_kafka_DeleteAcls_result_responses(cResult, &cResultResponsesCount) + result = a.cToDeleteACLsResults(cResultResponses, cResultResponsesCount) + return +} + // Close an AdminClient instance. func (a *AdminClient) Close() { if a.isDerived { diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index 50660869e..54b878ee0 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -76,6 +76,350 @@ func TestAdminAPIWithDefaultValue(t *testing.T) { adminClient.Close() } +func testAdminAPIsCreateACLs(what string, a *AdminClient, t *testing.T) { + var res []CreateACLResult + var err error + var ctx context.Context + var cancel context.CancelFunc + var expDuration time.Duration + var expDurationLonger time.Duration + var expError string + var invalidTests []ACLBindings + + checkFail := func(res []CreateACLResult, err error) { + if res != nil || err == nil { + t.Fatalf("Expected CreateACLs to fail, but got result: %v, err: %v", res, err) + } + } + + testACLBindings := ACLBindings{ + { + Type: ResourceTopic, + Name: "mytopic", + ResourcePatternType: ResourcePatternTypeLiteral, + Principal: "User:myuser", + Host: "*", + Operation: ACLOperationAll, + PermissionType: ACLPermissionTypeAllow, + }, + } + + copyACLBindings := func() ACLBindings { + return append(ACLBindings{}, testACLBindings...) + } + + t.Logf("AdminClient API - ACLs testing on %s: %s", a, what) + expDuration, err = time.ParseDuration("0.1s") + if err != nil { + t.Fatalf("%s", err) + } + + // nil aclBindings + res, err = a.CreateACLs(ctx, nil) + checkFail(res, err) + expError = "Expected non-nil slice of ACLBinding structs" + if err.Error() != expError { + t.Fatalf("Expected error \"%s\", received: \"%v\"", expError, err.Error()) + } + + // empty aclBindings + res, err = a.CreateACLs(ctx, ACLBindings{}) + checkFail(res, err) + expError = "Expected non-empty slice of ACLBinding structs" + if err.Error() != expError { + t.Fatalf("Expected error \"%s\", received: \"%v\"", expError, err.Error()) + } + + // Correct input, fail with timeout + ctx, cancel = context.WithTimeout(context.Background(), expDuration) + defer cancel() + + res, err = a.CreateACLs(ctx, testACLBindings) + checkFail(res, err) + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v, %v", ctx.Err(), err) + } + + // request timeout comes before context deadline + expDurationLonger, err = time.ParseDuration("0.2s") + if err != nil { + t.Fatalf("%s", err) + } + + ctx, cancel = context.WithTimeout(context.Background(), expDurationLonger) + defer cancel() + + res, err = a.CreateACLs(ctx, testACLBindings, SetAdminRequestTimeout(expDuration)) + checkFail(res, err) + expError = "Failed while waiting for controller: Local: Timed out" + if err.Error() != expError { + t.Fatalf("Expected error \"%s\", received: \"%v\"", expError, err.Error()) + } + + // Invalid ACL bindings + invalidTests = []ACLBindings{copyACLBindings(), copyACLBindings()} + invalidTests[0][0].Type = ResourceUnknown + invalidTests[1][0].Type = ResourceAny + expError = ": Invalid resource type" + for _, invalidACLBindings := range invalidTests { + res, err = a.CreateACLs(ctx, invalidACLBindings) + checkFail(res, err) + if !strings.HasSuffix(err.Error(), expError) { + t.Fatalf("Expected an error ending with \"%s\", received: \"%s\"", expError, err.Error()) + } + } + + suffixes := []string{ + ": Invalid resource pattern type", + ": Invalid resource pattern type", + ": Invalid resource pattern type", + ": Invalid operation", + ": Invalid operation", + ": Invalid permission type", + ": Invalid permission type", + ": Invalid resource name", + ": Invalid principal", + ": Invalid host", + } + nInvalidTests := len(suffixes) + invalidTests = make([]ACLBindings, nInvalidTests) + for i := 0; i < nInvalidTests; i++ { + invalidTests[i] = copyACLBindings() + } + invalidTests[0][0].ResourcePatternType = ResourcePatternTypeUnknown + invalidTests[1][0].ResourcePatternType = ResourcePatternTypeMatch + invalidTests[2][0].ResourcePatternType = ResourcePatternTypeAny + invalidTests[3][0].Operation = ACLOperationUnknown + invalidTests[4][0].Operation = ACLOperationAny + invalidTests[5][0].PermissionType = ACLPermissionTypeUnknown + invalidTests[6][0].PermissionType = ACLPermissionTypeAny + invalidTests[7][0].Name = "" + invalidTests[8][0].Principal = "" + invalidTests[9][0].Host = "" + + for i, invalidACLBindings := range invalidTests { + res, err = a.CreateACLs(ctx, invalidACLBindings) + checkFail(res, err) + if !strings.HasSuffix(err.Error(), suffixes[i]) { + t.Fatalf("Expected an error ending with \"%s\", received: \"%s\"", suffixes[i], err.Error()) + } + } +} + +func testAdminAPIsDescribeACLs(what string, a *AdminClient, t *testing.T) { + var res *DescribeACLsResult + var err error + var ctx context.Context + var cancel context.CancelFunc + var expDuration time.Duration + var expDurationLonger time.Duration + var expError string + + checkFail := func(res *DescribeACLsResult, err error) { + if res != nil || err == nil { + t.Fatalf("Expected DescribeACLs to fail, but got result: %v, err: %v", res, err) + } + } + + aclBindingsFilter := ACLBindingFilter{ + Type: ResourceTopic, + ResourcePatternType: ResourcePatternTypeLiteral, + Operation: ACLOperationAll, + PermissionType: ACLPermissionTypeAllow, + } + + t.Logf("AdminClient API - ACLs testing on %s: %s", a, what) + expDuration, err = time.ParseDuration("0.1s") + if err != nil { + t.Fatalf("%s", err) + } + + // Correct input, fail with timeout + ctx, cancel = context.WithTimeout(context.Background(), expDuration) + defer cancel() + + res, err = a.DescribeACLs(ctx, aclBindingsFilter) + checkFail(res, err) + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v, %v", ctx.Err(), err) + } + + // request timeout comes before context deadline + expDurationLonger, err = time.ParseDuration("0.2s") + if err != nil { + t.Fatalf("%s", err) + } + + ctx, cancel = context.WithTimeout(context.Background(), expDurationLonger) + defer cancel() + + res, err = a.DescribeACLs(ctx, aclBindingsFilter, SetAdminRequestTimeout(expDuration)) + checkFail(res, err) + expError = "Failed while waiting for controller: Local: Timed out" + if err.Error() != expError { + t.Fatalf("Expected error \"%s\", received: \"%v\"", expError, err.Error()) + } + + // Invalid ACL binding filters + suffixes := []string{ + ": Invalid resource pattern type", + ": Invalid operation", + ": Invalid permission type", + } + nInvalidTests := len(suffixes) + invalidTests := make(ACLBindingFilters, nInvalidTests) + for i := 0; i < nInvalidTests; i++ { + invalidTests[i] = aclBindingsFilter + } + invalidTests[0].ResourcePatternType = ResourcePatternTypeUnknown + invalidTests[1].Operation = ACLOperationUnknown + invalidTests[2].PermissionType = ACLPermissionTypeUnknown + + for i, invalidACLBindingFilter := range invalidTests { + res, err = a.DescribeACLs(ctx, invalidACLBindingFilter) + checkFail(res, err) + if !strings.HasSuffix(err.Error(), suffixes[i]) { + t.Fatalf("Expected an error ending with \"%s\", received: \"%s\"", suffixes[i], err.Error()) + } + } + + // ACL binding filters are valid with empty strings, + // matching any value + validTests := [3]ACLBindingFilter{} + for i := 0; i < len(validTests); i++ { + validTests[i] = aclBindingsFilter + } + validTests[0].Name = "" + validTests[1].Principal = "" + validTests[2].Host = "" + + for _, validACLBindingFilter := range validTests { + res, err = a.DescribeACLs(ctx, validACLBindingFilter) + checkFail(res, err) + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v, %v", ctx.Err(), err) + } + } +} + +func testAdminAPIsDeleteACLs(what string, a *AdminClient, t *testing.T) { + var res []DeleteACLsResult + var err error + var ctx context.Context + var cancel context.CancelFunc + var expDuration time.Duration + var expDurationLonger time.Duration + var expError string + + checkFail := func(res []DeleteACLsResult, err error) { + if res != nil || err == nil { + t.Fatalf("Expected DeleteACL to fail, but got result: %v, err: %v", res, err) + } + } + + aclBindingsFilters := ACLBindingFilters{ + { + Type: ResourceTopic, + ResourcePatternType: ResourcePatternTypeLiteral, + Operation: ACLOperationAll, + PermissionType: ACLPermissionTypeAllow, + }, + } + + copyACLBindingFilters := func() ACLBindingFilters { + return append(ACLBindingFilters{}, aclBindingsFilters...) + } + + t.Logf("AdminClient API - ACLs testing on %s: %s", a, what) + expDuration, err = time.ParseDuration("0.1s") + if err != nil { + t.Fatalf("%s", err) + } + + // nil aclBindingFilters + res, err = a.DeleteACLs(ctx, nil) + checkFail(res, err) + expError = "Expected non-nil slice of ACLBindingFilter structs" + if err.Error() != expError { + t.Fatalf("Expected error \"%s\", received: \"%v\"", expError, err.Error()) + } + + // empty aclBindingFilters + res, err = a.DeleteACLs(ctx, ACLBindingFilters{}) + checkFail(res, err) + expError = "Expected non-empty slice of ACLBindingFilter structs" + if err.Error() != expError { + t.Fatalf("Expected error \"%s\", received: \"%v\"", expError, err.Error()) + } + + // Correct input, fail with timeout + ctx, cancel = context.WithTimeout(context.Background(), expDuration) + defer cancel() + + res, err = a.DeleteACLs(ctx, aclBindingsFilters) + checkFail(res, err) + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v, %v", ctx.Err(), err) + } + + // request timeout comes before context deadline + expDurationLonger, err = time.ParseDuration("0.2s") + if err != nil { + t.Fatalf("%s", err) + } + + ctx, cancel = context.WithTimeout(context.Background(), expDurationLonger) + defer cancel() + + res, err = a.DeleteACLs(ctx, aclBindingsFilters, SetAdminRequestTimeout(expDuration)) + checkFail(res, err) + expError = "Failed while waiting for controller: Local: Timed out" + if err.Error() != expError { + t.Fatalf("Expected error \"%s\", received: \"%v\"", expError, err.Error()) + } + + // Invalid ACL binding filters + suffixes := []string{ + ": Invalid resource pattern type", + ": Invalid operation", + ": Invalid permission type", + } + nInvalidTests := len(suffixes) + invalidTests := make([]ACLBindingFilters, nInvalidTests) + for i := 0; i < nInvalidTests; i++ { + invalidTests[i] = copyACLBindingFilters() + } + invalidTests[0][0].ResourcePatternType = ResourcePatternTypeUnknown + invalidTests[1][0].Operation = ACLOperationUnknown + invalidTests[2][0].PermissionType = ACLPermissionTypeUnknown + + for i, invalidACLBindingFilters := range invalidTests { + res, err = a.DeleteACLs(ctx, invalidACLBindingFilters) + checkFail(res, err) + if !strings.HasSuffix(err.Error(), suffixes[i]) { + t.Fatalf("Expected an error ending with \"%s\", received: \"%s\"", suffixes[i], err.Error()) + } + } + + // ACL binding filters are valid with empty strings, + // matching any value + validTests := [3]ACLBindingFilters{} + for i := 0; i < len(validTests); i++ { + validTests[i] = copyACLBindingFilters() + } + validTests[0][0].Name = "" + validTests[1][0].Principal = "" + validTests[2][0].Host = "" + + for _, validACLBindingFilters := range validTests { + res, err = a.DeleteACLs(ctx, validACLBindingFilters) + checkFail(res, err) + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v, %v", ctx.Err(), err) + } + } +} + func testAdminAPIs(what string, a *AdminClient, t *testing.T) { t.Logf("AdminClient API testing on %s: %s", a, what) @@ -301,6 +645,10 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) { if ctx.Err() != context.DeadlineExceeded || err != context.DeadlineExceeded { t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) } + + testAdminAPIsCreateACLs(what, a, t) + testAdminAPIsDescribeACLs(what, a, t) + testAdminAPIsDeleteACLs(what, a, t) } // TestAdminAPIs dry-tests most Admin APIs, no broker is needed. diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go index 6d7022ad9..8c1bc81ff 100644 --- a/kafka/adminoptions.go +++ b/kafka/adminoptions.go @@ -166,6 +166,15 @@ func (ao AdminOptionValidateOnly) supportsCreatePartitions() { func (ao AdminOptionValidateOnly) supportsAlterConfigs() { } +func (ao AdminOptionRequestTimeout) supportsCreateACLs() { +} + +func (ao AdminOptionRequestTimeout) supportsDescribeACLs() { +} + +func (ao AdminOptionRequestTimeout) supportsDeleteACLs() { +} + func (ao AdminOptionValidateOnly) apply(cOptions *C.rd_kafka_AdminOptions_t) error { if !ao.isSet { return nil @@ -240,6 +249,30 @@ type DescribeConfigsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } +// CreateACLsAdminOption - see setter. +// +// See SetAdminRequestTimeout +type CreateACLsAdminOption interface { + supportsCreateACLs() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +// DescribeACLsAdminOption - see setter. +// +// See SetAdminRequestTimeout +type DescribeACLsAdminOption interface { + supportsDescribeACLs() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +// DeleteACLsAdminOption - see setter. +// +// See SetAdminRequestTimeout +type DeleteACLsAdminOption interface { + supportsDeleteACLs() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + // AdminOption is a generic type not to be used directly. // // See CreateTopicsAdminOption et.al. diff --git a/kafka/api.html b/kafka/api.html index c53cbd87b..18d4ec0f1 100644 --- a/kafka/api.html +++ b/kafka/api.html @@ -387,6 +387,71 @@

func WriteErrorCodes(f *os.File) +
+ + type ACLBinding + +
+
+ + type ACLBindingFilter + +
+
+ + type ACLBindingFilters + +
+
+ + type ACLBindings + +
+
+ + func (a ACLBindings) Len() int + +
+
+ + func (a ACLBindings) Less(i, j int) bool + +
+
+ + func (a ACLBindings) Swap(i, j int) + +
+
+ + type ACLOperation + +
+
+ + func ACLOperationFromString(aclOperationString string) (ACLOperation, error) + +
+
+ + func (o ACLOperation) String() string + +
+
+ + type ACLPermissionType + +
+
+ + func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error) + +
+
+ + func (o ACLPermissionType) String() string + +
type AdminClient @@ -427,6 +492,11 @@

func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)

+
+ + func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error) + +
func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error) @@ -437,11 +507,21 @@

func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)

+
+ + func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error) + +
func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)
+
+ + func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error) + +
func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error) @@ -802,6 +882,16 @@

func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)

+
+ + type CreateACLResult + +
+
+ + type CreateACLsAdminOption + +
type CreatePartitionsAdminOption @@ -812,11 +902,31 @@

type CreateTopicsAdminOption

+
+ + type DeleteACLsAdminOption + +
+
+ + type DeleteACLsResult + +
type DeleteTopicsAdminOption
+
+ + type DescribeACLsAdminOption + +
+
+ + type DescribeACLsResult + +
type DescribeConfigsAdminOption @@ -1127,6 +1237,21 @@

type RebalanceCb

+
+ + type ResourcePatternType + +
+
+ + func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error) + +
+
+ + func (t ResourcePatternType) String() string + +
type ResourceType @@ -1333,6 +1458,56 @@

ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG) // ConfigSourceDefault is built-in default configuration for configs that have a default value ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG) +) +
const (
+    // ResourcePatternTypeUnknown is a resource pattern type not known or not set.
+    ResourcePatternTypeUnknown = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_UNKNOWN)
+    // ResourcePatternTypeAny matches any resource, used for lookups.
+    ResourcePatternTypeAny = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_ANY)
+    // ResourcePatternTypeMatch will perform pattern matching
+    ResourcePatternTypeMatch = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_MATCH)
+    // ResourcePatternTypeLiteral matches a literal resource name
+    ResourcePatternTypeLiteral = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_LITERAL)
+    // ResourcePatternTypePrefixed matches a prefixed resource name
+    ResourcePatternTypePrefixed = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_PREFIXED)
+)
+
const (
+    // ACLOperationUnknown represents an unknown or unset operation
+    ACLOperationUnknown = ACLOperation(C.RD_KAFKA_ACL_OPERATION_UNKNOWN)
+    // ACLOperationAny in a filter, matches any ACLOperation
+    ACLOperationAny = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ANY)
+    // ACLOperationAll represents all the operations
+    ACLOperationAll = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALL)
+    // ACLOperationRead a read operation
+    ACLOperationRead = ACLOperation(C.RD_KAFKA_ACL_OPERATION_READ)
+    // ACLOperationWrite represents a write operation
+    ACLOperationWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_WRITE)
+    // ACLOperationCreate represents a create operation
+    ACLOperationCreate = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CREATE)
+    // ACLOperationDelete represents a delete operation
+    ACLOperationDelete = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DELETE)
+    // ACLOperationAlter represents an alter operation
+    ACLOperationAlter = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER)
+    // ACLOperationDescribe represents a describe operation
+    ACLOperationDescribe = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE)
+    // ACLOperationClusterAction represents a cluster action operation
+    ACLOperationClusterAction = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION)
+    // ACLOperationDescribeConfigs represents a describe configs operation
+    ACLOperationDescribeConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS)
+    // ACLOperationAlterConfigs represents an alter configs operation
+    ACLOperationAlterConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS)
+    // ACLOperationIdempotentWrite represents an idempotent write operation
+    ACLOperationIdempotentWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE)
+)
+
const (
+    // ACLPermissionTypeUnknown represents an unknown ACLPermissionType
+    ACLPermissionTypeUnknown = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN)
+    // ACLPermissionTypeAny in a filter, matches any ACLPermissionType
+    ACLPermissionTypeAny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ANY)
+    // ACLPermissionTypeDeny disallows access
+    ACLPermissionTypeDeny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_DENY)
+    // ACLPermissionTypeAllow grants access
+    ACLPermissionTypeAllow = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW)
 )
const (
     // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
@@ -1414,9 +1589,187 @@ 

librdkafka error codes. This function is not intended for public use.

+

+ type + + ACLBinding + + +

+

+ ACLBinding specifies the operation and permission type for a specific principal +over one or more resources of the same type. Used by `AdminClient.CreateACLs`, +returned by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. +

+
type ACLBinding struct {
+    Type ResourceType // The resource type.
+    // The resource name, which depends on the resource type.
+    // For ResourceBroker the resource name is the broker id.
+    Name                string
+    ResourcePatternType ResourcePatternType // The resource pattern, relative to the name.
+    Principal           string              // The principal this ACLBinding refers to.
+    Host                string              // The host that the call is allowed to come from.
+    Operation           ACLOperation        // The operation/s specified by this binding.
+    PermissionType      ACLPermissionType   // The permission type for the specified operation.
+}
+
+

+ type + + ACLBindingFilter + + +

+

+ ACLBindingFilter specifies a filter used to return a list of ACL bindings matching some or all of its attributes. +Used by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. +

+
type ACLBindingFilter = ACLBinding
+

+ type + + ACLBindingFilters + + +

+

+ ACLBindingFilters is a slice of ACLBindingFilter that also implements +the sort interface +

+
type ACLBindingFilters []ACLBindingFilter
+

+ type + + ACLBindings + + +

+

+ ACLBindings is a slice of ACLBinding that also implements +the sort interface +

+
type ACLBindings []ACLBinding
+

+ func (ACLBindings) + + Len + + +

+
func (a ACLBindings) Len() int
+

+ func (ACLBindings) + + Less + + +

+
func (a ACLBindings) Less(i, j int) bool
+

+ func (ACLBindings) + + Swap + + +

+
func (a ACLBindings) Swap(i, j int)
+

+ type + + ACLOperation + + +

+

+ ACLOperation enumerates the different types of ACL operation. +

+
type ACLOperation int
+

+ func + + ACLOperationFromString + + +

+
func ACLOperationFromString(aclOperationString string) (ACLOperation, error)
+

+ ACLOperationFromString translates a ACL operation name to +a ACLOperation value. +

+

+ func (ACLOperation) + + String + + +

+
func (o ACLOperation) String() string
+

+ String returns the human-readable representation of an ACLOperation +

+

+ type + + ACLPermissionType + + +

+

+ ACLPermissionType enumerates the different types of ACL permission types. +

+
type ACLPermissionType int
+

+ func + + ACLPermissionTypeFromString + + +

+
func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error)
+

+ ACLPermissionTypeFromString translates a ACL permission type name to +a ACLPermissionType value. +

+

+ func (ACLPermissionType) + + String + + +

+
func (o ACLPermissionType) String() string
+

+ String returns the human-readable representation of an ACLPermissionType +

type - + AdminClient

+ NewAdminClient

+ NewAdminClientFromConsumer

+ NewAdminClientFromProducer

+ AlterConfigs

+ Close

+ ClusterID

+ ControllerID

+ CreateACLs + + +

+
func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error)
+

+ CreateACLs creates one or more ACL bindings. +

+

+ Parameters: +

+
* `ctx` - context with the maximum amount of time to block, or nil for indefinite.
+* `aclBindings` - A slice of ACL binding specifications to create.
+* `options` - Create ACLs options
+
+

+ Returns a slice of CreateACLResult with a ErrNoError ErrorCode when the operation was successful +plus an error that is not nil for client level errors +

func (*AdminClient) - + CreatePartitions

+ CreateTopics

+ DeleteACLs + + +

+
func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error)
+

+ DeleteACLs deletes ACL bindings matching one or more ACL binding filters. +

+

+ Parameters: +

+
* `ctx` - context with the maximum amount of time to block, or nil for indefinite.
+* `aclBindingFilters` - a slice of ACL binding filters to match ACLs to delete.
+   string attributes match exact values or any string if set to empty string.
+   Enum attributes match exact values or any value if ending with `Any`.
+   If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all
+   the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard`
+   or `ResourcePatternTypePrefixed` pattern type that match the resource name.
+* `options` - Delete ACLs options
+
+

+ Returns a slice of ACLBinding for each filter when the operation was successful +plus an error that is not `nil` for client level errors +

func (*AdminClient) - + DeleteTopics

+ DescribeACLs + + +

+
func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error)
+

+ DescribeACLs matches ACL bindings by filter. +

+

+ Parameters: +

+
* `ctx` - context with the maximum amount of time to block, or nil for indefinite.
+* `aclBindingFilter` - A filter with attributes that must match.
+   string attributes match exact values or any string if set to empty string.
+   Enum attributes match exact values or any value if ending with `Any`.
+   If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all
+   the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard`
+   or `ResourcePatternTypePrefixed` pattern type that match the resource name.
+* `options` - Describe ACLs options
+
+

+ Returns a slice of ACLBindings when the operation was successful +plus an error that is not `nil` for client level errors +

func (*AdminClient) - + DescribeConfigs

+ GetMetadata

+ SetOAuthBearerToken

+ SetOAuthBearerTokenFailure

+ String

+ AdminOption

+ SetAdminValidateOnly

+ AlterConfigsAdminOption

+ AlterOperation

int

func (AlterOperation) - + String

+ ConfigEntry

+ StringMapToConfigEntries

+ String

+ ConfigEntryResult

+ String

+ ConfigResource

+ String

+ ConfigResourceResult

+ String

+ ConfigSource

int

func (ConfigSource) - + String

+ CreateACLResult + + +

+

+ CreateACLResult provides create ACL error information. +

+
type CreateACLResult struct {
+    // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
+    Error Error
+}
+
+

+ type + + CreateACLsAdminOption + + +

+

+ CreateACLsAdminOption - see setter. +

+

+ See SetAdminRequestTimeout +

+
type CreateACLsAdminOption interface {
+    // contains filtered or unexported methods
+}

type - + CreatePartitionsAdminOption

+ CreateTopicsAdminOption

+ DeleteACLsAdminOption + + +

+

+ DeleteACLsAdminOption - see setter. +

+

+ See SetAdminRequestTimeout +

+
type DeleteACLsAdminOption interface {
+    // contains filtered or unexported methods
+}
+

+ type + + DeleteACLsResult + + +

+

+ DeleteACLsResult provides delete ACLs result or error information. +

+
type DeleteACLsResult = DescribeACLsResult

type - + DeleteTopicsAdminOption

+ DescribeACLsAdminOption + + +

+

+ DescribeACLsAdminOption - see setter. +

+

+ See SetAdminRequestTimeout +

+
type DescribeACLsAdminOption interface {
+    // contains filtered or unexported methods
+}
+

+ type + + DescribeACLsResult + + +

+

+ DescribeACLsResult provides describe ACLs result or error information. +

+
type DescribeACLsResult struct {
+    // Slice of ACL bindings matching the provided filter
+    ACLBindings ACLBindings
+    // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
+    Error Error
+}
+

type - + DescribeConfigsAdminOption

+ Code

+ Error

+ IsFatal

+ IsRetriable

+ String

+ TxnRequiresAbort

+ ErrorCode

+ String

+ PartitionsSpecification

Consumer, Event) error
+

+ type + + ResourcePatternType + + +

+

+ ResourcePatternType enumerates the different types of Kafka resource patterns. +

+
type ResourcePatternType int
+

+ func + + ResourcePatternTypeFromString + + +

+
func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error)
+

+ ResourcePatternTypeFromString translates a resource pattern type name to +a ResourcePatternType value. +

+

+ func (ResourcePatternType) + + String + + +

+
func (t ResourcePatternType) String() string
+

+ String returns the human-readable representation of a ResourcePatternType +

type - + ResourceType

int

func - + ResourceTypeFromString

+ String

TopicPartitions) Swap(i, j int)

type - + TopicResult

+ String

+ TopicSpecification