Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing-journald: Send large journal payloads through memfd #1744

Merged
merged 1 commit into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tracing-journald/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ keywords = ["tracing", "journald"]
rust-version = "1.42.0"

[dependencies]
libc = "0.2.107"
tracing-core = { path = "../tracing-core", version = "0.2" }
tracing-subscriber = { path = "../tracing-subscriber", version = "0.3" }

Expand Down
52 changes: 49 additions & 3 deletions tracing-journald/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ use tracing_core::{
};
use tracing_subscriber::{registry::LookupSpan, subscribe::Context};

#[cfg(target_os = "linux")]
mod memfd;
#[cfg(target_os = "linux")]
mod socket;

/// Sends events and their fields to journald
///
/// [journald conventions] for structured field names differ from typical tracing idioms, and journald
Expand Down Expand Up @@ -109,6 +114,48 @@ impl Subscriber {
self.field_prefix = x;
self
}

#[cfg(not(unix))]
fn send_payload(&self, _opayload: &[u8]) -> io::Result<()> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MSRV is currently set to 1.49 but io::ErrorKind::Unsupported was stabilized in 1.53.

This wasn't caught on CI because it's gated behind #[cfg(not(unix))] 🙃

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ughhh, that's annoying. currently we don't build MSRV on windows (or macOS) to avoid having a really huge number of CI builds... but maybe we should be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really much of an issue? Does anyone actually use tracing-journald on Windows? I only added this because tracing-journald gets built on Windows on the pipeline; it's only there to silence Github actions, I never assumed anyone would actually build this on Windows.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words as far as I'm concerned we could also just use unimplemented!() here and have it panic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing it with NotFound would be fine. We shouldn't panic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And by the way, did anyone actually hit this MSRV violation in their build?

I "hit" it, but only inasmuch as I was doing a minver check on the repository from windows.

For #2231 I just replaced this with ErrorKind::Other.

MSRV tests on other OSes

I think in general it's fine to assume the Linux build will catch things, as the OS-gated functionality is rare enough, and we can fix violations when they come up.

If we do want to check other targets, it should be via cross-compile where possible since linux hosted runner time is cheaper, and just a check is sufficient.

(Ultimately CI-wise this comes back to the downside of a monorepo being testing the entire monorepo to verify a subpackage-local change... though this is sometimes desirable since e.g. tracing-core gets a significant amount of test coverage only from testing of tracing or even tracing-subscriber...)

don't compile on windows

There are AIUI four reasonable choices (of which only the latter pair are legally semver compatible):

  • #![cfg] the entire crate to only expose any API on platforms where it makes sense. This is IIRC the approach taken by winapi. Consumers #[cfg] their usage.
  • compile_error! on platforms where the crate doesn't make any sense. Consumers need to use a target-dependent dependency as well as #[cfg] their usage.
  • panic! (or error) entrypoints into the API on unsupported platforms. Consumers #[cfg] their usage (or tolerate an error on supported platforms as well).
  • Stub functionality on unsupported platforms. IIUC tracing-journald is purely a sink, so stubbing it as > /dev/null ($nullpwsh) isn't wrong, just perhaps unexpected. Consumers can unconditionally use the API and just implicitly degrade on unsupported platforms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make another merge request which uses Other.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, @CAD97 already opened one, thanks a lot 🙏

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkw I doubt that anyone's building and using tracing-journald on Windows; that wouldn't make any sense.

But I'm fine with using ErrorKind::Other.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think NotFound would be wrong, semantically; after all, we're not searching for anything here

Semantically, we are searching for JOURNALD_PATH. This is the error that will be returned when running on a Unix without journald, and unconditionally when running on Windows.

"journald not supported on non-Unix",
))
}

#[cfg(unix)]
fn send_payload(&self, payload: &[u8]) -> io::Result<usize> {
self.socket.send(payload).or_else(|error| {
if Some(libc::EMSGSIZE) == error.raw_os_error() {
self.send_large_payload(payload)
} else {
Err(error)
}
})
}

#[cfg(all(unix, not(target_os = "linux")))]
fn send_large_payload(&self, _payload: &[u8]) -> io::Result<usize> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"Large payloads not supported on non-Linux OS",
))
}

/// Send large payloads to journald via a memfd.
#[cfg(target_os = "linux")]
fn send_large_payload(&self, payload: &[u8]) -> io::Result<usize> {
// If the payload's too large for a single datagram, send it through a memfd, see
// https://systemd.io/JOURNAL_NATIVE_PROTOCOL/
use std::os::unix::prelude::AsRawFd;
// Write the whole payload to a memfd
let mut mem = memfd::create_sealable()?;
mem.write_all(payload)?;
// Fully seal the memfd to signal journald that its backing data won't resize anymore
// and so is safe to mmap.
memfd::seal_fully(mem.as_raw_fd())?;
socket::send_one_fd(&self.socket, mem.as_raw_fd())
}
}

/// Construct a journald subscriber
Expand Down Expand Up @@ -174,9 +221,8 @@ where
self.field_prefix.as_ref().map(|x| &x[..]),
));

// What could we possibly do on error?
#[cfg(unix)]
let _ = self.socket.send(&buf);
// At this point we can't handle the error anymore so just ignore it.
let _ = self.send_payload(&buf);
}
}

Expand Down
31 changes: 31 additions & 0 deletions tracing-journald/src/memfd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//! memfd helpers.

use libc::*;
use std::fs::File;
use std::io::Error;
use std::io::Result;
use std::os::raw::c_uint;
use std::os::unix::prelude::{FromRawFd, RawFd};

fn create(flags: c_uint) -> Result<File> {
let fd = unsafe { memfd_create("tracing-journald\0".as_ptr() as *const c_char, flags) };
if fd < 0 {
Err(Error::last_os_error())
} else {
Ok(unsafe { File::from_raw_fd(fd as RawFd) })
}
}

pub fn create_sealable() -> Result<File> {
create(MFD_ALLOW_SEALING | MFD_CLOEXEC)
}

pub fn seal_fully(fd: RawFd) -> Result<()> {
let all_seals = F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_WRITE | F_SEAL_SEAL;
let result = unsafe { fcntl(fd, F_ADD_SEALS, all_seals) };
if result < 0 {
Err(Error::last_os_error())
} else {
Ok(())
}
}
66 changes: 66 additions & 0 deletions tracing-journald/src/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//! socket helpers.

use std::io::{Error, Result};
use std::mem::{size_of, zeroed};
use std::os::unix::net::UnixDatagram;
use std::os::unix::prelude::{AsRawFd, RawFd};
use std::ptr;

use libc::*;

const CMSG_BUFSIZE: usize = 64;

#[repr(C)]
union AlignedBuffer<T: Copy + Clone> {
buffer: T,
align: cmsghdr,
}

fn assert_cmsg_bufsize() {
let space_one_fd = unsafe { CMSG_SPACE(size_of::<RawFd>() as u32) };
assert!(
space_one_fd <= CMSG_BUFSIZE as u32,
"cmsghdr buffer too small (< {}) to hold a single fd",
space_one_fd
);
}

#[cfg(test)]
#[test]
fn cmsg_buffer_size_for_one_fd() {
assert_cmsg_bufsize()
}

pub fn send_one_fd(socket: &UnixDatagram, fd: RawFd) -> Result<usize> {
assert_cmsg_bufsize();

let mut cmsg_buffer = AlignedBuffer {
buffer: ([0u8; CMSG_BUFSIZE]),
};
let mut msg: msghdr = unsafe { zeroed() };

// We send no data body with this message.
msg.msg_iov = ptr::null_mut();
msg.msg_iovlen = 0;

msg.msg_control = unsafe { cmsg_buffer.buffer.as_mut_ptr() as _ };
msg.msg_controllen = unsafe { CMSG_SPACE(size_of::<RawFd>() as _) as _ };

let mut cmsg: &mut cmsghdr =
unsafe { CMSG_FIRSTHDR(&msg).as_mut() }.expect("Control message buffer exhausted");

cmsg.cmsg_level = SOL_SOCKET;
cmsg.cmsg_type = SCM_RIGHTS;
cmsg.cmsg_len = unsafe { CMSG_LEN(size_of::<RawFd>() as _) as _ };

unsafe { ptr::write(CMSG_DATA(cmsg) as *mut RawFd, fd) };

let result = unsafe { sendmsg(socket.as_raw_fd(), &msg, libc::MSG_NOSIGNAL) };

if result < 0 {
Err(Error::last_os_error())
} else {
// sendmsg returns the number of bytes written
Ok(result as usize)
}
}
27 changes: 20 additions & 7 deletions tracing-journald/tests/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ impl PartialEq<[u8]> for Field {
}
}

/// Retry `f` 10 times 100ms apart.
/// Retry `f` 30 times 100ms apart, i.e. a total of three seconds.
///
/// When `f` returns an error wait 100ms and try it again, up to ten times.
/// If the last attempt failed return the error returned by that attempt.
///
/// If `f` returns Ok immediately return the result.
fn retry<T, E>(f: impl Fn() -> Result<T, E>) -> Result<T, E> {
let attempts = 10;
let attempts = 30;
let interval = Duration::from_millis(100);
for attempt in (0..attempts).rev() {
match f() {
Expand All @@ -85,7 +85,8 @@ fn retry<T, E>(f: impl Fn() -> Result<T, E>) -> Result<T, E> {
fn read_from_journal(test_name: &str) -> Vec<HashMap<String, Field>> {
let stdout = String::from_utf8(
Command::new("journalctl")
.args(&["--user", "--output=json"])
// We pass --all to circumvent journalctl's default limit of 4096 bytes for field values
.args(&["--user", "--output=json", "--all"])
// Filter by the PID of the current test process
.arg(format!("_PID={}", std::process::id()))
.arg(format!("TEST_NAME={}", test_name))
Expand All @@ -97,10 +98,7 @@ fn read_from_journal(test_name: &str) -> Vec<HashMap<String, Field>> {

stdout
.lines()
.map(|l| {
dbg!(l);
serde_json::from_str(l).unwrap()
})
.map(|l| serde_json::from_str(l).unwrap())
.collect()
}

Expand Down Expand Up @@ -169,3 +167,18 @@ fn internal_null_byte() {
assert_eq!(message["PRIORITY"], "6");
});
}

#[test]
fn large_message() {
let large_string = "b".repeat(512_000);
with_journald(|| {
debug!(test.name = "large_message", "Message: {}", large_string);

let message = retry_read_one_line_from_journal("large_message");
assert_eq!(
message["MESSAGE"],
format!("Message: {}", large_string).as_str()
);
assert_eq!(message["PRIORITY"], "6");
});
}