Skip to content

Commit

Permalink
journald: send large journal payloads through memfd (#1744)
Browse files Browse the repository at this point in the history
See #1698: Properly write large payloads to journal.

I'd appreciate a very careful review; this cmsg stuff is nasty, and
while it's well documented in `cmsg(3)` I had to fiddle a bit because
the corresponding functions in libc aren't const and thus don't permit a
direct allocation of the buffer as most `cmsg` C code around does.

Closes #1698

## Motivation

Linux limits the maximum amount of data permitted for a single Unix
datagram; sending large payloads directly will fail.

## Solution

Follow systemd.io/JOURNAL_NATIVE_PROTOCOL/ and check for `EMSGSIZE` from
`send()`; in this case write the payload to a memfd, seal it, and pass
it on to journald via a corresponding SCM_RIGHTS control message.

Per discussion in #1698 this adds no dependency on `nix`, and instead
implements fd forwarding directly with some bits of unsafe `libc` code.
  • Loading branch information
swsnr authored and hawkw committed Dec 3, 2021
1 parent d1e392d commit 379d7d3
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 10 deletions.
1 change: 1 addition & 0 deletions tracing-journald/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ keywords = ["tracing", "journald"]
rust-version = "1.42.0"

[dependencies]
libc = "0.2.107"
tracing-core = { path = "../tracing-core", version = "0.1.10" }
tracing-subscriber = { path = "../tracing-subscriber", version = "0.3" }

Expand Down
52 changes: 49 additions & 3 deletions tracing-journald/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ use tracing_core::{
};
use tracing_subscriber::{layer::Context, registry::LookupSpan};

#[cfg(target_os = "linux")]
mod memfd;
#[cfg(target_os = "linux")]
mod socket;

/// Sends events and their fields to journald
///
/// [journald conventions] for structured field names differ from typical tracing idioms, and journald
Expand Down Expand Up @@ -109,6 +114,48 @@ impl Layer {
self.field_prefix = x;
self
}

#[cfg(not(unix))]
fn send_payload(&self, _opayload: &[u8]) -> io::Result<()> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"journald not supported on non-Unix",
))
}

#[cfg(unix)]
fn send_payload(&self, payload: &[u8]) -> io::Result<usize> {
self.socket.send(payload).or_else(|error| {
if Some(libc::EMSGSIZE) == error.raw_os_error() {
self.send_large_payload(payload)
} else {
Err(error)
}
})
}

#[cfg(all(unix, not(target_os = "linux")))]
fn send_large_payload(&self, _payload: &[u8]) -> io::Result<usize> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"Large payloads not supported on non-Linux OS",
))
}

/// Send large payloads to journald via a memfd.
#[cfg(target_os = "linux")]
fn send_large_payload(&self, payload: &[u8]) -> io::Result<usize> {
// If the payload's too large for a single datagram, send it through a memfd, see
// https://systemd.io/JOURNAL_NATIVE_PROTOCOL/
use std::os::unix::prelude::AsRawFd;
// Write the whole payload to a memfd
let mut mem = memfd::create_sealable()?;
mem.write_all(payload)?;
// Fully seal the memfd to signal journald that its backing data won't resize anymore
// and so is safe to mmap.
memfd::seal_fully(mem.as_raw_fd())?;
socket::send_one_fd(&self.socket, mem.as_raw_fd())
}
}

/// Construct a journald layer
Expand Down Expand Up @@ -174,9 +221,8 @@ where
self.field_prefix.as_ref().map(|x| &x[..]),
));

// What could we possibly do on error?
#[cfg(unix)]
let _ = self.socket.send(&buf);
// At this point we can't handle the error anymore so just ignore it.
let _ = self.send_payload(&buf);
}
}

Expand Down
31 changes: 31 additions & 0 deletions tracing-journald/src/memfd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//! memfd helpers.

use libc::*;
use std::fs::File;
use std::io::Error;
use std::io::Result;
use std::os::raw::c_uint;
use std::os::unix::prelude::{FromRawFd, RawFd};

fn create(flags: c_uint) -> Result<File> {
let fd = unsafe { memfd_create("tracing-journald\0".as_ptr() as *const c_char, flags) };
if fd < 0 {
Err(Error::last_os_error())
} else {
Ok(unsafe { File::from_raw_fd(fd as RawFd) })
}
}

pub fn create_sealable() -> Result<File> {
create(MFD_ALLOW_SEALING | MFD_CLOEXEC)
}

pub fn seal_fully(fd: RawFd) -> Result<()> {
let all_seals = F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_WRITE | F_SEAL_SEAL;
let result = unsafe { fcntl(fd, F_ADD_SEALS, all_seals) };
if result < 0 {
Err(Error::last_os_error())
} else {
Ok(())
}
}
66 changes: 66 additions & 0 deletions tracing-journald/src/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//! socket helpers.

use std::io::{Error, Result};
use std::mem::{size_of, zeroed};
use std::os::unix::net::UnixDatagram;
use std::os::unix::prelude::{AsRawFd, RawFd};
use std::ptr;

use libc::*;

const CMSG_BUFSIZE: usize = 64;

#[repr(C)]
union AlignedBuffer<T: Copy + Clone> {
buffer: T,
align: cmsghdr,
}

fn assert_cmsg_bufsize() {
let space_one_fd = unsafe { CMSG_SPACE(size_of::<RawFd>() as u32) };
assert!(
space_one_fd <= CMSG_BUFSIZE as u32,
"cmsghdr buffer too small (< {}) to hold a single fd",
space_one_fd
);
}

#[cfg(test)]
#[test]
fn cmsg_buffer_size_for_one_fd() {
assert_cmsg_bufsize()
}

pub fn send_one_fd(socket: &UnixDatagram, fd: RawFd) -> Result<usize> {
assert_cmsg_bufsize();

let mut cmsg_buffer = AlignedBuffer {
buffer: ([0u8; CMSG_BUFSIZE]),
};
let mut msg: msghdr = unsafe { zeroed() };

// We send no data body with this message.
msg.msg_iov = ptr::null_mut();
msg.msg_iovlen = 0;

msg.msg_control = unsafe { cmsg_buffer.buffer.as_mut_ptr() as _ };
msg.msg_controllen = unsafe { CMSG_SPACE(size_of::<RawFd>() as _) as _ };

let mut cmsg: &mut cmsghdr =
unsafe { CMSG_FIRSTHDR(&msg).as_mut() }.expect("Control message buffer exhausted");

cmsg.cmsg_level = SOL_SOCKET;
cmsg.cmsg_type = SCM_RIGHTS;
cmsg.cmsg_len = unsafe { CMSG_LEN(size_of::<RawFd>() as _) as _ };

unsafe { ptr::write(CMSG_DATA(cmsg) as *mut RawFd, fd) };

let result = unsafe { sendmsg(socket.as_raw_fd(), &msg, libc::MSG_NOSIGNAL) };

if result < 0 {
Err(Error::last_os_error())
} else {
// sendmsg returns the number of bytes written
Ok(result as usize)
}
}
27 changes: 20 additions & 7 deletions tracing-journald/tests/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ impl PartialEq<[u8]> for Field {
}
}

/// Retry `f` 10 times 100ms apart.
/// Retry `f` 30 times 100ms apart, i.e. a total of three seconds.
///
/// When `f` returns an error wait 100ms and try it again, up to ten times.
/// If the last attempt failed return the error returned by that attempt.
///
/// If `f` returns Ok immediately return the result.
fn retry<T, E>(f: impl Fn() -> Result<T, E>) -> Result<T, E> {
let attempts = 10;
let attempts = 30;
let interval = Duration::from_millis(100);
for attempt in (0..attempts).rev() {
match f() {
Expand All @@ -85,7 +85,8 @@ fn retry<T, E>(f: impl Fn() -> Result<T, E>) -> Result<T, E> {
fn read_from_journal(test_name: &str) -> Vec<HashMap<String, Field>> {
let stdout = String::from_utf8(
Command::new("journalctl")
.args(&["--user", "--output=json"])
// We pass --all to circumvent journalctl's default limit of 4096 bytes for field values
.args(&["--user", "--output=json", "--all"])
// Filter by the PID of the current test process
.arg(format!("_PID={}", std::process::id()))
.arg(format!("TEST_NAME={}", test_name))
Expand All @@ -97,10 +98,7 @@ fn read_from_journal(test_name: &str) -> Vec<HashMap<String, Field>> {

stdout
.lines()
.map(|l| {
dbg!(l);
serde_json::from_str(l).unwrap()
})
.map(|l| serde_json::from_str(l).unwrap())
.collect()
}

Expand Down Expand Up @@ -169,3 +167,18 @@ fn internal_null_byte() {
assert_eq!(message["PRIORITY"], "6");
});
}

#[test]
fn large_message() {
let large_string = "b".repeat(512_000);
with_journald(|| {
debug!(test.name = "large_message", "Message: {}", large_string);

let message = retry_read_one_line_from_journal("large_message");
assert_eq!(
message["MESSAGE"],
format!("Message: {}", large_string).as_str()
);
assert_eq!(message["PRIORITY"], "6");
});
}

0 comments on commit 379d7d3

Please sign in to comment.