Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Telemetry API #563

Merged
merged 4 commits into from Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 123 additions & 9 deletions lambda-extension/src/extension.rs
Expand Up @@ -9,22 +9,31 @@ use tokio_stream::StreamExt;
use tower::{service_fn, MakeService, Service, ServiceExt};
use tracing::{error, trace};

use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent};
use crate::{
logs::*,
requests::{self, Api},
telemetry_wrapper, Error, ExtensionError, LambdaEvent, LambdaTelemetry, NextEvent,
};

const DEFAULT_LOG_PORT_NUMBER: u16 = 9002;
const DEFAULT_TELEMETRY_PORT_NUMBER: u16 = 9003;

/// An Extension that runs event and log processors
pub struct Extension<'a, E, L> {
/// An Extension that runs event, log and telemetry processors
pub struct Extension<'a, E, L, T> {
extension_name: Option<&'a str>,
events: Option<&'a [&'a str]>,
events_processor: E,
log_types: Option<&'a [&'a str]>,
logs_processor: Option<L>,
log_buffering: Option<LogBuffering>,
log_port_number: u16,
telemetry_types: Option<&'a [&'a str]>,
telemetry_processor: Option<T>,
telemetry_buffering: Option<LogBuffering>,
telemetry_port_number: u16,
}

impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIdentity<Vec<LambdaTelemetry>>> {
/// Create a new base [`Extension`] with a no-op events processor
pub fn new() -> Self {
Extension {
Expand All @@ -35,17 +44,23 @@ impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
log_buffering: None,
logs_processor: None,
log_port_number: DEFAULT_LOG_PORT_NUMBER,
telemetry_types: None,
telemetry_buffering: None,
telemetry_processor: None,
telemetry_port_number: DEFAULT_TELEMETRY_PORT_NUMBER,
}
}
}

impl<'a> Default for Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
impl<'a> Default
for Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIdentity<Vec<LambdaTelemetry>>>
{
fn default() -> Self {
Self::new()
}
}

impl<'a, E, L> Extension<'a, E, L>
impl<'a, E, L, T> Extension<'a, E, L, T>
where
E: Service<LambdaEvent>,
E::Future: Future<Output = Result<(), E::Error>>,
Expand All @@ -58,6 +73,14 @@ where
L::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
L::MakeError: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
L::Future: Send,

// Fixme: 'static bound might be too restrictive
T: MakeService<(), Vec<LambdaTelemetry>, Response = ()> + Send + Sync + 'static,
T::Service: Service<Vec<LambdaTelemetry>, Response = ()> + Send + Sync,
<T::Service as Service<Vec<LambdaTelemetry>>>::Future: Send + 'a,
T::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
T::MakeError: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
T::Future: Send,
{
/// Create a new [`Extension`] with a given extension name
pub fn with_extension_name(self, extension_name: &'a str) -> Self {
Expand All @@ -77,7 +100,7 @@ where
}

/// Create a new [`Extension`] with a service that receives Lambda events.
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L>
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L, T>
where
N: Service<LambdaEvent>,
N::Future: Future<Output = Result<(), N::Error>>,
Expand All @@ -91,11 +114,15 @@ where
log_buffering: self.log_buffering,
logs_processor: self.logs_processor,
log_port_number: self.log_port_number,
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_processor: self.telemetry_processor,
telemetry_port_number: self.telemetry_port_number,
}
}

/// Create a new [`Extension`] with a service that receives Lambda logs.
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N>
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N, T>
where
N: Service<()>,
N::Future: Future<Output = Result<NS, N::Error>>,
Expand All @@ -109,6 +136,10 @@ where
log_types: self.log_types,
log_buffering: self.log_buffering,
log_port_number: self.log_port_number,
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_processor: self.telemetry_processor,
telemetry_port_number: self.telemetry_port_number,
}
}

Expand Down Expand Up @@ -137,6 +168,53 @@ where
}
}

/// Create a new [`Extension`] with a service that receives Lambda logs.
RafalSumislawski marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_telemetry_processor<N, NS>(self, lp: N) -> Extension<'a, E, L, N>
where
N: Service<()>,
N::Future: Future<Output = Result<NS, N::Error>>,
N::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
{
Extension {
telemetry_processor: Some(lp),
events_processor: self.events_processor,
extension_name: self.extension_name,
events: self.events,
log_types: self.log_types,
log_buffering: self.log_buffering,
logs_processor: self.logs_processor,
log_port_number: self.log_port_number,
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_port_number: self.telemetry_port_number,
}
}

/// Create a new [`Extension`] with a list of telemetry types to subscribe.
/// The only accepted telemetry types are `function`, `platform`, and `extension`.
pub fn with_telemetry_types(self, telemetry_types: &'a [&'a str]) -> Self {
Extension {
telemetry_types: Some(telemetry_types),
..self
}
}

/// Create a new [`Extension`] with specific configuration to buffer telemetry.
pub fn with_telemetry_buffering(self, lb: LogBuffering) -> Self {
Extension {
telemetry_buffering: Some(lb),
..self
}
}

/// Create a new [`Extension`] with a different port number to listen to telemetry.
pub fn with_telemetry_port_number(self, port_number: u16) -> Self {
Extension {
telemetry_port_number: port_number,
..self
}
}

/// Execute the given extension
pub async fn run(self) -> Result<(), Error> {
let client = &Client::builder().build()?;
Expand Down Expand Up @@ -166,7 +244,8 @@ where
trace!("Log processor started");

// Call Logs API to start receiving events
let req = requests::subscribe_logs_request(
let req = requests::subscribe_request(
Api::LogsApi,
extension_id,
self.log_types,
self.log_buffering,
Expand All @@ -179,6 +258,41 @@ where
trace!("Registered extension with Logs API");
}

if let Some(mut telemetry_processor) = self.telemetry_processor {
trace!("Telemetry processor found");
// Spawn task to run processor
let addr = SocketAddr::from(([0, 0, 0, 0], self.telemetry_port_number));
let make_service = service_fn(move |_socket: &AddrStream| {
trace!("Creating new telemetry processor Service");
let service = telemetry_processor.make_service(());
async move {
let service = Arc::new(Mutex::new(service.await?));
Ok::<_, T::MakeError>(service_fn(move |req| telemetry_wrapper(service.clone(), req)))
}
});
let server = Server::bind(&addr).serve(make_service);
tokio::spawn(async move {
if let Err(e) = server.await {
error!("Error while running telemetry processor: {}", e);
}
});
trace!("Telemetry processor started");

// Call Telemetry API to start receiving events
let req = requests::subscribe_request(
Api::TelemetryApi,
extension_id,
self.telemetry_types,
self.telemetry_buffering,
self.telemetry_port_number,
)?;
let res = client.call(req).await?;
if res.status() != http::StatusCode::OK {
return Err(ExtensionError::boxed("unable to initialize the telemetry api"));
}
trace!("Registered extension with Telemetry API");
}

let incoming = async_stream::stream! {
loop {
trace!("Waiting for next event (incoming loop)");
Expand Down
2 changes: 2 additions & 0 deletions lambda-extension/src/lib.rs
Expand Up @@ -17,6 +17,8 @@ mod events;
pub use events::*;
mod logs;
pub use logs::*;
mod telemetry;
pub use telemetry::*;

/// Include several request builders to interact with the Extension API.
pub mod requests;
Expand Down
28 changes: 25 additions & 3 deletions lambda-extension/src/requests.rs
Expand Up @@ -29,7 +29,29 @@ pub(crate) fn register_request(extension_name: &str, events: &[&str]) -> Result<
Ok(req)
}

pub(crate) fn subscribe_logs_request(
pub(crate) enum Api {
LogsApi,
TelemetryApi,
}

impl Api {
pub(crate) fn schema_version(&self) -> &str {
match *self {
Api::LogsApi => "2021-03-18",
Api::TelemetryApi => "2022-07-01",
}
}

pub(crate) fn uri(&self) -> &str {
match *self {
Api::LogsApi => "/2020-08-15/logs",
Api::TelemetryApi => "/2022-07-01/telemetry",
}
}
}

pub(crate) fn subscribe_request(
api: Api,
extension_id: &str,
types: Option<&[&str]>,
buffering: Option<LogBuffering>,
Expand All @@ -38,7 +60,7 @@ pub(crate) fn subscribe_logs_request(
let types = types.unwrap_or(&["platform", "function"]);

let data = serde_json::json!({
"schemaVersion": "2021-03-18",
"schemaVersion": api.schema_version(),
"types": types,
"buffering": buffering.unwrap_or_default(),
"destination": {
Expand All @@ -49,7 +71,7 @@ pub(crate) fn subscribe_logs_request(

let req = build_request()
.method(Method::PUT)
.uri("/2020-08-15/logs")
.uri(api.uri())
.header(EXTENSION_ID_HEADER, extension_id)
.body(Body::from(serde_json::to_string(&data)?))?;

Expand Down