Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

appender: fix compilation 32-bit platforms like PowerPC #1675

Merged
merged 3 commits into from
Oct 22, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 59 additions & 13 deletions tracing-appender/src/non_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::Msg;
use crossbeam_channel::{bounded, SendTimeoutError, Sender};
use std::io;
use std::io::Write;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::JoinHandle;
Expand Down Expand Up @@ -124,11 +124,20 @@ pub struct WorkerGuard {
/// [fmt]: mod@tracing_subscriber::fmt
#[derive(Clone, Debug)]
pub struct NonBlocking {
error_counter: Arc<AtomicU64>,
error_counter: ErrorCounter,
channel: Sender<Msg>,
is_lossy: bool,
}

/// Tracks the number of times a log line was dropped by the background thread.
///
/// If the non-blocking writer is not configured in [lossy mode], the error
/// count should always be 0.
///
/// [lossy mode]: NonBlockingBuilder::lossy
#[derive(Clone, Debug)]
pub struct ErrorCounter(Arc<AtomicUsize>);

impl NonBlocking {
/// Returns a new `NonBlocking` writer wrapping the provided `writer`.
///
Expand Down Expand Up @@ -157,7 +166,7 @@ impl NonBlocking {
(
Self {
channel: sender,
error_counter: Arc::new(AtomicU64::new(0)),
error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))),
is_lossy,
},
worker_guard,
Expand All @@ -166,7 +175,7 @@ impl NonBlocking {

/// Returns a counter for the number of times logs where dropped. This will always return zero if
/// `NonBlocking` is not lossy.
pub fn error_counter(&self) -> Arc<AtomicU64> {
pub fn error_counter(&self) -> ErrorCounter {
self.error_counter.clone()
}
}
Expand Down Expand Up @@ -218,7 +227,7 @@ impl std::io::Write for NonBlocking {
let buf_size = buf.len();
if self.is_lossy {
if self.channel.try_send(Msg::Line(buf.to_vec())).is_err() {
self.error_counter.fetch_add(1, Ordering::Release);
self.error_counter.incr_saturating();
}
} else {
return match self.channel.send(Msg::Line(buf.to_vec())) {
Expand Down Expand Up @@ -279,6 +288,43 @@ impl Drop for WorkerGuard {
}
}

// === impl ErrorCounter ===

impl ErrorCounter {
/// Returns the number of log lines that have been dropped.
///
/// If the non-blocking writer is not configured in [lossy mode], the error
/// count should always be 0.
///
/// [lossy mode]: NonBlockingBuilder::lossy
pub fn dropped_lines(&self) -> usize {
self.0.load(Ordering::Acquire)
}

fn incr_saturating(&self) {
let mut curr = self.0.load(Ordering::Acquire);
// We don't need to enter the CAS loop if the current value is already
// `usize::MAX`.
if curr == usize::MAX {
return;
}

// This is implemented as a CAS loop rather than as a simple
// `fetch_add`, because we don't want to wrap on overflow. Instead, we
// need to ensure that saturating addition is performed.
loop {
let val = curr.saturating_add(1);
match self
.0
.compare_exchange(curr, val, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => return,
Err(actual) => curr = actual,
}
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -321,7 +367,7 @@ mod test {
let error_count = non_blocking.error_counter();

non_blocking.write_all(b"Hello").expect("Failed to write");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

let handle = thread::spawn(move || {
non_blocking.write_all(b", World").expect("Failed to write");
Expand All @@ -330,7 +376,7 @@ mod test {
// Sleep a little to ensure previously spawned thread gets blocked on write.
thread::sleep(Duration::from_millis(100));
// We should not drop logs when blocked.
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Read the first message to unblock sender.
let mut line = rx.recv().unwrap();
Expand Down Expand Up @@ -365,17 +411,17 @@ mod test {

// First write will not block
write_non_blocking(&mut non_blocking, b"Hello");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Second write will not block as Worker will have called `recv` on channel.
// "Hello" is not yet consumed. MockWriter call to write_all will block until
// "Hello" is consumed.
write_non_blocking(&mut non_blocking, b", World");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Will sit in NonBlocking channel's buffer.
write_non_blocking(&mut non_blocking, b"Test");
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());

// Allow a line to be written. "Hello" message will be consumed.
// ", World" will be able to write to MockWriter.
Expand All @@ -385,12 +431,12 @@ mod test {

// This will block as NonBlocking channel is full.
write_non_blocking(&mut non_blocking, b"Universe");
assert_eq!(1, error_count.load(Ordering::Acquire));
assert_eq!(1, error_count.dropped_lines());

// Finally the second message sent will be consumed.
let line = rx.recv().unwrap();
assert_eq!(line, ", World");
assert_eq!(1, error_count.load(Ordering::Acquire));
assert_eq!(1, error_count.dropped_lines());
}

#[test]
Expand Down Expand Up @@ -426,6 +472,6 @@ mod test {
}

assert_eq!(10, hello_count);
assert_eq!(0, error_count.load(Ordering::Acquire));
assert_eq!(0, error_count.dropped_lines());
}
}