Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AUTHN-1982] Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent #1015

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

chang-you
Copy link
Member

@chang-you chang-you commented Jun 23, 2023

What is the purpose of the change

The goal of these changes is to write a test that integrates the Kafka client with a SPIRE server that runs in your local machine to fetch JWT token via the workload API.

  • Create handleJWTTokenRefreshEvent function to handle token refresh events
  • Create retrieveJWTToken function to fetch JWT from SPIRE agent
  • Import SPIFFE Workload API
  • Integrate OAuthBearer mechanism into Kafka Producer to enable secure communication

References

What will this work accomplish/help with?

This example test will serve as a guide for integrating Confluent’s Kafka Golang client with SPIRE to mint SPIFFE SVID’s in the form of JWT’s, this will be the client-side changes needed, we will have a separate ticket to implement the server-side changes where Kafka will be able to validate these credentials with SPIRE via the SPIRE agent/the SPIFFE workload API.

Verify the change

The test should run successfully against an actual Kafka cluster in that the followings are verified:
1- Authentication with a Kafka cluster works successfully.
2- Token refresh events are handled properly by fetching a new token every time the refresh event is fired.

CleanShot 2023-06-30 at 15 52 35@2x

2023-06-30 15 50 59

Producer + Consumer:
image

2023-07-11 16 42 20

Documentation

To run the SPIRE server on kubernetes, either remote or on minikube, follow this guide

@chang-you chang-you requested a review from moe-omar June 23, 2023 18:55
@chang-you chang-you marked this pull request as draft June 24, 2023 06:40
@chang-you chang-you marked this pull request as ready for review June 24, 2023 06:41
@chang-you chang-you changed the title Implement OAuthBearer mechanism for creating Kafka Producer by communicating with SPIRE agent Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent Jun 30, 2023
@milindl milindl self-requested a review July 3, 2023 10:26
@chang-you chang-you force-pushed the spire_producer_example_0623 branch from 49fe984 to 87ba39f Compare July 7, 2023 21:59
@chang-you chang-you changed the title Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent [AUTHN-1982] Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent Jul 20, 2023
Copy link
Contributor

@milindl milindl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR @chang-you . I gave it a first-pass review, it's a good example to add.

Apart from the comments on the code, there are a few other things:

  1. Is it possible to add a README into the folder (like inside examples/spire_consumer_example/README). It can talk about:
    a. how custom OAuth handlers are to be implemented,
    b. how to run this example locally.
    c. make it more clear that the client doesn't provide support for SPIRE/SPIFFE, but rather, that this is supposed to be an example implementation of a custom OAuth handler.
    I can help with writing this README if you prefer, especially part a. and c.

  2. Is there an open guide on how to set up a SPIRE server locally? As the linked guide is inside Confluent's internal page, and external users of the library can't access it.

  3. Add the binary to the examples/.gitignore and the example link to examples/README. Similar to other examples. And add an entry into the CHANGELOG.md, too.

// But in this example we choose to terminate
// the application if all brokers are down.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if block isn't needed, you can remove the comment too, and replace with just

// Errors should generally be considered
// informational, the client will try to
// automatically recover.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)

"group.id": "myGroup",
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest",
"enable.auto.offset.store": false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this config, as it's not relevant to this example and will complicate things

if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
_, err := c.StoreMessage(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the StoreMessage and the subsequent error check, as it's not needed after removing "enable.auto.offset.store"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding this file: Would it make sense to keep just one file (either the consumer or the producer, but not both)?
Since this is an example to demonstrate the usage of a custom oauth handler, what we're actually doing with the client shouldn't be the main focus of the example, what do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments! That makes sense too.
@arvindth what do you think? Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, a single producer or consumer example is probably good enough.

Copy link
Member Author

@chang-you chang-you Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@milindl @arvindth but I noticed that under the examples folder, most of the previous examples: protobuf/oauthbearer/json/avro include both producer example and consumer example, should we keep both?

}
}

func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string) (kafka.OAuthBearerToken, func() error, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment for this method.

We don't need to describe the exact logic that the method uses (in fact, it's probably a fair idea to say that we can use any arbitrary logic, like here we are using the go-spiffe library as long as we construct a kafka.OAuthBearerToken token correctly).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. To clarify:

We don't need to describe the exact logic that the method use

Here do you suggest that we don't need to include a description of this method in the readme, or is there something specific I should modify within this method?

"time"
)

// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and
// handleJWTTokenRefreshEvent retrieves JWT from the SPIFFE workload API and

for correct terminology.

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and
// handleJWTTokenRefreshEvent retrieves JWT from the SPIFFE workload API and

@arvindth
Copy link
Member

@chang-you what's the status of this PR?

@arvindth arvindth dismissed their stale review November 20, 2023 13:25

Changes made.

// It must be invoked whenever kafka.OAuthBearerTokenRefresh appears on the client's event channel,
// which will occur whenever the client requires a token (i.e. when it first starts and when the
// previously-received token is 80% of the way to its expiration time).
func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, principal, socketPath string, audience []string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chang-you I don't think we're using the principal here correctly. In the oauth producer case, the principal needs to be passed in because it's the principal for which we get a token, and the token would then contain the principal in the sub claim.

For spire though, the principal value should be a spiffe id, and we should be using this spiffe id to look through all the svids returned by the spire agent to see if one of them matches the given principal, and use that one.

Copy link
Member Author

@chang-you chang-you Dec 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arvindth Do you suggest we should use FetchJWTSVIDs instead of FetchJWTSVID and use the JWTSVID with its spiffeid.ID that match the sub?

"bootstrap.servers": bootstrapServers,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "OAUTHBEARER",
"sasl.oauthbearer.config": principal,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this parameter is even required. Could we remove it and see if it works as expected?

Extensions: extensions,
}

return oauthBearerToken, jwtSource.Close, nil
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem necessary to return jwtSource.Close, as defer jwtSource.Close() is called above

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants