Skip to content

Commit

Permalink
Improve shared memory segment cleanup (#998)
Browse files Browse the repository at this point in the history
- we now maintain an internal list containing the id of every allocated
  segment, which we can use to clean up the segments on operating
  systems (e.g. macOS) which do not provide a way to obtain a list of
  all the existing shared-memory segments on the system.
- also added more debug assertions to ensure correct handling of shared
  memory segments (i.e. not opening segments which haven't been created)
  • Loading branch information
teymour-aldridge committed Aug 31, 2022
1 parent 494d296 commit e44ad8e
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 107 deletions.
2 changes: 2 additions & 0 deletions packages/engine/lib/memory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
//! [`Segment`]: crate::shared_memory::Segment
//! [`ArrowBatch`]: crate::arrow::ArrowBatch

#![feature(once_cell)]

mod error;

pub mod arrow;
Expand Down
2 changes: 1 addition & 1 deletion packages/engine/lib/memory/src/shared_memory/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ unsafe extern "C" fn load_shmem(id: *const u8, len: u64) -> *mut CSegment {
// `include_terminal_padding` = true because if external modules resize
// the shared memory, then that means that this shared memory module
// contains data subject to external resizing
match Segment::from_shmem_os_id(message, true, true) {
match Segment::open_unchecked(message, true, true) {
Ok(segment) => {
debug_assert!(!segment.data.is_owner());
let ptr = segment.data.as_ptr();
Expand Down
122 changes: 91 additions & 31 deletions packages/engine/lib/memory/src/shared_memory/segment/cleanup.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,108 @@
use std::{
collections::HashSet,
sync::{LazyLock, Mutex},
};

use glob::GlobError;
use uuid::Uuid;

use super::MemoryId;
use crate::{Error, Result};
/// We use this to keep a list of all the shared-memory segments which are
/// being used by the engine. When the engine exits, we then delete any
/// leftover shared memory segments (in release builds; we error in debug builds).
///
/// Each entry is the OS id of a segment, stored as a `String`. The set is
/// created lazily on first access and is guarded by a `Mutex`.
pub static IN_USE_SHM_SEGMENTS: LazyLock<Mutex<HashSet<String>>> = LazyLock::new(Mutex::default);

use crate::{shared_memory::MemoryId, Error, Result};

/// Clean up generated shared memory segments associated with a given `MemoryId`.
///
/// Note: this function does not work on macOS.
///
/// If debug assertions are enabled, this function will panic if there are any shared-memory
/// segments left to clear up. This is because macOS does not store a reference to the shared-memory
/// segments (anywhere!) - the only one we have is the one we receive when we first create the
/// shared-memory segment. As such, we should consider it a bug if the shared-memory segments have
/// not been cleaned up before the experiment has completed (obviously it is still useful to have
/// this function as a safety fallback for operating systems which provide a list of all the
/// shared-memory segments in use).
/// segments left to clear up. This is because ideally the engine would clean them all up promptly
/// (i.e. when the batch the segment corresponds to is no longer needed).
#[allow(clippy::significant_drop_in_scrutinee)]
pub fn cleanup_by_base_id(id: Uuid) -> Result<()> {
let shm_files = glob::glob(&format!("/dev/shm/{}_*", MemoryId::prefix(id)))
.map_err(|e| Error::Unique(format!("cleanup glob error: {}", e)))?;
let segments_not_removed_by_engine: Vec<String> = {
let mut segments_list_lock = IN_USE_SHM_SEGMENTS.lock().unwrap();

#[cfg(debug_assertions)]
let mut not_deallocated = Vec::new();

for path in shm_files {
if let Err(err) = path
.map_err(GlobError::into_error)
.map(|path| {
#[cfg(debug_assertions)]
not_deallocated.push(path.display().to_string());
path
})
.and_then(std::fs::remove_file)
{
tracing::warn!("Could not remove shared memory file: {err}");
let mut segments_not_removed_by_engine = Vec::new();

for os_id in segments_list_lock.iter() {
if !os_id.starts_with(&format!("{}_*", MemoryId::prefix(id))) {
continue;
}

let shm = shared_memory::ShmemConf::new(true)
.os_id(&os_id.clone())
.open();

match shm {
Ok(mut mem) => {
segments_not_removed_by_engine.push(os_id.clone());
let is_owner = mem.set_owner(true);
if !is_owner {
tracing::warn!(
"failed to gain ownership of the shared memory segment (this should \
not be possible)"
)
}
// deallocate the shared-memory segment (note: the `drop` call is not required,
// and is here to make it clear that we are trying to delete the shared memory
// segment)
drop(mem)
}
Err(e) => {
tracing::warn!("error when trying to open shared-memory segment: {e:?}")
}
}
}

for to_remove in segments_not_removed_by_engine.iter() {
segments_list_lock.remove(to_remove);
}
}

segments_not_removed_by_engine
};

debug_assert!(
segments_not_removed_by_engine.is_empty(),
"expected the engine to have cleaned up all the segments it created during the \
experiment, but segments with these ids remained at the experiment end: \
{segments_not_removed_by_engine:?}"
);

#[cfg(debug_assertions)]
{
if !not_deallocated.is_empty() {
panic!(
check_all_deallocated_linux(id)?;

Ok(())
}

/// On Linux we can obtain a list of all the shared memory segments in use, so we can perform some
/// additional cleanup (this function is mostly useful for testing the implementation of the cleanup
/// code which uses [`static@IN_USE_SHM_SEGMENTS`]).
fn check_all_deallocated_linux(id: Uuid) -> Result<()> {
if cfg!(target_os = "linux") {
let shm_files = glob::glob(&format!("/dev/shm/{}_*", MemoryId::prefix(id)))
.map_err(|e| Error::Unique(format!("cleanup glob error: {}", e)))?;

let mut not_deallocated: Vec<String> = Vec::new();

for path in shm_files {
if let Err(err) = path
.map_err(GlobError::into_error)
.map(|path| {
#[cfg(debug_assertions)]
not_deallocated.push(path.display().to_string());
path
})
.and_then(std::fs::remove_file)
{
tracing::warn!("Could not remove shared memory file: {err}");
}

debug_assert!(
not_deallocated.is_empty(),
"the following shared memory segments were not deallocated at the end of the \
experiment: {not_deallocated:?}"
)
);
}
}

Expand Down

0 comments on commit e44ad8e

Please sign in to comment.