diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b93502f4..87c5fb1fc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
This is a feature release:
* OAUTHBEARER OIDC support
+ * KIP-140 Admin API ACL support
* Added MockCluster for functional testing of applications without the need
for a real Kafka cluster (by @SourceFellows and @kkoehler, #729).
See [examples/mock_cluster](examples/mock_cluster).
diff --git a/examples/.gitignore b/examples/.gitignore
index 408c6ca7b..001a47ba1 100644
--- a/examples/.gitignore
+++ b/examples/.gitignore
@@ -7,4 +7,7 @@ go-kafkacat/go-kafkacat
admin_describe_config/admin_describe_config
admin_delete_topics/admin_delete_topics
admin_create_topic/admin_create_topic
+admin_create_acls/admin_create_acls
+admin_describe_acls/admin_describe_acls
+admin_delete_acls/admin_delete_acls
stats_example/stats_example
diff --git a/examples/README.md b/examples/README.md
index 98776bc34..49b973636 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,6 +1,9 @@
Examples:
+ admin_create_acls - Create Access Control Lists
+ admin_describe_acls - Find Access Control Lists using a filter
+ admin_delete_acls - Delete Access Control Lists using different filters
consumer_channel_example - Channel based consumer
consumer_example - Function & callback based consumer
consumer_offset_metadata - Commit offset with metadata
diff --git a/examples/admin_create_acls/admin_create_acls.go b/examples/admin_create_acls/admin_create_acls.go
new file mode 100644
index 000000000..71b8bbf82
--- /dev/null
+++ b/examples/admin_create_acls/admin_create_acls.go
@@ -0,0 +1,147 @@
+// Create ACLs
+package main
+
+/**
+ * Copyright 2022 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// Parses a list of 7n arguments to a slice of n ACLBinding
+func parseACLBindings(args []string) (aclBindings kafka.ACLBindings, err error) {
+ nACLBindings := len(args) / 7
+ parsedACLBindings := make(kafka.ACLBindings, nACLBindings)
+
+ for i := 0; i < nACLBindings; i++ {
+ start := i * 7
+ resourceTypeString := args[start]
+ name := args[start+1]
+ resourcePatternTypeString := args[start+2]
+ principal := args[start+3]
+ host := args[start+4]
+ operationString := args[start+5]
+ permissionTypeString := args[start+6]
+
+ resourceType, errParse := kafka.ResourceTypeFromString(resourceTypeString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid resource type: %s: %v\n", resourceTypeString, err)
+ return
+ }
+
+ resourcePatternType, errParse := kafka.ResourcePatternTypeFromString(resourcePatternTypeString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid resource pattern type: %s: %v\n", resourcePatternTypeString, err)
+ return
+ }
+
+ operation, errParse := kafka.ACLOperationFromString(operationString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid operation: %s: %v\n", operationString, err)
+ return
+ }
+
+ permissionType, errParse := kafka.ACLPermissionTypeFromString(permissionTypeString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid permission type: %s: %v\n", permissionTypeString, err)
+ return
+ }
+
+ parsedACLBindings[i] = kafka.ACLBinding{
+ Type: resourceType,
+ Name: name,
+ ResourcePatternType: resourcePatternType,
+ Principal: principal,
+ Host: host,
+ Operation: operation,
+ PermissionType: permissionType,
+ }
+ }
+ aclBindings = parsedACLBindings
+ return
+}
+
+func main() {
+
+ // 2 + 7n arguments to create n ACL bindings
+ nArgs := len(os.Args)
+ aclBindingArgs := nArgs - 2
+ if aclBindingArgs <= 0 || aclBindingArgs%7 != 0 {
+ fmt.Fprintf(os.Stderr,
+ "Usage: %s
func WriteErrorCodes(f *os.File)
+
func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)
func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)
type CreateTopicsAdminOption
type RebalanceCb
ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
// ConfigSourceDefault is built-in default configuration for configs that have a default value
ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
+)
+
const (
+ // ResourcePatternTypeUnknown is a resource pattern type not known or not set.
+ ResourcePatternTypeUnknown = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_UNKNOWN)
+ // ResourcePatternTypeAny matches any resource, used for lookups.
+ ResourcePatternTypeAny = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_ANY)
+ // ResourcePatternTypeMatch will perform pattern matching
+ ResourcePatternTypeMatch = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_MATCH)
+ // ResourcePatternTypeLiteral matches a literal resource name
+ ResourcePatternTypeLiteral = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_LITERAL)
+ // ResourcePatternTypePrefixed matches a prefixed resource name
+ ResourcePatternTypePrefixed = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_PREFIXED)
+)
+ const (
+ // ACLOperationUnknown represents an unknown or unset operation
+ ACLOperationUnknown = ACLOperation(C.RD_KAFKA_ACL_OPERATION_UNKNOWN)
+ // ACLOperationAny in a filter, matches any ACLOperation
+ ACLOperationAny = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ANY)
+ // ACLOperationAll represents all the operations
+ ACLOperationAll = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALL)
+ // ACLOperationRead a read operation
+ ACLOperationRead = ACLOperation(C.RD_KAFKA_ACL_OPERATION_READ)
+ // ACLOperationWrite represents a write operation
+ ACLOperationWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_WRITE)
+ // ACLOperationCreate represents a create operation
+ ACLOperationCreate = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CREATE)
+ // ACLOperationDelete represents a delete operation
+ ACLOperationDelete = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DELETE)
+ // ACLOperationAlter represents an alter operation
+ ACLOperationAlter = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER)
+ // ACLOperationDescribe represents a describe operation
+ ACLOperationDescribe = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE)
+ // ACLOperationClusterAction represents a cluster action operation
+ ACLOperationClusterAction = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION)
+ // ACLOperationDescribeConfigs represents a describe configs operation
+ ACLOperationDescribeConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS)
+ // ACLOperationAlterConfigs represents an alter configs operation
+ ACLOperationAlterConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS)
+ // ACLOperationIdempotentWrite represents an idempotent write operation
+ ACLOperationIdempotentWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE)
+)
+ const (
+ // ACLPermissionTypeUnknown represents an unknown ACLPermissionType
+ ACLPermissionTypeUnknown = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN)
+ // ACLPermissionTypeAny in a filter, matches any ACLPermissionType
+ ACLPermissionTypeAny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ANY)
+ // ACLPermissionTypeDeny disallows access
+ ACLPermissionTypeDeny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_DENY)
+ // ACLPermissionTypeAllow grants access
+ ACLPermissionTypeAllow = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW)
)
const (
// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
@@ -1414,9 +1589,187 @@
librdkafka error codes.
This function is not intended for public use.
+ ACLBinding specifies the operation and permission type for a specific principal +over one or more resources of the same type. Used by `AdminClient.CreateACLs`, +returned by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. +
+type ACLBinding struct { + Type ResourceType // The resource type. + // The resource name, which depends on the resource type. + // For ResourceBroker the resource name is the broker id. + Name string + ResourcePatternType ResourcePatternType // The resource pattern, relative to the name. + Principal string // The principal this ACLBinding refers to. + Host string // The host that the call is allowed to come from. + Operation ACLOperation // The operation/s specified by this binding. + PermissionType ACLPermissionType // The permission type for the specified operation. +} ++
+ ACLBindingFilter specifies a filter used to return a list of ACL bindings matching some or all of its attributes. +Used by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. +
+type ACLBindingFilter = ACLBinding+
+ ACLBindingFilters is a slice of ACLBindingFilter that also implements +the sort interface +
+type ACLBindingFilters []ACLBindingFilter+
+ ACLBindings is a slice of ACLBinding that also implements +the sort interface +
+type ACLBindings []ACLBinding+
func (a ACLBindings) Len() int+
func (a ACLBindings) Less(i, j int) bool+
func (a ACLBindings) Swap(i, j int)+
+ ACLOperation enumerates the different types of ACL operation. +
+type ACLOperation int+
func ACLOperationFromString(aclOperationString string) (ACLOperation, error)+
+ ACLOperationFromString translates a ACL operation name to +a ACLOperation value. +
+func (o ACLOperation) String() string+
+ String returns the human-readable representation of an ACLOperation +
++ ACLPermissionType enumerates the different types of ACL permission types. +
+type ACLPermissionType int+
func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error)+
+ ACLPermissionTypeFromString translates a ACL permission type name to +a ACLPermissionType value. +
+func (o ACLPermissionType) String() string+
+ String returns the human-readable representation of an ACLPermissionType +
Requires broker version >= 0.10.0.
+func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error)+
+ CreateACLs creates one or more ACL bindings. +
++ Parameters: +
+* `ctx` - context with the maximum amount of time to block, or nil for indefinite. +* `aclBindings` - A slice of ACL binding specifications to create. +* `options` - Create ACLs options ++
+ Returns a slice of CreateACLResult with a ErrNoError ErrorCode when the operation was successful +plus an error that is not nil for client level errors +
Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error)+
+ DeleteACLs deletes ACL bindings matching one or more ACL binding filters. +
++ Parameters: +
+* `ctx` - context with the maximum amount of time to block, or nil for indefinite. +* `aclBindingFilters` - a slice of ACL binding filters to match ACLs to delete. + string attributes match exact values or any string if set to empty string. + Enum attributes match exact values or any value if ending with `Any`. + If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all + the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard` + or `ResourcePatternTypePrefixed` pattern type that match the resource name. +* `options` - Delete ACLs options ++
+ Returns a slice of ACLBinding for each filter when the operation was successful +plus an error that is not `nil` for client level errors +
Requires broker version >= 0.10.1.0
+func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error)+
+ DescribeACLs matches ACL bindings by filter. +
++ Parameters: +
+* `ctx` - context with the maximum amount of time to block, or nil for indefinite. +* `aclBindingFilter` - A filter with attributes that must match. + string attributes match exact values or any string if set to empty string. + Enum attributes match exact values or any value if ending with `Any`. + If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all + the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard` + or `ResourcePatternTypePrefixed` pattern type that match the resource name. +* `options` - Describe ACLs options ++
+ Returns a slice of ACLBindings when the operation was successful +plus an error that is not `nil` for client level errors +
type AlterOperation int
type ConfigSource int
+ CreateACLResult provides create ACL error information. +
+type CreateACLResult struct {
+ // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
+ Error Error
+}
+
+ + CreateACLsAdminOption - see setter. +
++ See SetAdminRequestTimeout +
+type CreateACLsAdminOption interface {
+ // contains filtered or unexported methods
+}
type CreateTopicsAdminOption interface {
// contains filtered or unexported methods
}
+ + DeleteACLsAdminOption - see setter. +
++ See SetAdminRequestTimeout +
+type DeleteACLsAdminOption interface {
+ // contains filtered or unexported methods
+}
+ + DeleteACLsResult provides delete ACLs result or error information. +
+type DeleteACLsResult = DescribeACLsResult
type DeleteTopicsAdminOption interface {
// contains filtered or unexported methods
}
+ + DescribeACLsAdminOption - see setter. +
++ See SetAdminRequestTimeout +
+type DescribeACLsAdminOption interface {
+ // contains filtered or unexported methods
+}
+ + DescribeACLsResult provides describe ACLs result or error information. +
+type DescribeACLsResult struct { + // Slice of ACL bindings matching the provided filter + ACLBindings ACLBindings + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error +} +
type RebalanceCb func(*Consumer, Event) error+
+ ResourcePatternType enumerates the different types of Kafka resource patterns. +
+type ResourcePatternType int+
func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error)+
+ ResourcePatternTypeFromString translates a resource pattern type name to +a ResourcePatternType value. +
+func (t ResourcePatternType) String() string+
+ String returns the human-readable representation of a ResourcePatternType +
type ResourceType int
func (tps TopicPartitions) Swap(i, j int)