From 5e001177785bfb1ad4c3f3ffc4a91f9b2005b704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Sumis=C5=82awski?= Date: Thu, 20 Oct 2022 10:04:45 +0200 Subject: [PATCH 1/4] Add support for Telemetry API --- lambda-extension/src/extension.rs | 132 ++++++++- lambda-extension/src/lib.rs | 2 + lambda-extension/src/requests.rs | 28 +- lambda-extension/src/telemetry.rs | 449 ++++++++++++++++++++++++++++++ 4 files changed, 599 insertions(+), 12 deletions(-) create mode 100644 lambda-extension/src/telemetry.rs diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index ec83ce71..d0af3d63 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -9,12 +9,17 @@ use tokio_stream::StreamExt; use tower::{service_fn, MakeService, Service, ServiceExt}; use tracing::{error, trace}; -use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent}; +use crate::{ + logs::*, + requests::{self, Api}, + telemetry_wrapper, Error, ExtensionError, LambdaEvent, LambdaTelemetry, NextEvent, +}; const DEFAULT_LOG_PORT_NUMBER: u16 = 9002; +const DEFAULT_TELEMETRY_PORT_NUMBER: u16 = 9003; -/// An Extension that runs event and log processors -pub struct Extension<'a, E, L> { +/// An Extension that runs event, log and telemetry processors +pub struct Extension<'a, E, L, T> { extension_name: Option<&'a str>, events: Option<&'a [&'a str]>, events_processor: E, @@ -22,9 +27,13 @@ pub struct Extension<'a, E, L> { logs_processor: Option, log_buffering: Option, log_port_number: u16, + telemetry_types: Option<&'a [&'a str]>, + telemetry_processor: Option, + telemetry_buffering: Option, + telemetry_port_number: u16, } -impl<'a> Extension<'a, Identity, MakeIdentity>> { +impl<'a> Extension<'a, Identity, MakeIdentity>, MakeIdentity>> { /// Create a new base [`Extension`] with a no-op events processor pub fn new() -> Self { Extension { @@ -35,17 +44,23 @@ impl<'a> Extension<'a, Identity, MakeIdentity>> { log_buffering: None, 
logs_processor: None, log_port_number: DEFAULT_LOG_PORT_NUMBER, + telemetry_types: None, + telemetry_buffering: None, + telemetry_processor: None, + telemetry_port_number: DEFAULT_TELEMETRY_PORT_NUMBER, } } } -impl<'a> Default for Extension<'a, Identity, MakeIdentity>> { +impl<'a> Default + for Extension<'a, Identity, MakeIdentity>, MakeIdentity>> +{ fn default() -> Self { Self::new() } } -impl<'a, E, L> Extension<'a, E, L> +impl<'a, E, L, T> Extension<'a, E, L, T> where E: Service, E::Future: Future>, @@ -58,6 +73,14 @@ where L::Error: Into> + fmt::Debug, L::MakeError: Into> + fmt::Debug, L::Future: Send, + + // Fixme: 'static bound might be too restrictive + T: MakeService<(), Vec, Response = ()> + Send + Sync + 'static, + T::Service: Service, Response = ()> + Send + Sync, + >>::Future: Send + 'a, + T::Error: Into> + fmt::Debug, + T::MakeError: Into> + fmt::Debug, + T::Future: Send, { /// Create a new [`Extension`] with a given extension name pub fn with_extension_name(self, extension_name: &'a str) -> Self { @@ -77,7 +100,7 @@ where } /// Create a new [`Extension`] with a service that receives Lambda events. - pub fn with_events_processor(self, ep: N) -> Extension<'a, N, L> + pub fn with_events_processor(self, ep: N) -> Extension<'a, N, L, T> where N: Service, N::Future: Future>, @@ -91,11 +114,15 @@ where log_buffering: self.log_buffering, logs_processor: self.logs_processor, log_port_number: self.log_port_number, + telemetry_types: self.telemetry_types, + telemetry_buffering: self.telemetry_buffering, + telemetry_processor: self.telemetry_processor, + telemetry_port_number: self.telemetry_port_number, } } /// Create a new [`Extension`] with a service that receives Lambda logs. 
- pub fn with_logs_processor(self, lp: N) -> Extension<'a, E, N> + pub fn with_logs_processor(self, lp: N) -> Extension<'a, E, N, T> where N: Service<()>, N::Future: Future>, @@ -109,6 +136,10 @@ where log_types: self.log_types, log_buffering: self.log_buffering, log_port_number: self.log_port_number, + telemetry_types: self.telemetry_types, + telemetry_buffering: self.telemetry_buffering, + telemetry_processor: self.telemetry_processor, + telemetry_port_number: self.telemetry_port_number, } } @@ -137,6 +168,53 @@ where } } + /// Create a new [`Extension`] with a service that receives Lambda logs. + pub fn with_telemetry_processor(self, lp: N) -> Extension<'a, E, L, N> + where + N: Service<()>, + N::Future: Future>, + N::Error: Into> + fmt::Display, + { + Extension { + telemetry_processor: Some(lp), + events_processor: self.events_processor, + extension_name: self.extension_name, + events: self.events, + log_types: self.log_types, + log_buffering: self.log_buffering, + logs_processor: self.logs_processor, + log_port_number: self.log_port_number, + telemetry_types: self.telemetry_types, + telemetry_buffering: self.telemetry_buffering, + telemetry_port_number: self.telemetry_port_number, + } + } + + /// Create a new [`Extension`] with a list of telemetry types to subscribe. + /// The only accepted telemetry types are `function`, `platform`, and `extension`. + pub fn with_telemetry_types(self, telemetry_types: &'a [&'a str]) -> Self { + Extension { + telemetry_types: Some(telemetry_types), + ..self + } + } + + /// Create a new [`Extension`] with specific configuration to buffer telemetry. + pub fn with_telemetry_buffering(self, lb: LogBuffering) -> Self { + Extension { + telemetry_buffering: Some(lb), + ..self + } + } + + /// Create a new [`Extension`] with a different port number to listen to telemetry. 
+ pub fn with_telemetry_port_number(self, port_number: u16) -> Self { + Extension { + telemetry_port_number: port_number, + ..self + } + } + /// Execute the given extension pub async fn run(self) -> Result<(), Error> { let client = &Client::builder().build()?; @@ -166,7 +244,8 @@ where trace!("Log processor started"); // Call Logs API to start receiving events - let req = requests::subscribe_logs_request( + let req = requests::subscribe_request( + Api::LogsApi, extension_id, self.log_types, self.log_buffering, @@ -179,6 +258,41 @@ where trace!("Registered extension with Logs API"); } + if let Some(mut telemetry_processor) = self.telemetry_processor { + trace!("Telemetry processor found"); + // Spawn task to run processor + let addr = SocketAddr::from(([0, 0, 0, 0], self.telemetry_port_number)); + let make_service = service_fn(move |_socket: &AddrStream| { + trace!("Creating new telemetry processor Service"); + let service = telemetry_processor.make_service(()); + async move { + let service = Arc::new(Mutex::new(service.await?)); + Ok::<_, T::MakeError>(service_fn(move |req| telemetry_wrapper(service.clone(), req))) + } + }); + let server = Server::bind(&addr).serve(make_service); + tokio::spawn(async move { + if let Err(e) = server.await { + error!("Error while running telemetry processor: {}", e); + } + }); + trace!("Telemetry processor started"); + + // Call Telemetry API to start receiving events + let req = requests::subscribe_request( + Api::TelemetryApi, + extension_id, + self.telemetry_types, + self.telemetry_buffering, + self.telemetry_port_number, + )?; + let res = client.call(req).await?; + if res.status() != http::StatusCode::OK { + return Err(ExtensionError::boxed("unable to initialize the telemetry api")); + } + trace!("Registered extension with Telemetry API"); + } + let incoming = async_stream::stream! 
{ loop { trace!("Waiting for next event (incoming loop)"); diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index e3e72eba..796eb3ef 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -17,6 +17,8 @@ mod events; pub use events::*; mod logs; pub use logs::*; +mod telemetry; +pub use telemetry::*; /// Include several request builders to interact with the Extension API. pub mod requests; diff --git a/lambda-extension/src/requests.rs b/lambda-extension/src/requests.rs index 6cff70b6..64485d35 100644 --- a/lambda-extension/src/requests.rs +++ b/lambda-extension/src/requests.rs @@ -29,7 +29,29 @@ pub(crate) fn register_request(extension_name: &str, events: &[&str]) -> Result< Ok(req) } -pub(crate) fn subscribe_logs_request( +pub(crate) enum Api { + LogsApi, + TelemetryApi, +} + +impl Api { + pub(crate) fn schema_version(&self) -> &str { + match *self { + Api::LogsApi => "2021-03-18", + Api::TelemetryApi => "2022-07-01", + } + } + + pub(crate) fn uri(&self) -> &str { + match *self { + Api::LogsApi => "/2020-08-15/logs", + Api::TelemetryApi => "/2022-07-01/telemetry", + } + } +} + +pub(crate) fn subscribe_request( + api: Api, extension_id: &str, types: Option<&[&str]>, buffering: Option, @@ -38,7 +60,7 @@ pub(crate) fn subscribe_logs_request( let types = types.unwrap_or(&["platform", "function"]); let data = serde_json::json!({ - "schemaVersion": "2021-03-18", + "schemaVersion": api.schema_version(), "types": types, "buffering": buffering.unwrap_or_default(), "destination": { @@ -49,7 +71,7 @@ pub(crate) fn subscribe_logs_request( let req = build_request() .method(Method::PUT) - .uri("/2020-08-15/logs") + .uri(api.uri()) .header(EXTENSION_ID_HEADER, extension_id) .body(Body::from(serde_json::to_string(&data)?))?; diff --git a/lambda-extension/src/telemetry.rs b/lambda-extension/src/telemetry.rs new file mode 100644 index 00000000..9dfaeb4f --- /dev/null +++ b/lambda-extension/src/telemetry.rs @@ -0,0 +1,449 @@ +use 
chrono::{DateTime, Utc}; +use serde::Deserialize; +use std::{boxed::Box, fmt, sync::Arc}; +use tokio::sync::Mutex; +use tower::Service; +use tracing::{error, trace}; + +/// Payload received from the Telemetry API +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct LambdaTelemetry { + /// Time when the telemetry was generated + pub time: DateTime, + /// Telemetry record entry + #[serde(flatten)] + pub record: LambdaTelemetryRecord, +} + +/// Record in a LambdaTelemetry entry +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(tag = "type", content = "record", rename_all = "lowercase")] +pub enum LambdaTelemetryRecord { + /// Function log records + Function(String), + + /// Extension log records + Extension(String), + + /// Platform init start record + #[serde(rename = "platform.initStart", rename_all = "camelCase")] + PlatformInitStart { + /// Type of initialization + initialization_type: InitType, + /// Phase of initialisation + phase: InitPhase, + /// Lambda runtime version + runtime_version: Option, + /// Lambda runtime version ARN + runtime_version_arn: Option, + }, + /// Platform init runtime done record + #[serde(rename = "platform.initRuntimeDone", rename_all = "camelCase")] + PlatformInitRuntimeDone { + /// Type of initialization + initialization_type: InitType, + /// Phase of initialisation + phase: Option, + /// Status of initialization + status: Status, + /// When the status = failure, the error_type describes what kind of error occurred + error_type: Option, + /// Spans + #[serde(default)] + spans: Vec, + }, + /// Platform init report record + #[serde(rename = "platform.initReport", rename_all = "camelCase")] + PlatformInitReport { + /// Type of initialization + initialization_type: InitType, + /// Phase of initialisation + phase: InitPhase, + /// Metrics + metrics: InitReportMetrics, + /// Spans + #[serde(default)] + spans: Vec, + }, + /// Record marking start of an invocation + #[serde(rename = "platform.start", rename_all = "camelCase")] 
+ PlatformStart { + /// Request identifier + request_id: String, + /// Version of the Lambda function + version: Option, + /// Trace Context + tracing: Option, + }, + /// Record marking the completion of an invocation + #[serde(rename = "platform.runtimeDone", rename_all = "camelCase")] + PlatformRuntimeDone { + /// Request identifier + request_id: String, + /// Status of the invocation + status: Status, + /// When unsuccessful, the error_type describes what kind of error occurred + error_type: Option, + /// Metrics corresponding to the runtime + metrics: Option, + /// Spans + #[serde(default)] + spans: Vec, + /// Trace Context + tracing: Option, + }, + /// Platform report record + #[serde(rename = "platform.report", rename_all = "camelCase")] + PlatformReport { + /// Request identifier + request_id: String, + /// Status of the invocation + status: Status, + /// When unsuccessful, the error_type describes what kind of error occurred + error_type: Option, + /// Metrics + metrics: ReportMetrics, + /// Spans + #[serde(default)] + spans: Vec, + /// Trace Context + tracing: Option, + }, + + /// Extension-specific record + #[serde(rename = "platform.extension", rename_all = "camelCase")] + PlatformExtension { + /// Name of the extension + name: String, + /// State of the extension + state: String, + /// Events sent to the extension + events: Vec, + }, + /// Telemetry processor-specific record + #[serde(rename = "platform.telemetrySubscription", rename_all = "camelCase")] + PlatformTelemetrySubscription { + /// Name of the extension + name: String, + /// State of the extensions + state: String, + /// Types of records sent to the extension + types: Vec, + }, + /// Record generated when the telemetry processor is falling behind + #[serde(rename = "platform.logsDropped", rename_all = "camelCase")] + PlatformLogsDropped { + /// Reason for dropping the logs + reason: String, + /// Number of records dropped + dropped_records: u64, + /// Total size of the dropped records + 
dropped_bytes: u64, + }, +} + +/// Type of Initialization +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "kebab-case")] +pub enum InitType { + /// Initialised on demand + OnDemand, + /// Initialized to meet the provisioned concurrency + ProvisionedConcurrency, + /// SnapStart + SnapStart, +} + +/// Phase in which initialization occurs +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "kebab-case")] +pub enum InitPhase { + /// Initialization phase + Init, + /// Invocation phase + Invoke, +} + +/// Status of invocation/initialization +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "kebab-case")] +pub enum Status { + /// Success + Success, + /// Error + Error, + /// Failure + Failure, + /// Timeout + Timeout, +} + +/// Span +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct Span { + /// Duration of the span + pub duration_ms: f64, + /// Name of the span + pub name: String, + /// Start of the span + pub start: DateTime, +} + +/// Tracing Context +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct TraceContext { + /// Span ID + pub span_id: Option, + /// Type of tracing + pub r#type: TracingType, + /// A string containing tracing information like trace_id. The contents may depend on the TracingType. 
+ pub value: String, +} + +/// Type of tracing +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub enum TracingType { + /// Amazon trace type + #[serde(rename = "X-Amzn-Trace-Id")] + AmznTraceId, +} + +/// Init report metrics +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct InitReportMetrics { + /// Duration of initialization + pub duration_ms: f64, +} + +/// Report metrics +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ReportMetrics { + /// Duration in milliseconds + pub duration_ms: f64, + /// Billed duration in milliseconds + pub billed_duration_ms: u64, + /// Memory allocated in megabytes + #[serde(rename = "memorySizeMB")] + pub memory_size_mb: u64, + /// Maximum memory used for the invoke in megabytes + #[serde(rename = "maxMemoryUsedMB")] + pub max_memory_used_mb: u64, + /// Init duration in case of a cold start + #[serde(default = "Option::default")] + pub init_duration_ms: Option, + /// Restore duration in milliseconds + #[serde(default = "Option::default")] + pub restore_duration_ms: Option, +} + +/// Runtime done metrics +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct RuntimeDoneMetrics { + /// Duration in milliseconds + pub duration_ms: f64, + /// Number of bytes produced as a result of the invocation + pub produced_bytes: Option, +} + +/// Wrapper function that sends telemetry to the subscriber Service +/// +/// This takes a `hyper::Request` and transforms it into `Vec` for the +/// underlying `Service` to process. 
+pub(crate) async fn telemetry_wrapper( + service: Arc>, + req: hyper::Request, +) -> Result, Box> +where + S: Service, Response = ()>, + S::Error: Into> + fmt::Debug, + S::Future: Send, +{ + trace!("Received telemetry request"); + // Parse the request body as a Vec + let body = match hyper::body::to_bytes(req.into_body()).await { + Ok(body) => body, + Err(e) => { + error!("Error reading telemetry request body: {}", e); + return Ok(hyper::Response::builder() + .status(hyper::StatusCode::BAD_REQUEST) + .body(hyper::Body::empty()) + .unwrap()); + } + }; + + let telemetry: Vec = match serde_json::from_slice(&body) { + Ok(telemetry) => telemetry, + Err(e) => { + error!("Error parsing telemetry: {}", e); + return Ok(hyper::Response::builder() + .status(hyper::StatusCode::BAD_REQUEST) + .body(hyper::Body::empty()) + .unwrap()); + } + }; + + { + let mut service = service.lock().await; + match service.call(telemetry).await { + Ok(_) => (), + Err(err) => println!("{:?}", err), + } + } + + Ok(hyper::Response::new(hyper::Body::empty())) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + macro_rules! deserialize_tests { + ($($name:ident: $value:expr,)*) => { + $( + #[test] + fn $name() { + let (input, expected) = $value; + let actual = serde_json::from_str::(&input).expect("unable to deserialize"); + + assert!(actual.record == expected); + } + )* + } + } + + deserialize_tests! 
{ + // function + function: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#, + LambdaTelemetryRecord::Function("hello world".to_string()), + ), + + // extension + extension: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#, + LambdaTelemetryRecord::Extension("hello world".to_string()), + ), + + // platform.start + platform_start: ( + r#"{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#, + LambdaTelemetryRecord::PlatformStart { + request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(), + version: Some("$LATEST".to_string()), + tracing: Some(TraceContext{ + span_id: Some("24cd7d670fa455f0".to_string()), + r#type: TracingType::AmznTraceId, + value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(), + }), + }, + ), + // platform.initStart + platform_init_start: ( + r#"{"time":"2022-10-19T13:52:15.636Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}}"#, + LambdaTelemetryRecord::PlatformInitStart { + initialization_type: InitType::OnDemand, + phase: InitPhase::Init, + runtime_version: None, + runtime_version_arn: None, + }, + ), + // platform.runtimeDone + platform_runtime_done: ( + 
r#"{"time":"2022-10-21T14:05:05.764Z","type":"platform.runtimeDone","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"},"spans":[{"name":"responseLatency","start":"2022-10-21T14:05:03.165Z","durationMs":2598.0},{"name":"responseDuration","start":"2022-10-21T14:05:05.763Z","durationMs":0.0}],"metrics":{"durationMs":2599.0,"producedBytes":8}}}"#, + LambdaTelemetryRecord::PlatformRuntimeDone { + request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(), + status: Status::Success, + error_type: None, + metrics: Some(RuntimeDoneMetrics { + duration_ms: 2599.0, + produced_bytes: Some(8), + }), + spans: vec!( + Span { + name:"responseLatency".to_string(), + start: Utc.ymd(2022, 10, 21).and_hms_milli(14, 05, 03, 165), + duration_ms:2598.0 + }, + Span { + name:"responseDuration".to_string(), + start:Utc.ymd(2022, 10, 21).and_hms_milli(14, 05, 05, 763), + duration_ms:0.0 + }, + ), + tracing: Some(TraceContext{ + span_id: Some("24cd7d670fa455f0".to_string()), + r#type: TracingType::AmznTraceId, + value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(), + }), + }, + ), + // platform.report + platform_report: ( + r#"{"time":"2022-10-21T14:05:05.766Z","type":"platform.report","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","metrics":{"durationMs":2599.4,"billedDurationMs":2600,"memorySizeMB":128,"maxMemoryUsedMB":94,"initDurationMs":549.04},"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"},"status":"success"}}"#, + LambdaTelemetryRecord::PlatformReport { + request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(), + status: Status::Success, + error_type: None, + metrics: ReportMetrics { + duration_ms: 2599.4, + billed_duration_ms: 2600, + 
memory_size_mb:128, + max_memory_used_mb:94, + init_duration_ms: Some(549.04), + restore_duration_ms: None, + }, + spans: Vec::new(), + tracing: Some(TraceContext { + span_id: Some("24cd7d670fa455f0".to_string()), + r#type: TracingType::AmznTraceId, + value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(), + }), + }, + ), + // platform.telemetrySubscription + platform_telemetry_subscription: ( + r#"{"time":"2022-10-19T13:52:15.667Z","type":"platform.telemetrySubscription","record":{"name":"my-extension","state":"Subscribed","types":["platform","function"]}}"#, + LambdaTelemetryRecord::PlatformTelemetrySubscription { + name: "my-extension".to_string(), + state: "Subscribed".to_string(), + types: vec!("platform".to_string(), "function".to_string()), + }, + ), + // platform.initRuntimeDone + platform_init_runtime_done: ( + r#"{"time":"2022-10-19T13:52:16.136Z","type":"platform.initRuntimeDone","record":{"initializationType":"on-demand","status":"success"}}"#, + LambdaTelemetryRecord::PlatformInitRuntimeDone { + initialization_type: InitType::OnDemand, + status: Status::Success, + phase: None, + error_type: None, + spans: Vec::new(), + }, + ), + // platform.extension + platform_extension: ( + r#"{"time":"2022-10-19T13:52:16.136Z","type":"platform.extension","record":{"name":"my-extension","state":"Ready","events":["SHUTDOWN","INVOKE"]}}"#, + LambdaTelemetryRecord::PlatformExtension { + name: "my-extension".to_string(), + state: "Ready".to_string(), + events: vec!("SHUTDOWN".to_string(), "INVOKE".to_string()), + }, + ), + // platform.initReport + platform_init_report: ( + r#"{"time":"2022-10-19T13:52:16.136Z","type":"platform.initReport","record":{"initializationType":"on-demand","metrics":{"durationMs":500.0},"phase":"init"}}"#, + LambdaTelemetryRecord::PlatformInitReport { + initialization_type: InitType::OnDemand, + phase: InitPhase::Init, + metrics: InitReportMetrics { duration_ms: 500.0 }, + spans: Vec::new(), + } + ), + } 
+} From 9a12f639046a1737c12e3ae5c356d46e5949def3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Sumis=C5=82awski?= Date: Mon, 14 Nov 2022 10:10:37 +0100 Subject: [PATCH 2/4] Update lambda-extension/src/extension.rs Co-authored-by: David Calavera --- lambda-extension/src/extension.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index d0af3d63..d5bfcbd9 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -168,7 +168,7 @@ where } } - /// Create a new [`Extension`] with a service that receives Lambda logs. + /// Create a new [`Extension`] with a service that receives Lambda telemetry data. pub fn with_telemetry_processor(self, lp: N) -> Extension<'a, E, L, N> where N: Service<()>, From 464900caa62f596ff28431891f9ea1d25f7d64f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Sumis=C5=82awski?= Date: Mon, 14 Nov 2022 10:25:41 +0100 Subject: [PATCH 3/4] derive Eq where possible --- lambda-extension/src/telemetry.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lambda-extension/src/telemetry.rs b/lambda-extension/src/telemetry.rs index 9dfaeb4f..0ccaa7fc 100644 --- a/lambda-extension/src/telemetry.rs +++ b/lambda-extension/src/telemetry.rs @@ -143,7 +143,7 @@ pub enum LambdaTelemetryRecord { } /// Type of Initialization -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum InitType { /// Initialised on demand @@ -155,7 +155,7 @@ pub enum InitType { } /// Phase in which initialization occurs -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum InitPhase { /// Initialization phase @@ -165,7 +165,7 @@ pub enum InitPhase { } /// Status of invocation/initialization -#[derive(Clone, Debug, Deserialize, PartialEq)] 
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] #[serde(rename_all = "kebab-case")] pub enum Status { /// Success @@ -191,7 +191,7 @@ pub struct Span { } /// Tracing Context -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] #[serde(rename_all = "camelCase")] pub struct TraceContext { /// Span ID @@ -203,7 +203,7 @@ pub struct TraceContext { } /// Type of tracing -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] pub enum TracingType { /// Amazon trace type #[serde(rename = "X-Amzn-Trace-Id")] From 8a02a41c17aec05a96fe5bac775310ca83b8a4cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Sumis=C5=82awski?= Date: Mon, 14 Nov 2022 11:53:03 +0100 Subject: [PATCH 4/4] Add simple examples of using telemetry API --- examples/extension-telemetry-basic/Cargo.toml | 20 +++++++ examples/extension-telemetry-basic/README.md | 14 +++++ .../extension-telemetry-basic/src/main.rs | 59 +++++++++++++++++++ lambda-extension/README.md | 41 ++++++++++++- 4 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 examples/extension-telemetry-basic/Cargo.toml create mode 100644 examples/extension-telemetry-basic/README.md create mode 100644 examples/extension-telemetry-basic/src/main.rs diff --git a/examples/extension-telemetry-basic/Cargo.toml b/examples/extension-telemetry-basic/Cargo.toml new file mode 100644 index 00000000..869b604d --- /dev/null +++ b/examples/extension-telemetry-basic/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "extension-telemetry-basic" +version = "0.1.0" +edition = "2021" + + +# Use cargo-edit(https://github.com/killercup/cargo-edit#installation) +# to manage dependencies. +# Running `cargo add DEPENDENCY_NAME` will +# add the latest version of a dependency to the list, +# and it will keep the alphabetic ordering for you. 
+ +[dependencies] +lambda-extension = { path = "../../lambda-extension" } +serde = "1.0.136" +tokio = { version = "1", features = ["macros", "rt"] } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } + + diff --git a/examples/extension-telemetry-basic/README.md b/examples/extension-telemetry-basic/README.md new file mode 100644 index 00000000..fe9c26ca --- /dev/null +++ b/examples/extension-telemetry-basic/README.md @@ -0,0 +1,14 @@ +# AWS Lambda Telemetry extension example + +## Build & Deploy + +1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +2. Build the extension with `cargo lambda build --release --extension` +3. Deploy the extension as a layer with `cargo lambda deploy --extension` + +The last command will give you an ARN for the extension layer that you can use in your functions. + + +## Build for ARM 64 + +Build the extension with `cargo lambda build --release --extension --arm64` diff --git a/examples/extension-telemetry-basic/src/main.rs b/examples/extension-telemetry-basic/src/main.rs new file mode 100644 index 00000000..d08bff38 --- /dev/null +++ b/examples/extension-telemetry-basic/src/main.rs @@ -0,0 +1,59 @@ +use lambda_extension::{service_fn, Error, Extension, LambdaTelemetry, LambdaTelemetryRecord, SharedService}; +use tracing::info; + +async fn handler(events: Vec) -> Result<(), Error> { + for event in events { + match event.record { + LambdaTelemetryRecord::Function(record) => info!("[logs] [function] {}", record), + LambdaTelemetryRecord::PlatformInitStart { + initialization_type: _, + phase: _, + runtime_version: _, + runtime_version_arn: _, + } => info!("[platform] Initialization started"), + LambdaTelemetryRecord::PlatformInitRuntimeDone { + initialization_type: _, + phase: _, + status: _, + error_type: _, + spans: _, + } => info!("[platform] Initialization finished"), + LambdaTelemetryRecord::PlatformStart { + 
request_id, + version: _, + tracing: _, + } => info!("[platform] Handling of request {} started", request_id), + LambdaTelemetryRecord::PlatformRuntimeDone { + request_id, + status: _, + error_type: _, + metrics: _, + spans: _, + tracing: _, + } => info!("[platform] Handling of request {} finished", request_id), + _ => (), + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // The runtime logging can be enabled here by initializing `tracing` with `tracing-subscriber` + // While `tracing` is used internally, `log` can be used as well if preferred. + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + let telemetry_processor = SharedService::new(service_fn(handler)); + + Extension::new() + .with_telemetry_processor(telemetry_processor) + .run() + .await?; + + Ok(()) +} diff --git a/lambda-extension/README.md b/lambda-extension/README.md index 0f132d75..ee606c58 100644 --- a/lambda-extension/README.md +++ b/lambda-extension/README.md @@ -8,7 +8,7 @@ ### Simple extension -The code below creates a simple extension that's registered to every `INVOKE` and `SHUTDOWN` events, and logs them in CloudWatch. +The code below creates a simple extension that's registered to every `INVOKE` and `SHUTDOWN` events. 
```rust,no_run use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent}; @@ -75,6 +75,45 @@ async fn main() -> Result<(), Error> { ``` +### Telemetry processor extension + +```rust,no_run +use lambda_extension::{service_fn, Error, Extension, LambdaTelemetry, LambdaTelemetryRecord, SharedService}; +use tracing::info; + +async fn handler(events: Vec) -> Result<(), Error> { + for event in events { + match event.record { + LambdaTelemetryRecord::Function(record) => { + // do something with the function log record + }, + LambdaTelemetryRecord::PlatformInitStart { + initialization_type: _, + phase: _, + runtime_version: _, + runtime_version_arn: _, + } => { + // do something with the PlatformInitStart event + }, + // more types of telemetry events are available + _ => (), + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let telemetry_processor = SharedService::new(service_fn(handler)); + + Extension::new().with_telemetry_processor(telemetry_processor).run().await?; + + Ok(()) +} + +``` + ## Deployment Lambda extensions can be added to your functions either using [Lambda layers](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#using-extensions-config), or adding them to [containers images](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#invocation-extensions-images).