Skip to content

Commit

Permalink
feat: send structured fields on the wire (console-rs#26)
Browse files Browse the repository at this point in the history
This PR adds structured fields to the wire format.

Fixes console-rs#6

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev committed May 25, 2021
1 parent 53515a7 commit 38adbd9
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 45 deletions.
62 changes: 62 additions & 0 deletions console-api/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata {
location: Some(location),
kind: kind as i32,
level: metadata::Level::from(*meta.level()) as i32,
field_names: Vec::new(),
..Default::default()
}
}
Expand All @@ -61,6 +62,31 @@ impl<'a> From<&'a std::panic::Location<'a>> for Location {
}
}

impl fmt::Display for field::Value {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
field::Value::BoolVal(v) => fmt::Display::fmt(v, f)?,
field::Value::StrVal(v) => fmt::Display::fmt(v, f)?,
field::Value::U64Val(v) => fmt::Display::fmt(v, f)?,
field::Value::DebugVal(v) => fmt::Display::fmt(v, f)?,
field::Value::I64Val(v) => fmt::Display::fmt(v, f)?,
}

Ok(())
}
}

impl fmt::Display for Field {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let name_val = (self.name.as_ref(), self.value.as_ref());
if let (Some(field::Name::StrName(name)), Some(val)) = name_val {
write!(f, "{}={}", name, val)?;
}

Ok(())
}
}

impl fmt::Display for Location {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match (self.module_path.as_ref(), self.file.as_ref()) {
Expand Down Expand Up @@ -109,3 +135,39 @@ impl From<&'static tracing_core::Metadata<'static>> for register_metadata::NewMe
}
}
}

impl From<i64> for field::Value {
fn from(val: i64) -> Self {
field::Value::I64Val(val)
}
}

impl From<u64> for field::Value {
fn from(val: u64) -> Self {
field::Value::U64Val(val)
}
}

impl From<bool> for field::Value {
fn from(val: bool) -> Self {
field::Value::BoolVal(val)
}
}

impl From<&str> for field::Value {
fn from(val: &str) -> Self {
field::Value::StrVal(val.into())
}
}

impl From<&str> for field::Name {
fn from(val: &str) -> Self {
field::Name::StrName(val.into())
}
}

impl From<&dyn std::fmt::Debug> for field::Value {
fn from(val: &dyn std::fmt::Debug) -> Self {
field::Value::DebugVal(format!("{:?}", val))
}
}
4 changes: 2 additions & 2 deletions console-subscriber/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct TaskData<T> {

struct Task {
metadata: &'static Metadata<'static>,
fields: String,
fields: Vec<proto::Field>,
}

impl Aggregator {
Expand Down Expand Up @@ -378,9 +378,9 @@ impl Task {
id: Some(id.into()),
// TODO: more kinds of tasks...
kind: proto::tasks::task::Kind::Spawn as i32,
string_fields: self.fields.clone(),
metadata: Some(self.metadata.into()),
parents: Vec::new(), // TODO: implement parents nicely
fields: self.fields.clone(),
}
}
}
92 changes: 60 additions & 32 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,21 @@ use std::{
time::{Duration, SystemTime},
};
use tracing_core::{
field::{self, Visit},
span,
subscriber::{self, Subscriber},
Metadata,
};
use tracing_subscriber::{
fmt::{
format::{DefaultFields, FormatFields},
FormattedFields,
},
layer::Context,
registry::LookupSpan,
Layer,
};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

mod aggregator;
use aggregator::Aggregator;

pub struct TasksLayer<F = DefaultFields> {
pub struct TasksLayer {
task_meta: AtomicPtr<Metadata<'static>>,
blocking_meta: AtomicPtr<Metadata<'static>>,
tx: mpsc::Sender<Event>,
flush: Arc<aggregator::Flush>,
format: F,
}

pub struct Server {
Expand All @@ -43,6 +35,11 @@ pub struct Server {
client_buffer: usize,
}

struct FieldVisitor {
fields: Vec<proto::Field>,
meta_id: proto::MetaId,
}

struct Watch(mpsc::Sender<Result<proto::tasks::TaskUpdate, tonic::Status>>);

enum Event {
Expand All @@ -51,7 +48,7 @@ enum Event {
id: span::Id,
metadata: &'static Metadata<'static>,
at: SystemTime,
fields: String,
fields: Vec<proto::Field>,
},
Enter {
id: span::Id,
Expand Down Expand Up @@ -94,13 +91,12 @@ impl TasksLayer {
flush,
task_meta: AtomicPtr::new(ptr::null_mut()),
blocking_meta: AtomicPtr::new(ptr::null_mut()),
format: Default::default(),
};
(layer, server)
}
}

impl<F> TasksLayer<F> {
impl TasksLayer {
pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 10;
pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;
pub const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -163,10 +159,9 @@ impl<F> TasksLayer<F> {
}
}

impl<S, F> Layer<S> for TasksLayer<F>
impl<S> Layer<S> for TasksLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
F: for<'writer> FormatFields<'writer> + 'static,
{
fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
if meta.target() == "tokio::task" && meta.name() == "task" {
Expand All @@ -192,30 +187,21 @@ where
subscriber::Interest::always()
}

fn new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, cx: Context<'_, S>) {
fn new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, _: Context<'_, S>) {
let metadata = attrs.metadata();
if self.is_spawn(metadata) {
let at = SystemTime::now();
let span = cx.span(id).expect("newly-created span should exist");
let mut exts = span.extensions_mut();
let fields = match exts.get_mut::<FormattedFields<F>>() {
Some(fields) => fields.fields.clone(),
None => {
let mut fields = String::new();
match self.format.format_fields(&mut fields, attrs) {
Ok(()) => exts.insert(FormattedFields::<F>::new(fields.clone())),
Err(_) => {
tracing::warn!(span.id = ?id, span.attrs = ?attrs, "error formatting fields for span")
}
}
fields
}
let mut fields_collector = FieldVisitor {
fields: Vec::default(),
meta_id: metadata.into(),
};
attrs.record(&mut fields_collector);

self.send(Event::Spawn {
id: id.clone(),
at,
metadata,
fields,
fields: fields_collector.fields,
});
}
}
Expand Down Expand Up @@ -304,3 +290,45 @@ impl proto::tasks::tasks_server::Tasks for Server {
Ok(tonic::Response::new(stream))
}
}

impl Visit for FieldVisitor {
fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}

fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
self.fields.push(proto::Field {
name: Some(field.name().into()),
value: Some(value.into()),
metadata_id: Some(self.meta_id.clone()),
});
}
}
13 changes: 11 additions & 2 deletions console/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
cell::RefCell,
collections::HashMap,
convert::TryFrom,
fmt::Write,
rc::{Rc, Weak},
time::{Duration, SystemTime},
};
Expand Down Expand Up @@ -123,13 +124,21 @@ impl State {
proto::tasks::task::Kind::Spawn => "T",
proto::tasks::task::Kind::Blocking => "B",
};

let fields = task
.fields
.iter()
.fold(String::new(), |mut res, f| {
write!(&mut res, "{} ", f).unwrap();
res
})
.trim_end()
.into();
let id = task.id?.id;
let stats = stats_update.remove(&id)?.into();
let mut task = Task {
id,
id_hex: format!("{:x}", id),
fields: task.string_fields,
fields,
kind,
stats,
completed_for: 0,
Expand Down
22 changes: 19 additions & 3 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,28 @@ message SpanId {
uint64 id = 1;
}

// A message representing a key-value pair of data associated with a `Span`
message Field {
oneof field {
string debug = 1;
oneof name {
// The string representation of the name.
string str_name = 1;
// An index position into the `Metadata.field_names`.
uint64 name_idx = 2;
}
oneof value {
string debug_val = 3;
string str_val = 4;
uint64 u64_val = 5;
sint64 i64_val = 6;
bool bool_val = 7;
}
MetaId metadata_id = 8;
}

message Span {
SpanId id = 1;
MetaId metadata_id = 2;
map<string, Field> fields = 3;
repeated Field fields = 3;
google.protobuf.Timestamp at = 4;
}

Expand All @@ -54,6 +66,10 @@ message Metadata {
Kind kind = 5;
Level level = 6;

// The names of the key-value fields attached to the
// span or event this metadata is associated with.
repeated string field_names = 7;

enum Kind {
SPAN = 0;
EVENT = 1;
Expand Down
9 changes: 3 additions & 6 deletions proto/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ message Task {
// When the task's stats change, or when the task completes, it will be
// identified by this ID; if the client requires additional information
// included in the `Task` message, it should store that data and access it
// by ID.
// by ID.
common.SpanId id = 1;
// The numeric ID of the task's `Metadata`.
//
Expand All @@ -63,11 +63,8 @@ message Task {
// The category of task this task belongs to.
Kind kind = 3;

// A string representation of any fields recorded about this task.
//
// NOTE: eventually, it would be nice to support structured fields in tasks;
// we can deprecate this when we add that.
string string_fields = 4;
// A list of `Field` objects attached to this task.
repeated common.Field fields = 4;

// An ordered list of span IDs corresponding to the `tracing` span context
// in which this task was spawned.
Expand Down

0 comments on commit 38adbd9

Please sign in to comment.