Skip to content

Commit

Permalink
Adddressed PR comments and included suggested comments
Browse files Browse the repository at this point in the history
Signed-off-by: Eitan Yarmush <eitan.yarmush@solo.io>
  • Loading branch information
EItanya committed Aug 15, 2023
1 parent a566ddf commit e2e4e50
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Expand Up @@ -37,7 +37,7 @@ jobs:
runs-on: ubuntu-latest
env:
SPIFFE_ENDPOINT_SOCKET: unix:/tmp/spire-agent/public/api.sock
SPIFFE_ADMIN_ENDPOINT_SOCKET: unix:/tmp/spire-agent/admin/api.sock
SPIRE_ADMIN_ENDPOINT_SOCKET: unix:/tmp/spire-agent/admin/api.sock
needs: build
steps:
- name: Check out code
Expand Down
2 changes: 1 addition & 1 deletion scripts/agent.conf
Expand Up @@ -9,7 +9,7 @@ agent {
# simple testing/evaluation purposes.
insecure_bootstrap = true

admin_socket_path = "$STRIPPED_SPIFFE_ADMIN_ENDPOINT_SOCKET"
admin_socket_path = "$STRIPPED_SPIRE_ADMIN_ENDPOINT_SOCKET"
authorized_delegates = [
"spiffe://example.org/myservice",
]
Expand Down
2 changes: 1 addition & 1 deletion scripts/run-spire.sh
Expand Up @@ -37,7 +37,7 @@ mkdir -p /tmp/spire-server
bin/spire-server run -config conf/server/server.conf > "${spire_server_log_file}" 2>&1 &
wait_for_service "bin/spire-server healthcheck" "SPIRE Server" "${spire_server_log_file}"

export STRIPPED_SPIFFE_ADMIN_ENDPOINT_SOCKET=$(echo $SPIFFE_ADMIN_ENDPOINT_SOCKET| cut -c6-)
export STRIPPED_SPIRE_ADMIN_ENDPOINT_SOCKET=$(echo $SPIRE_ADMIN_ENDPOINT_SOCKET| cut -c6-)
cat $SCRIPT_DIR/agent.conf | envsubst > "conf/agent/agent.conf"

# Run the SPIRE agent with the joint token
Expand Down
12 changes: 1 addition & 11 deletions spire-api/Cargo.toml
Expand Up @@ -23,19 +23,9 @@ prost-types = {version = "0.11"}
tokio = { "version" = "1", features = ["net", "test-util"]}
tokio-stream = "0.1"
tower = { version = "0.4", features = ["util"] }
thiserror = "1.0"
url = "2.2"
asn1 = { package = "simple_asn1", version = "0.6" }
x509-parser = "0.15"
pkcs8 = "0.10"
jsonwebtoken = "8.3"
jsonwebkey = { version = "0.3", features = ["jsonwebtoken", "jwt-convert"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
zeroize = { version = "1.6", features = ["zeroize_derive"] }
time = "0.3"

[dev-dependencies]
once_cell = "1.18"

[build-dependencies]
tonic-build = { version = "0.9", default-features = false, features = ["prost"] }
Expand Down
157 changes: 129 additions & 28 deletions spire-api/src/agent/delegated_identity.rs
Expand Up @@ -8,26 +8,31 @@

use crate::proto::spire::api::agent::delegatedidentity::v1::{
delegated_identity_client::DelegatedIdentityClient as DelegatedIdentityApiClient,
SubscribeToX509BundlesRequest, SubscribeToX509BundlesResponse, SubscribeToX509sviDsRequest,
SubscribeToX509sviDsResponse,
FetchJwtsviDsRequest, SubscribeToX509BundlesRequest, SubscribeToX509BundlesResponse,
SubscribeToX509sviDsRequest, SubscribeToX509sviDsResponse, SubscribeToJwtBundlesRequest,
SubscribeToJwtBundlesResponse,
};
use crate::proto::spire::api::types::Jwtsvid as ProtoJwtSvid;
use spiffe::bundle::x509::{X509Bundle, X509BundleSet};
use spiffe::bundle::jwt::{JwtBundleSet, JwtBundle};
use spiffe::spiffe_id::TrustDomain;
use spiffe::svid::jwt::JwtSvid;
use spiffe::svid::x509::X509Svid;
use spiffe::workload_api::address::validate_socket_path;
use tokio_stream::{Stream, StreamExt};

use crate::selectors::Selector;
use spiffe::workload_api::client::{ClientError, DEFAULT_SVID};
use std::convert::{Into, TryFrom};
use std::str::FromStr;
use tokio::net::UnixStream;
use tonic::transport::{Endpoint, Uri};
use tower::service_fn;

/// Name of the environment variable that holds the default socket endpoint path.
pub const ADMIN_SOCKET_ENV: &str = "SPIFFE_ADMIN_ENDPOINT_SOCKET";
pub const ADMIN_SOCKET_ENV: &str = "SPIRE_ADMIN_ENDPOINT_SOCKET";

/// Gets the endpoint socket endpoint path from the environment variable `SPIFFE_ENDPOINT_SOCKET`,
/// Gets the endpoint socket endpoint path from the environment variable `ADMIN_SOCKET_ENV`,
/// as described in [SPIFFE standard](https://github.com/spiffe/spiffe/blob/main/standards/SPIFFE_Workload_Endpoint.md#4-locating-the-endpoint).
pub fn get_admin_socket_path() -> Option<String> {
match std::env::var(ADMIN_SOCKET_ENV) {
Expand Down Expand Up @@ -132,7 +137,7 @@ impl DelegatedIdentityClient {
///
/// Returns [`ClientError`] if the gRPC call fails or if the SVID could not be parsed from the gRPC response.
pub async fn fetch_x509_svid(
mut self,
&mut self,
selectors: Vec<Selector>,
) -> Result<X509Svid, ClientError> {
let request = SubscribeToX509sviDsRequest {
Expand Down Expand Up @@ -172,7 +177,7 @@ impl DelegatedIdentityClient {
///
/// Individual stream items might also be errors if there's an issue processing the response for a specific update.
pub async fn stream_x509_svids(
mut self,
&mut self,
selectors: Vec<Selector>,
) -> Result<impl Stream<Item = Result<X509Svid, ClientError>>, ClientError> {
let request = SubscribeToX509sviDsRequest {
Expand All @@ -197,7 +202,7 @@ impl DelegatedIdentityClient {
///
/// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or
/// there is a problem processing the response.
pub async fn fetch_x509_bundles(mut self) -> Result<X509BundleSet, ClientError> {
pub async fn fetch_x509_bundles(&mut self) -> Result<X509BundleSet, ClientError> {
let request = SubscribeToX509BundlesRequest::default();

let response: tonic::Response<tonic::Streaming<SubscribeToX509BundlesResponse>> =
Expand Down Expand Up @@ -227,7 +232,7 @@ impl DelegatedIdentityClient {
///
/// Individual stream items might also be errors if there's an issue processing the response for a specific update.
pub async fn stream_x509_bundles(
mut self,
&mut self,
) -> Result<impl Stream<Item = Result<X509BundleSet, ClientError>>, ClientError> {
let request = SubscribeToX509BundlesRequest::default();

Expand All @@ -242,37 +247,133 @@ impl DelegatedIdentityClient {

Ok(stream)
}

/// Fetches a list of [`JwtSvid`] parsing the JWT token in the Workload API response, for the given audience and selectors.
///
/// # Arguments
///
/// * `audience` - A list of audiences to include in the JWT token. Cannot be empty nor contain only empty strings.
/// * `selectors` - A list of selectors to filter the list of [`JwtSvid`].
///
/// # Errors
///
/// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or
/// there is a problem processing the response.
pub async fn fetch_jwt_svids<T: AsRef<str> + ToString>(
&mut self,
audience: &[T],
selectors: Vec<Selector>,
) -> Result<Vec<JwtSvid>, ClientError> {
let request = FetchJwtsviDsRequest {
audience: audience.iter().map(|s| s.to_string()).collect(),
selectors: selectors.into_iter().map(|s| s.into()).collect(),
};

DelegatedIdentityClient::parse_jwt_svid_from_grpc_response(
self.client
.fetch_jwtsvi_ds(request)
.await?
.into_inner()
.svids,
)
}



/// Watches the stream of [`JwtBundleSet`] updates.
///
/// This function establishes a stream with the Workload API to continuously receive updates for the [`JwtBundleSet`].
/// The returned stream can be used to asynchronously yield new `JwtBundleSet` updates as they become available.
///
/// # Returns
///
/// Returns a stream of `Result<JwtBundleSet, ClientError>`. Each item represents an updated [`JwtBundleSet`] or an error if
/// there was a problem processing an update from the stream.
///
/// # Errors
///
/// The function can return an error variant of [`ClientError`] in the following scenarios:
///
/// * There's an issue connecting to the Workload API.
/// * An error occurs while setting up the stream.
///
/// Individual stream items might also be errors if there's an issue processing the response for a specific update.
pub async fn stream_jwt_bundles(
&mut self,
) -> Result<impl Stream<Item = Result<JwtBundleSet, ClientError>>, ClientError> {
let request = SubscribeToJwtBundlesRequest::default();
let response = self.client.subscribe_to_jwt_bundles(request).await?;
Ok(response.into_inner().map(|message| {
message
.map_err(ClientError::from)
.and_then(DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response)
}))
}

/// Fetches [`JwtBundleSet`] that is a set of [`JwtBundle`] keyed by the trust domain to which they belong.
///
/// # Errors
///
/// The function returns a variant of [`ClientError`] if there is en error connecting to the Workload API or
/// there is a problem processing the response.
pub async fn fetch_jwt_bundles(
&mut self,
) -> Result<JwtBundleSet, ClientError> {
let request = SubscribeToJwtBundlesRequest::default();
let response = self.client.subscribe_to_jwt_bundles(request).await?;
let initial = response.into_inner().message().await?;
DelegatedIdentityClient::parse_jwt_bundle_set_from_grpc_response(initial.ok_or(ClientError::EmptyResponse)?)
}
}

impl DelegatedIdentityClient {
fn parse_x509_svid_from_grpc_response(
response: SubscribeToX509sviDsResponse,
) -> Result<X509Svid, ClientError> {
let svid = match response.x509_svids.get(DEFAULT_SVID) {
None => return Err(ClientError::EmptyResponse),
Some(s) => s,
};
let svid = response
.x509_svids
.get(DEFAULT_SVID)
.ok_or(ClientError::EmptyResponse)?;

// OPTIMIZE THIS
let mut total_length = 0;
svid.x509_svid
.as_ref()
.ok_or(ClientError::EmptyResponse)?
.cert_chain
.iter()
.for_each(|c| total_length += c.len());
let mut cert_chain = bytes::BytesMut::with_capacity(total_length);
svid.x509_svid
.as_ref()
.ok_or(ClientError::EmptyResponse)?
.cert_chain
.iter()
.for_each(|c| cert_chain.extend(c));
let x509_svid = svid.x509_svid.as_ref().ok_or(ClientError::EmptyResponse)?;

let total_length = x509_svid.cert_chain.iter().map(|c| c.len()).sum();
let mut cert_chain_bytes = Vec::with_capacity(total_length);

for c in &x509_svid.cert_chain {
cert_chain_bytes.extend_from_slice(c);
}

X509Svid::parse_from_der(cert_chain.as_ref(), svid.x509_svid_key.as_ref())
X509Svid::parse_from_der(&cert_chain_bytes, svid.x509_svid_key.as_ref())
.map_err(|e| e.into())
}

fn parse_jwt_svid_from_grpc_response(
svids: Vec<ProtoJwtSvid>,
) -> Result<Vec<JwtSvid>, ClientError> {
let result: Result<Vec<JwtSvid>, ClientError> = svids
.iter()
.map(|r| JwtSvid::from_str(&r.token).map_err(ClientError::InvalidJwtSvid))
.collect();
result
}

fn parse_jwt_bundle_set_from_grpc_response(
response: SubscribeToJwtBundlesResponse,
) -> Result<JwtBundleSet, ClientError> {
let mut bundle_set = JwtBundleSet::new();

for (td, bundle_data) in response.bundles.into_iter() {
let trust_domain = TrustDomain::try_from(td)?;
let bundle = JwtBundle::from_jwt_authorities(trust_domain, &bundle_data)
.map_err(ClientError::from)?;

bundle_set.add_bundle(bundle);
}

Ok(bundle_set)
}

fn parse_x509_bundle_set_from_grpc_response(
response: SubscribeToX509BundlesResponse,
) -> Result<X509BundleSet, ClientError> {
Expand Down
11 changes: 4 additions & 7 deletions spire-api/src/agent/mod.rs
@@ -1,13 +1,10 @@
//! Agent API
//!
//! The agent API consists of APIs:
//! - delegated_identity
//! - debug (not implemented)
//! Consists of the following APIs:
//! - `delegated_identity`: For managing delegated identities.
//! - `debug`: (Not yet implemented).
//!
//! # Note
//! Both of these APIs must be accesed via the admin_socket_path which can be set
//! in the [agent configuration file](https://spiffe.io/docs/latest/deploying/spire_agent/#agent-configuration-file).
//! Access these APIs via the `admin_socket_path` in the [agent configuration file](https://spiffe.io/docs/latest/deploying/spire_agent/#agent-configuration-file).
//!
//!

pub mod delegated_identity;
22 changes: 11 additions & 11 deletions spire-api/src/selectors.rs
@@ -1,11 +1,10 @@
//! Selectors which conform to SPIRE standards.
//!
//! Selectors conforming to SPIRE standards.
use crate::proto::spire::api::types::Selector as SpiffeSelector;

const K8S_TYPE: &str = "k8s";
const UNIX_TYPE: &str = "unix";

/// User-facing version of underlying proto selector type
/// Converts user-defined selectors into SPIFFE selectors.
impl From<Selector> for SpiffeSelector {
fn from(s: Selector) -> SpiffeSelector {
match s {
Expand All @@ -26,19 +25,20 @@ impl From<Selector> for SpiffeSelector {
}

#[derive(Debug, Clone)]
/// Selector represents a SPIFFE ID selector.
/// Represents various types of SPIFFE identity selectors.
pub enum Selector {
/// K8s represents a SPIFFE ID selector.
/// Represents a SPIFFE identity selector based on Kubernetes constructs.
K8s(K8s),
/// Selector represents a SPIFFE ID selector.
/// Represents a SPIFFE identity selector based on Unix system constructs such as PID, GID, and UID.
Unix(Unix),
/// Selector represents a SPIFFE ID selector.
/// Represents a generic SPIFFE identity selector defined by a key-value pair.
Generic((String, String)),
}

const K8S_SA_TYPE: &str = "sa";
const K8S_NS_TYPE: &str = "ns";

/// Converts Kubernetes selectors to their string representation.
impl From<K8s> for String {
fn from(k: K8s) -> String {
match k {
Expand All @@ -49,18 +49,19 @@ impl From<K8s> for String {
}

#[derive(Debug, Clone)]
/// K8s is a helper type to create a SPIFFE ID selector for Kubernetes.
/// Represents a SPIFFE identity selector for Kubernetes.
pub enum K8s {
/// ServiceAccount represents the SPIFFE ID selector for a Kubernetes service account.
/// SPIFFE identity selector for a Kubernetes service account.
ServiceAccount(String),
/// Namespace represents the SPIFFE ID selector for a Kubernetes namespace.
/// SPIFFE identity selector for a Kubernetes namespace.
Namespace(String),
}

const UNIX_PID_TYPE: &str = "pid";
const UNIX_GID_TYPE: &str = "gid";
const UNIX_UID_TYPE: &str = "uid";

/// Converts a Unix selector into a formatted string representation.
impl From<Unix> for String {
fn from(value: Unix) -> Self {
match value {
Expand All @@ -73,7 +74,6 @@ impl From<Unix> for String {

#[derive(Debug, Clone)]
/// Represents SPIFFE identity selectors based on Unix process-related attributes.
#[derive(Debug, Clone)]
pub enum Unix {
/// Specifies a selector for a Unix process ID (PID).
Pid(u16),
Expand Down

0 comments on commit e2e4e50

Please sign in to comment.