Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Dockerized versions of the SPIRE producer/consumer examples #1044

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions examples/docker_spire_kafka_consumer_example/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM golang:1.19 AS builder

ARG TARGETPLATFORM
ARG BUILDPLATFORM
RUN echo "I am running on $BUILDPLATFORM, building for $TARGETPLATFORM"

WORKDIR /docker_spire_kafka_consumer_example

# Copy go files
COPY go.mod .
COPY go.sum .
COPY consumer.go .
RUN go mod download
COPY . .

RUN CGO_ENABLED=1 GOOS=linux go build -o consumer .

CMD ["./consumer"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
kompose.cmd: kompose convert
kompose.version: 1.26.0 (40646f47)
creationTimestamp: null
labels:
io.kompose.service: consumer
name: consumer-chang
namespace: pkc-devcc97qpm5
spec:
replicas: 1
selector:
matchLabels:
io.kompose.service: consumer
strategy: {}
template:
metadata:
annotations:
kompose.cmd: kompose convert
kompose.version: 1.26.0 (40646f47)
creationTimestamp: null
labels:
io.kompose.service: consumer
spec:
containers:
- env:
- name: BOOTSTRAP_SERVERS
value: pkc-devcc97qpm5.us-west-2.aws.devel.cpdev.cloud:9092
- name: LKC
value: lkc-devcczwjmmd
- name: PRINCIPAL
value: sub
- name: SOCKET_PATH
value: unix:/opt/spire/sockets/workload_api.sock
- name: TOPIC
value: kafka-spire-native
image: 755363985185.dkr.ecr.us-west-2.amazonaws.com/docker/dev/cc-base:kafka-consumer-spire-chang
imagePullPolicy: Always
name: consumer-chang
resources: {}
volumeMounts:
- mountPath: /opt/spire/sockets
name: spire-agent-socket
readOnly: true
volumes:
- name: spire-agent-socket
hostPath:
path: /opt/spire/sockets
type: DirectoryOrCreate
restartPolicy: Always
status: {}
164 changes: 164 additions & 0 deletions examples/docker_spire_kafka_consumer_example/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Docker example consumer with a custom SPIFFE token implementation.
package main

import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/spiffe/go-spiffe/v2/svid/jwtsvid"
"github.com/spiffe/go-spiffe/v2/workloadapi"
"os"
"os/signal"
"syscall"
"time"
)

// handleJWTTokenRefreshEvent retrieves JWT from the SPIFFE workload API and
// sets the token on the client for use in any future authentication attempt.
// It must be invoked whenever kafka.OAuthBearerTokenRefresh appears on the client's event channel,
// which will occur whenever the client requires a token (i.e. when it first starts and when the
// previously-received token is 80% of the way to its expiration time).
func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, principal, socketPath string, audience []string, lkc string) {
fmt.Fprintf(os.Stderr, "Token refresh in handleJWTTokenRefreshEvent\n")
oauthBearerToken, closer, retrieveErr := retrieveJWTToken(ctx, principal, socketPath, audience, lkc)
defer closer()
if retrieveErr != nil {
fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr)
client.SetOAuthBearerTokenFailure(retrieveErr.Error())
} else {
setTokenError := client.SetOAuthBearerToken(oauthBearerToken)
if setTokenError != nil {
fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError)
client.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
}

func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string, lkc string) (kafka.OAuthBearerToken, func() error, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
jwtSource, err := workloadapi.NewJWTSource(
ctx,
workloadapi.WithClientOptions(workloadapi.WithAddr(socketPath)),
)

if err != nil {
return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to create JWTSource: %w", err)
}

defer jwtSource.Close()

params := jwtsvid.Params{
Audience: audience[0],
// Other fields...
}

jwtSVID, err := jwtSource.FetchJWTSVID(ctx, params)
if err != nil {
return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to fetch JWT SVID: %w", err)
}

extensions := map[string]string{
"logicalCluster": lkc,
}
oauthBearerToken := kafka.OAuthBearerToken{
TokenValue: jwtSVID.Marshal(),
Expiration: jwtSVID.Expiry,
Principal: principal,
Extensions: extensions,
}

return oauthBearerToken, jwtSource.Close, nil
}

func main() {

bootstrapServers := os.Getenv("BOOTSTRAP_SERVERS")
topic := os.Getenv("TOPIC")
principal := os.Getenv("PRINCIPAL")
socketPath := os.Getenv("SOCKET_PATH")
audience := []string{"audience1", "audience2"}
lkc := os.Getenv("LKC")

fmt.Fprintf(os.Stderr, "Token refresh\n")
fmt.Fprintf(os.Stderr, "bootstrapServers is: %s\n", bootstrapServers)

config := kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "OAUTHBEARER",
"sasl.oauthbearer.config": principal,
}

c, err := kafka.NewConsumer(&config)

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Consumer %v\n", c)

err = c.SubscribeTopics([]string{topic}, nil)

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to subscribe to topic: %s\n", topic)
os.Exit(1)
}

run := true
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

ctx := context.Background()

for run {
select {
case sig := <-signalChannel:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
// automatically recover.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
case kafka.OAuthBearerTokenRefresh:
handleJWTTokenRefreshEvent(ctx, c, principal, socketPath, audience, lkc)
default:
fmt.Printf("Ignored %v\n", e)
}
}
}

fmt.Printf("Closing consumer\n")
c.Close()
}
13 changes: 13 additions & 0 deletions examples/docker_spire_kafka_consumer_example/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: '3'
services:
consumer:
network_mode: "host"
build:
context: .
dockerfile: Dockerfile
environment:
- BOOTSTRAP_SERVERS=pkc-devcc97qpm5.us-west-2.aws.devel.cpdev.cloud:9092
- TOPIC=kafka-spire-native
- PRINCIPAL=sub
- SOCKET_PATH=unix:/opt/spire/sockets/workload_api.sock
- LKC=lkc-devcczwnq87
24 changes: 24 additions & 0 deletions examples/docker_spire_kafka_consumer_example/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module docker_example_spire_consumer

go 1.19

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/spiffe/go-spiffe/v2 v2.1.6
)

require (
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/zeebo/errs v1.3.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)