diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8653b45..470e257 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest env: SPIFFE_ENDPOINT_SOCKET: unix:/tmp/spire-agent/public/api.sock - SPIFFE_ADMIN_ENDPOINT_SOCKET: unix:/tmp/spire-agent/admin/api.sock + SPIRE_ADMIN_ENDPOINT_SOCKET: unix:/tmp/spire-agent/admin/api.sock needs: build steps: - name: Check out code diff --git a/scripts/agent.conf b/scripts/agent.conf index 0a891b8..dbbef62 100644 --- a/scripts/agent.conf +++ b/scripts/agent.conf @@ -9,7 +9,7 @@ agent { # simple testing/evaluation purposes. insecure_bootstrap = true - admin_socket_path = "$STRIPPED_SPIFFE_ADMIN_ENDPOINT_SOCKET" + admin_socket_path = "$STRIPPED_SPIRE_ADMIN_ENDPOINT_SOCKET" authorized_delegates = [ "spiffe://example.org/myservice", ] diff --git a/scripts/run-spire.sh b/scripts/run-spire.sh index d0b228e..f1954ef 100755 --- a/scripts/run-spire.sh +++ b/scripts/run-spire.sh @@ -37,7 +37,7 @@ mkdir -p /tmp/spire-server bin/spire-server run -config conf/server/server.conf > "${spire_server_log_file}" 2>&1 & wait_for_service "bin/spire-server healthcheck" "SPIRE Server" "${spire_server_log_file}" -export STRIPPED_SPIFFE_ADMIN_ENDPOINT_SOCKET=$(echo $SPIFFE_ADMIN_ENDPOINT_SOCKET| cut -c6-) +export STRIPPED_SPIRE_ADMIN_ENDPOINT_SOCKET=$(echo $SPIRE_ADMIN_ENDPOINT_SOCKET| cut -c6-) cat $SCRIPT_DIR/agent.conf | envsubst > "conf/agent/agent.conf" # Run the SPIRE agent with the joint token diff --git a/spire-api/Cargo.toml b/spire-api/Cargo.toml index 9a5b623..11ac0b9 100644 --- a/spire-api/Cargo.toml +++ b/spire-api/Cargo.toml @@ -23,19 +23,9 @@ prost-types = {version = "0.11"} tokio = { "version" = "1", features = ["net", "test-util"]} tokio-stream = "0.1" tower = { version = "0.4", features = ["util"] } -thiserror = "1.0" -url = "2.2" -asn1 = { package = "simple_asn1", version = "0.6" } -x509-parser = "0.15" -pkcs8 = "0.10" -jsonwebtoken = "8.3" -jsonwebkey = { version = "0.3", features = ["jsonwebtoken", "jwt-convert"] } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -zeroize = { version = "1.6", features = ["zeroize_derive"] } -time = "0.3" [dev-dependencies] +once_cell = "1.18" [build-dependencies] tonic-build = { version = "0.9", default-features = false, features = ["prost"] } diff --git a/spire-api/src/agent/delegated_identity.rs b/spire-api/src/agent/delegated_identity.rs index 58cdc68..8fe25b0 100644 --- a/spire-api/src/agent/delegated_identity.rs +++ b/spire-api/src/agent/delegated_identity.rs @@ -8,11 +8,15 @@ use crate::proto::spire::api::agent::delegatedidentity::v1::{ delegated_identity_client::DelegatedIdentityClient as DelegatedIdentityApiClient, - SubscribeToX509BundlesRequest, SubscribeToX509BundlesResponse, SubscribeToX509sviDsRequest, - SubscribeToX509sviDsResponse, + FetchJwtsviDsRequest, SubscribeToX509BundlesRequest, SubscribeToX509BundlesResponse, + SubscribeToX509sviDsRequest, SubscribeToX509sviDsResponse, SubscribeToJwtBundlesRequest, + SubscribeToJwtBundlesResponse, }; +use crate::proto::spire::api::types::Jwtsvid as ProtoJwtSvid; use spiffe::bundle::x509::{X509Bundle, X509BundleSet}; +use spiffe::bundle::jwt::{JwtBundleSet, JwtBundle}; use spiffe::spiffe_id::TrustDomain; +use spiffe::svid::jwt::JwtSvid; use spiffe::svid::x509::X509Svid; use spiffe::workload_api::address::validate_socket_path; use tokio_stream::{Stream, StreamExt}; @@ -20,14 +24,15 @@ use tokio_stream::{Stream, StreamExt}; use crate::selectors::Selector; use spiffe::workload_api::client::{ClientError, DEFAULT_SVID}; use std::convert::{Into, TryFrom}; +use std::str::FromStr; use tokio::net::UnixStream; use tonic::transport::{Endpoint, Uri}; use tower::service_fn; /// Name of the environment variable that holds the default socket endpoint path. -pub const ADMIN_SOCKET_ENV: &str = "SPIFFE_ADMIN_ENDPOINT_SOCKET"; +pub const ADMIN_SOCKET_ENV: &str = "SPIRE_ADMIN_ENDPOINT_SOCKET"; -/// Gets the endpoint socket endpoint path from the environment variable `SPIFFE_ENDPOINT_SOCKET`, +/// Gets the endpoint socket endpoint path from the environment variable `ADMIN_SOCKET_ENV`, /// as described in [SPIFFE standard](https://github.com/spiffe/spiffe/blob/main/standards/SPIFFE_Workload_Endpoint.md#4-locating-the-endpoint). pub fn get_admin_socket_path() -> Option { match std::env::var(ADMIN_SOCKET_ENV) { @@ -132,7 +137,7 @@ impl DelegatedIdentityClient { /// /// Returns [`ClientError`] if the gRPC call fails or if the SVID could not be parsed from the gRPC response. pub async fn fetch_x509_svid( - mut self, + &mut self, selectors: Vec, ) -> Result { let request = SubscribeToX509sviDsRequest { @@ -172,7 +177,7 @@ impl DelegatedIdentityClient { /// /// Individual stream items might also be errors if there's an issue processing the response for a specific update. pub async fn stream_x509_svids( - mut self, + &mut self, selectors: Vec, ) -> Result>, ClientError> { let request = SubscribeToX509sviDsRequest { @@ -197,7 +202,7 @@ impl DelegatedIdentityClient { /// /// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or /// there is a problem processing the response. - pub async fn fetch_x509_bundles(mut self) -> Result { + pub async fn fetch_x509_bundles(&mut self) -> Result { let request = SubscribeToX509BundlesRequest::default(); let response: tonic::Response> = @@ -227,7 +232,7 @@ impl DelegatedIdentityClient { /// /// Individual stream items might also be errors if there's an issue processing the response for a specific update. pub async fn stream_x509_bundles( - mut self, + &mut self, ) -> Result>, ClientError> { let request = SubscribeToX509BundlesRequest::default(); @@ -242,37 +247,133 @@ impl DelegatedIdentityClient { Ok(stream) } + + /// Fetches a list of [`JwtSvid`] parsing the JWT token in the Workload API response, for the given audience and selectors. + /// + /// # Arguments + /// + /// * `audience` - A list of audiences to include in the JWT token. Cannot be empty nor contain only empty strings. + /// * `selectors` - A list of selectors to filter the list of [`JwtSvid`]. + /// + /// # Errors + /// + /// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or + /// there is a problem processing the response. + pub async fn fetch_jwt_svids + ToString>( + &mut self, + audience: &[T], + selectors: Vec, + ) -> Result, ClientError> { + let request = FetchJwtsviDsRequest { + audience: audience.iter().map(|s| s.to_string()).collect(), + selectors: selectors.into_iter().map(|s| s.into()).collect(), + }; + + DelegatedIdentityClient::parse_jwt_svid_from_grpc_response( + self.client + .fetch_jwtsvi_ds(request) + .await? + .into_inner() + .svids, + ) + } + + + + /// Watches the stream of [`JwtBundleSet`] updates. + /// + /// This function establishes a stream with the Workload API to continuously receive updates for the [`JwtBundleSet`]. + /// The returned stream can be used to asynchronously yield new `JwtBundleSet` updates as they become available. + /// + /// # Returns + /// + /// Returns a stream of `Result`. Each item represents an updated [`JwtBundleSet`] or an error if + /// there was a problem processing an update from the stream. + /// + /// # Errors + /// + /// The function can return an error variant of [`ClientError`] in the following scenarios: + /// + /// * There's an issue connecting to the Workload API. + /// * An error occurs while setting up the stream. + /// + /// Individual stream items might also be errors if there's an issue processing the response for a specific update. + pub async fn stream_jwt_bundles( + &mut self, + ) -> Result>, ClientError> { + let request = SubscribeToJwtBundlesRequest::default(); + let response = self.client.subscribe_to_jwt_bundles(request).await?; + Ok(response.into_inner().map(|message| { + message + .map_err(ClientError::from) + .and_then(DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response) + })) + } + + /// Fetches [`JwtBundleSet`] that is a set of [`JwtBundle`] keyed by the trust domain to which they belong. + /// + /// # Errors + /// + /// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or + /// there is a problem processing the response. + pub async fn fetch_jwt_bundles( + &mut self, + ) -> Result { + let request = SubscribeToJwtBundlesRequest::default(); + let response = self.client.subscribe_to_jwt_bundles(request).await?; + let initial = response.into_inner().message().await?; + DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response(initial.ok_or(ClientError::EmptyResponse)?) + } } impl DelegatedIdentityClient { fn parse_x509_svid_from_grpc_response( response: SubscribeToX509sviDsResponse, ) -> Result { - let svid = match response.x509_svids.get(DEFAULT_SVID) { - None => return Err(ClientError::EmptyResponse), - Some(s) => s, - }; + let svid = response + .x509_svids + .get(DEFAULT_SVID) + .ok_or(ClientError::EmptyResponse)?; - // OPTIMIZE THIS - let mut total_length = 0; - svid.x509_svid - .as_ref() - .ok_or(ClientError::EmptyResponse)? - .cert_chain - .iter() - .for_each(|c| total_length += c.len()); - let mut cert_chain = bytes::BytesMut::with_capacity(total_length); - svid.x509_svid - .as_ref() - .ok_or(ClientError::EmptyResponse)? - .cert_chain - .iter() - .for_each(|c| cert_chain.extend(c)); + let x509_svid = svid.x509_svid.as_ref().ok_or(ClientError::EmptyResponse)?; + + let total_length = x509_svid.cert_chain.iter().map(|c| c.len()).sum(); + let mut cert_chain_bytes = Vec::with_capacity(total_length); + + for c in &x509_svid.cert_chain { + cert_chain_bytes.extend_from_slice(c); + } - X509Svid::parse_from_der(cert_chain.as_ref(), svid.x509_svid_key.as_ref()) + X509Svid::parse_from_der(&cert_chain_bytes, svid.x509_svid_key.as_ref()) .map_err(|e| e.into()) } + fn parse_jwt_svid_from_grpc_response( + svids: Vec, + ) -> Result, ClientError> { + let result: Result, ClientError> = svids + .iter() + .map(|r| JwtSvid::from_str(&r.token).map_err(ClientError::InvalidJwtSvid)) + .collect(); + result + } + + fn parse_jwt_bundle_set_from_grpc_response( + response: SubscribeToJwtBundlesResponse, + ) -> Result { + let mut bundle_set = JwtBundleSet::new(); + + for (td, bundle_data) in response.bundles.into_iter() { + let trust_domain = TrustDomain::try_from(td)?; + let bundle = JwtBundle::from_jwt_authorities(trust_domain, &bundle_data) + .map_err(ClientError::from)?; + + bundle_set.add_bundle(bundle); + } + + Ok(bundle_set) + } + fn parse_x509_bundle_set_from_grpc_response( response: SubscribeToX509BundlesResponse, ) -> Result { diff --git a/spire-api/src/agent/mod.rs b/spire-api/src/agent/mod.rs index 46d6c37..23dbc96 100644 --- a/spire-api/src/agent/mod.rs +++ b/spire-api/src/agent/mod.rs @@ -1,13 +1,10 @@ //! Agent API //! -//! The agent API consists of APIs: -//! - delegated_identity -//! - debug (not implemented) +//! Consists of the following APIs: +//! - `delegated_identity`: For managing delegated identities. +//! - `debug`: (Not yet implemented). //! //! # Note -//! Both of these APIs must be accesed via the admin_socket_path which can be set -//! in the [agent configuration file](https://spiffe.io/docs/latest/deploying/spire_agent/#agent-configuration-file). +//! Access these APIs via the `admin_socket_path` in the [agent configuration file](https://spiffe.io/docs/latest/deploying/spire_agent/#agent-configuration-file). //! -//! - pub mod delegated_identity; diff --git a/spire-api/src/selectors.rs b/spire-api/src/selectors.rs index ce484f6..ea023c8 100644 --- a/spire-api/src/selectors.rs +++ b/spire-api/src/selectors.rs @@ -1,11 +1,10 @@ -//! Selectors which conform to SPIRE standards. -//! +//! Selectors conforming to SPIRE standards. use crate::proto::spire::api::types::Selector as SpiffeSelector; const K8S_TYPE: &str = "k8s"; const UNIX_TYPE: &str = "unix"; -/// User-facing version of underlying proto selector type +/// Converts user-defined selectors into SPIFFE selectors. impl From for SpiffeSelector { fn from(s: Selector) -> SpiffeSelector { match s { @@ -26,19 +25,20 @@ impl From for SpiffeSelector { } #[derive(Debug, Clone)] -/// Selector represents a SPIFFE ID selector. +/// Represents various types of SPIFFE identity selectors. pub enum Selector { - /// K8s represents a SPIFFE ID selector. + /// Represents a SPIFFE identity selector based on Kubernetes constructs. K8s(K8s), - /// Selector represents a SPIFFE ID selector. + /// Represents a SPIFFE identity selector based on Unix system constructs such as PID, GID, and UID. Unix(Unix), - /// Selector represents a SPIFFE ID selector. + /// Represents a generic SPIFFE identity selector defined by a key-value pair. Generic((String, String)), } const K8S_SA_TYPE: &str = "sa"; const K8S_NS_TYPE: &str = "ns"; +/// Converts Kubernetes selectors to their string representation. impl From for String { fn from(k: K8s) -> String { match k { @@ -49,11 +49,11 @@ impl From for String { } #[derive(Debug, Clone)] -/// K8s is a helper type to create a SPIFFE ID selector for Kubernetes. +/// Represents a SPIFFE identity selector for Kubernetes. pub enum K8s { - /// ServiceAccount represents the SPIFFE ID selector for a Kubernetes service account. + /// SPIFFE identity selector for a Kubernetes service account. ServiceAccount(String), - /// Namespace represents the SPIFFE ID selector for a Kubernetes namespace. + /// SPIFFE identity selector for a Kubernetes namespace. Namespace(String), } @@ -61,6 +61,7 @@ const UNIX_PID_TYPE: &str = "pid"; const UNIX_GID_TYPE: &str = "gid"; const UNIX_UID_TYPE: &str = "uid"; +/// Converts a Unix selector into a formatted string representation. impl From for String { fn from(value: Unix) -> Self { match value { @@ -73,7 +74,6 @@ impl From for String { #[derive(Debug, Clone)] /// Represents SPIFFE identity selectors based on Unix process-related attributes. -#[derive(Debug, Clone)] pub enum Unix { /// Specifies a selector for a Unix process ID (PID). Pid(u16), diff --git a/spire-api/tests/delegated_identity_api_client_test.rs b/spire-api/tests/delegated_identity_api_client_test.rs index f2a5146..9173995 100644 --- a/spire-api/tests/delegated_identity_api_client_test.rs +++ b/spire-api/tests/delegated_identity_api_client_test.rs @@ -4,8 +4,16 @@ #[cfg(feature = "integration-tests")] mod integration_tests { + use once_cell::sync::Lazy; + use spiffe::bundle::BundleRefSource; + use spiffe::bundle::jwt::JwtBundleSet; + use spiffe::spiffe_id::TrustDomain; use spire_api::selectors; use std::process::Command; + use tokio_stream::StreamExt; + use spire_api::agent::delegated_identity::DelegatedIdentityClient; + + static TRUST_DOMAIN: Lazy = Lazy::new(|| TrustDomain::new("example.org").unwrap()); fn get_uid() -> u16 { let mut uid = String::from_utf8( @@ -20,12 +28,31 @@ mod integration_tests { uid.parse().expect("could not parse uid to number") } + async fn get_client() -> DelegatedIdentityClient { + DelegatedIdentityClient::default() + .await + .expect("failed to create client") + } + #[tokio::test] - async fn fetch_delegate_svid() { - // let uid: u16 = std::env::var("UID").expect("UID env var not present").parse().expect("could not parse uid to number"); - let client = spire_api::agent::delegated_identity::DelegatedIdentityClient::default() + async fn fetch_delegate_jwt_svid() { + let mut client = get_client().await; + let svid = client + .fetch_jwt_svids( + &["my_audience"], + vec![selectors::Selector::Unix(selectors::Unix::Uid( + get_uid() + 1, + ))], + ) .await - .expect("failed to create client"); + .expect("Failed to fetch JWT SVID"); + assert_eq!(svid.len(), 1); + assert_eq!(svid[0].audience(), &["my_audience"]); + } + + #[tokio::test] + async fn fetch_delegate_x509_svid() { + let mut client = get_client().await; let response: spiffe::svid::x509::X509Svid = client .fetch_x509_svid(vec![selectors::Selector::Unix(selectors::Unix::Uid( get_uid() + 1, @@ -42,20 +69,111 @@ mod integration_tests { } #[tokio::test] - async fn fetch_trust_bundles() { - // let uid: u16 = std::env::var("UID").expect("UID env var not present").parse().expect("could not parse uid to number"); - let client = spire_api::agent::delegated_identity::DelegatedIdentityClient::default() + async fn stream_delegate_x509_svid() { + let test_duration = std::time::Duration::from_secs(60); + let mut client = get_client().await; + let mut stream = client + .stream_x509_svids(vec![selectors::Selector::Unix(selectors::Unix::Uid( + get_uid() + 1, + ))]) .await - .expect("failed to create client"); + .expect("Failed to fetch delegate SVID"); + + let result = tokio::time::timeout(test_duration, stream.next()) + .await + .expect("Test did not complete in the expected duration"); + let response = result.expect("empty result").expect("error in stream"); + // Not checking the chain as the root is generated by spire. + // In the future we could look in the downloaded spire directory for the keys. + assert_eq!(response.cert_chain().len(), 1); + assert_eq!( + response.spiffe_id().to_string(), + "spiffe://example.org/different-process" + ); + } + + #[tokio::test] + async fn fetch_delegated_x509_trust_bundles() { + let mut client = get_client().await; let response = client .fetch_x509_bundles() .await .expect("Failed to fetch trust bundles"); response - .get_bundle( - &spiffe::spiffe_id::TrustDomain::new("example.org".as_ref()) - .expect("Failed to parse trust domain ="), - ) + .get_bundle(&*TRUST_DOMAIN) + .expect("Failed to get bundle"); + } + + #[tokio::test] + async fn stream_delegated_x509_trust_bundles() { + let test_duration = std::time::Duration::from_secs(60); + let mut client = get_client().await; + let mut stream = client + .stream_x509_bundles() + .await + .expect("Failed to fetch trust bundles"); + + let result = tokio::time::timeout(test_duration, stream.next()) + .await + .expect("Test did not complete in the expected duration"); + let response = result.expect("empty result").expect("error in stream"); + response + .get_bundle(&*TRUST_DOMAIN) .expect("Failed to get bundle"); } + + async fn verify_jwt( + client: &mut DelegatedIdentityClient, + bundles: JwtBundleSet, + ) { + let svids = client + .fetch_jwt_svids( + &["my_audience"], + vec![selectors::Selector::Unix(selectors::Unix::Uid( + get_uid() + 1, + ))], + ) + .await + .expect("Failed to fetch JWT SVID"); + let svid = svids.first().expect("no items in jwt bundle list"); + let key_id = svid.key_id(); + + let bundle = bundles.get_bundle_for_trust_domain(&*TRUST_DOMAIN); + let bundle = bundle + .expect("Bundle was None") + .expect("Failed to unwrap bundle"); + assert_eq!(bundle.trust_domain(), &*TRUST_DOMAIN); + assert_eq!( + bundle.find_jwt_authority(key_id).unwrap().key_id, + Some(key_id.to_string()) + ); + } + + #[tokio::test] + async fn fetch_delegated_jwt_trust_bundles() { + let mut client = get_client().await; + let response = client + .fetch_jwt_bundles() + .await + .expect("Failed to fetch trust bundles"); + + + verify_jwt(&mut client, response).await; + } + + #[tokio::test] + async fn stream_delegated_jwt_trust_bundles() { + let mut client = get_client().await; + let test_duration = std::time::Duration::from_secs(60); + let mut stream = client + .stream_jwt_bundles() + .await + .expect("Failed to fetch trust bundles"); + + let result = tokio::time::timeout(test_duration, stream.next()) + .await + .expect("Test did not complete in the expected duration"); + + verify_jwt(&mut client, result.expect("empty result").expect("error in stream")).await; + } }