Skip to content

Commit

Permalink
KIP-140: ACL operations (#796)
Browse files Browse the repository at this point in the history
Bindings for ACL operations:
CreateAcls
DescribeAcls
DeleteAcls
  • Loading branch information
emasab committed Jun 20, 2022
1 parent f195ac6 commit 753eb99
Show file tree
Hide file tree
Showing 29 changed files with 2,336 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,7 @@
This is a feature release:

* OAUTHBEARER OIDC support
* KIP-140 Admin API ACL support
* Added MockCluster for functional testing of applications without the need
for a real Kafka cluster (by @SourceFellows and @kkoehler, #729).
See [examples/mock_cluster](examples/mock_cluster).
Expand Down
3 changes: 3 additions & 0 deletions examples/.gitignore
Expand Up @@ -7,4 +7,7 @@ go-kafkacat/go-kafkacat
admin_describe_config/admin_describe_config
admin_delete_topics/admin_delete_topics
admin_create_topic/admin_create_topic
admin_create_acls/admin_create_acls
admin_describe_acls/admin_describe_acls
admin_delete_acls/admin_delete_acls
stats_example/stats_example
3 changes: 3 additions & 0 deletions examples/README.md
@@ -1,6 +1,9 @@

Examples:

admin_create_acls - Create Access Control Lists
admin_describe_acls - Find Access Control Lists using a filter
admin_delete_acls - Delete Access Control Lists using different filters
consumer_channel_example - Channel based consumer
consumer_example - Function & callback based consumer
consumer_offset_metadata - Commit offset with metadata
Expand Down
147 changes: 147 additions & 0 deletions examples/admin_create_acls/admin_create_acls.go
@@ -0,0 +1,147 @@
// Create ACLs
package main

/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import (
"context"
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

// Parses a list of 7n arguments to a slice of n ACLBinding
func parseACLBindings(args []string) (aclBindings kafka.ACLBindings, err error) {
nACLBindings := len(args) / 7
parsedACLBindings := make(kafka.ACLBindings, nACLBindings)

for i := 0; i < nACLBindings; i++ {
start := i * 7
resourceTypeString := args[start]
name := args[start+1]
resourcePatternTypeString := args[start+2]
principal := args[start+3]
host := args[start+4]
operationString := args[start+5]
permissionTypeString := args[start+6]

resourceType, errParse := kafka.ResourceTypeFromString(resourceTypeString)
if errParse != nil {
err = errParse
fmt.Printf("Invalid resource type: %s: %v\n", resourceTypeString, err)
return
}

resourcePatternType, errParse := kafka.ResourcePatternTypeFromString(resourcePatternTypeString)
if errParse != nil {
err = errParse
fmt.Printf("Invalid resource pattern type: %s: %v\n", resourcePatternTypeString, err)
return
}

operation, errParse := kafka.ACLOperationFromString(operationString)
if errParse != nil {
err = errParse
fmt.Printf("Invalid operation: %s: %v\n", operationString, err)
return
}

permissionType, errParse := kafka.ACLPermissionTypeFromString(permissionTypeString)
if errParse != nil {
err = errParse
fmt.Printf("Invalid permission type: %s: %v\n", permissionTypeString, err)
return
}

parsedACLBindings[i] = kafka.ACLBinding{
Type: resourceType,
Name: name,
ResourcePatternType: resourcePatternType,
Principal: principal,
Host: host,
Operation: operation,
PermissionType: permissionType,
}
}
aclBindings = parsedACLBindings
return
}

func main() {

// 2 + 7n arguments to create n ACL bindings
nArgs := len(os.Args)
aclBindingArgs := nArgs - 2
if aclBindingArgs <= 0 || aclBindingArgs%7 != 0 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <resource-type1> <resource-name1> <resource-pattern-type1> "+
"<principal1> <host1> <operation1> <permission-type1> ...\n",
os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]
aclBindings, err := parseACLBindings(os.Args[2:])
if err != nil {
os.Exit(1)
}

// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}

// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create ACLs on cluster.
// Set Admin options to wait for the request to finish (or at most 60s)
maxDur, err := time.ParseDuration("60s")
if err != nil {
panic("ParseDuration(60s)")
}
results, err := a.CreateACLs(
ctx,
aclBindings,
kafka.SetAdminRequestTimeout(maxDur),
)
if err != nil {
fmt.Printf("Failed to create ACLs: %v\n", err)
os.Exit(1)
}

// Print results
for i, result := range results {
if result.Error.Code() == kafka.ErrNoError {
fmt.Printf("CreateACLs %d successful\n", i)
} else {
fmt.Printf("CreateACLs %d failed, error code: %s, message: %s\n",
i, result.Error.Code(), result.Error.String())
}
}

a.Close()
}
9 changes: 5 additions & 4 deletions examples/admin_create_topic/admin_create_topic.go
Expand Up @@ -20,22 +20,23 @@ package main
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

if len(os.Args) != 5 {
fmt.Fprintf(os.Stderr,
"Usage: %s <broker> <topic> <partition-count> <replication-factor>\n",
"Usage: %s <bootstrap-servers> <topic> <partition-count> <replication-factor>\n",
os.Args[0])
os.Exit(1)
}

broker := os.Args[1]
bootstrapServers := os.Args[1]
topic := os.Args[2]
numParts, err := strconv.Atoi(os.Args[3])
if err != nil {
Expand All @@ -52,7 +53,7 @@ func main() {
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
Expand Down
147 changes: 147 additions & 0 deletions examples/admin_delete_acls/admin_delete_acls.go
@@ -0,0 +1,147 @@
// Delete ACLs
package main

/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import (
"context"
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

// Parses a list of 7n arguments to a slice of n ACLBindingFilter
func parseACLBindingFilters(args []string) (aclBindingFilters kafka.ACLBindingFilters, err error) {
nACLBindingFilters := len(args) / 7
parsedACLBindingFilters := make(kafka.ACLBindingFilters, nACLBindingFilters)

for i := 0; i < nACLBindingFilters; i++ {
start := i * 7
resourceTypeString := args[start]
name := args[start+1]
resourcePatternTypeString := args[start+2]
principal := args[start+3]
host := args[start+4]
operationString := args[start+5]
permissionTypeString := args[start+6]

var resourceType kafka.ResourceType
var resourcePatternType kafka.ResourcePatternType
var operation kafka.ACLOperation
var permissionType kafka.ACLPermissionType

resourceType, err = kafka.ResourceTypeFromString(resourceTypeString)
if err != nil {
fmt.Printf("Invalid resource type: %s: %v\n", resourceTypeString, err)
return
}
resourcePatternType, err = kafka.ResourcePatternTypeFromString(resourcePatternTypeString)
if err != nil {
fmt.Printf("Invalid resource pattern type: %s: %v\n", resourcePatternTypeString, err)
return
}

operation, err = kafka.ACLOperationFromString(operationString)
if err != nil {
fmt.Printf("Invalid operation: %s: %v\n", operationString, err)
return
}

permissionType, err = kafka.ACLPermissionTypeFromString(permissionTypeString)
if err != nil {
fmt.Printf("Invalid permission type: %s: %v\n", permissionTypeString, err)
return
}

parsedACLBindingFilters[i] = kafka.ACLBindingFilter{
Type: resourceType,
Name: name,
ResourcePatternType: resourcePatternType,
Principal: principal,
Host: host,
Operation: operation,
PermissionType: permissionType,
}
}
aclBindingFilters = parsedACLBindingFilters
return
}

func main() {

// 2 + 7n arguments to create n ACL binding filters
nArgs := len(os.Args)
aclBindingFilterArgs := nArgs - 2
if aclBindingFilterArgs <= 0 || aclBindingFilterArgs%7 != 0 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <resource-type1> <resource-name1> <resource-pattern-type1> "+
"<principal1> <host1> <operation1> <permission-type1> ...\n",
os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]
aclBindingFilters, err := parseACLBindingFilters(os.Args[2:])
if err != nil {
os.Exit(1)
}

// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}

// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create ACLs on cluster.
// Set Admin options to wait for the request to finish (or at most 60s)
maxDur, err := time.ParseDuration("60s")
if err != nil {
panic("ParseDuration(60s)")
}
results, err := a.DeleteACLs(
ctx,
aclBindingFilters,
kafka.SetAdminRequestTimeout(maxDur),
)
if err != nil {
fmt.Printf("Failed to delete ACLs: %v\n", err)
os.Exit(1)
}

// Print results
for i, result := range results {
if result.Error.Code() == kafka.ErrNoError {
fmt.Printf("DeleteACLs %d successful, deleted: %+v\n", i, result.ACLBindings)
} else {
fmt.Printf("DeleteACLs %d failed, error code: %s, message: %s\n",
i, result.Error.Code(), result.Error.String())
}
}

a.Close()
}
9 changes: 5 additions & 4 deletions examples/admin_delete_topics/admin_delete_topics.go
Expand Up @@ -20,28 +20,29 @@ package main
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr,
"Usage: %s <broker> <topic1> <topic2> ..\n",
"Usage: %s <bootstrap-servers> <topic1> <topic2> ..\n",
os.Args[0])
os.Exit(1)
}

broker := os.Args[1]
bootstrapServers := os.Args[1]
topics := os.Args[2:]

// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
Expand Down

0 comments on commit 753eb99

Please sign in to comment.