Skip to content

Commit

Permalink
Reuse the same memory buffers during unpacking
Browse files Browse the repository at this point in the history
We think we have memory fragmentation causing failed extraction in
Windows containers and smaller Unix devices.

Writes of both full objects and streamed objects now re-use the Vec via
a sharded-slab implementation. To facilitate the more complicated memory
logic, buffer limit management is now integrated into the IO Executor:
the immediate executor doesn't limit at all as no outstanding buffers
occur, and the threaded executor tracks both the total allocated buffers
and whether a reusable buffer is available.
  • Loading branch information
rbtcollins committed May 7, 2021
1 parent cbd1c84 commit f88a688
Show file tree
Hide file tree
Showing 11 changed files with 520 additions and 143 deletions.
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ chrono = "0.4"
clap = "2"
download = {path = "download", default-features = false}
effective-limits = "0.5.2"
enum-map = "1.1.0"
flate2 = "1"
git-testament = "0.1.4"
home = {git = "https://github.com/rbtcollins/home", rev = "a243ee2fbee6022c57d56f5aa79aefe194eabe53"}
Expand All @@ -52,6 +53,7 @@ scopeguard = "1"
semver = "0.11"
serde = {version = "1.0", features = ["derive"]}
sha2 = "0.9"
sharded-slab = "0.1.1"
strsim = "0.10"
tar = "0.4.26"
tempfile = "3.1"
Expand Down
3 changes: 3 additions & 0 deletions src/cli/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ impl NotifyOnConsole {
NotificationLevel::Error => {
err!("{}", n);
}
NotificationLevel::Debug => {
debug!("{}", n);
}
}
}
}
Expand Down
23 changes: 20 additions & 3 deletions src/diskio/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
time::Instant,
};

use super::{CompletedIo, Executor, Item};
use super::{CompletedIo, Executor, FileBuffer, Item};

#[derive(Debug)]
pub struct _IncrementalFileState {
Expand Down Expand Up @@ -70,7 +70,11 @@ impl Executor for ImmediateUnpacker {
item.result = match &mut item.kind {
super::Kind::Directory => super::create_dir(&item.full_path),
super::Kind::File(ref contents) => {
super::write_file(&item.full_path, &contents, item.mode)
if let super::FileBuffer::Immediate(ref contents) = &contents {
super::write_file(&item.full_path, contents, item.mode)
} else {
unreachable!()
}
}
super::Kind::IncrementalFile(_incremental_file) => {
return {
Expand Down Expand Up @@ -124,6 +128,14 @@ impl Executor for ImmediateUnpacker {
super::IncrementalFileState::Immediate(self.incremental_state.clone())
}
}

/// Hands out a fresh, empty heap-allocated buffer with room for at least
/// `capacity` bytes. This executor does no pooling: every call allocates.
fn get_buffer(&mut self, capacity: usize) -> super::FileBuffer {
    let storage: Vec<u8> = Vec::with_capacity(capacity);
    super::FileBuffer::Immediate(storage)
}

/// Always reports that a buffer is available: this executor applies no
/// memory limit, so the requested size `_len` is ignored.
fn buffer_available(&self, _len: usize) -> bool {
true
}
}

/// The non-shared state for writing a file incrementally
Expand Down Expand Up @@ -160,10 +172,15 @@ impl IncrementalFileWriter {
})
}

pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
pub fn chunk_submit(&mut self, chunk: FileBuffer) -> bool {
if (self.state.lock().unwrap()).is_none() {
return false;
}
let chunk = if let FileBuffer::Immediate(v) = chunk {
v
} else {
unreachable!()
};
match self.write(chunk) {
Ok(v) => v,
Err(e) => {
Expand Down
101 changes: 91 additions & 10 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod test;
pub mod threaded;

use std::io::{self, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};
Expand All @@ -66,12 +67,73 @@ use anyhow::{Context, Result};

use crate::process;
use crate::utils::notifications::Notification;
use threaded::PoolReference;

/// Carries the implementation specific data for complete file transfers into the executor.
///
/// Whichever variant is held, the buffer dereferences to the underlying `Vec<u8>`.
#[derive(Debug)]
pub enum FileBuffer {
/// A plain heap-allocated vector; this is what the immediate executor's
/// `get_buffer` returns.
Immediate(Vec<u8>),
/// A reference to the object in the pool, and a handle to write to it.
Threaded(PoolReference),
}

impl FileBuffer {
    /// Allow the buffer's space to be re-used when the last reference to it is dropped.
    ///
    /// Only pooled (`Threaded`) buffers are affected; an `Immediate` buffer is
    /// left untouched.
    pub(crate) fn clear(&mut self) {
        match self {
            FileBuffer::Threaded(pooled) => pooled.clear(),
            FileBuffer::Immediate(_) => {}
        }
    }

    /// Length reported by the underlying buffer, whichever variant holds it.
    pub(crate) fn len(&self) -> usize {
        match self {
            FileBuffer::Immediate(bytes) => bytes.len(),
            FileBuffer::Threaded(pooled) => match pooled {
                PoolReference::Owned(shared, _) => shared.len(),
                PoolReference::Mut(exclusive, _) => exclusive.len(),
            },
        }
    }

    /// Marks the buffer as fully written.
    ///
    /// A pooled buffer held as `PoolReference::Mut` is converted to
    /// `PoolReference::Owned` via `downgrade()`; every other buffer is
    /// returned unchanged.
    pub(crate) fn finished(self) -> Self {
        match self {
            FileBuffer::Threaded(PoolReference::Mut(exclusive, pool)) => {
                let shared = exclusive.downgrade();
                FileBuffer::Threaded(PoolReference::Owned(shared, pool))
            }
            unchanged => unchanged,
        }
    }
}

impl Deref for FileBuffer {
    type Target = Vec<u8>;

    /// Shared access to the underlying byte vector, for every variant.
    fn deref(&self) -> &Self::Target {
        match self {
            FileBuffer::Immediate(bytes) => bytes,
            FileBuffer::Threaded(pooled) => match pooled {
                PoolReference::Owned(shared, _) => shared,
                PoolReference::Mut(exclusive, _) => exclusive,
            },
        }
    }
}

impl DerefMut for FileBuffer {
    /// Mutable access to the underlying byte vector.
    ///
    /// # Panics
    ///
    /// Panics when the buffer is a `FileBuffer::Threaded(PoolReference::Owned(..))`,
    /// which is the form `finished()` produces: mutable access is not provided
    /// for that variant, so all writes must happen before `finished()` is called.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            FileBuffer::Immediate(vec) => vec,
            FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
            FileBuffer::Threaded(PoolReference::Owned(_, _)) => {
                // This is an invariant violation by the caller rather than a
                // missing feature, so say so instead of "not implemented".
                panic!(
                    "cannot mutably dereference a FileBuffer holding PoolReference::Owned; \
                     write to the buffer before calling finished()"
                )
            }
        }
    }
}

/// 16 MiB (16 * 1024 * 1024 bytes).
// NOTE(review): presumably the size of one chunk of streamed file content —
// confirm against the users of this constant.
pub(crate) const IO_CHUNK_SIZE: usize = 16_777_216;

/// Carries the implementation specific channel data into the executor.
#[derive(Debug)]
pub enum IncrementalFile {
ImmediateReceiver,
ThreadedReceiver(Receiver<Vec<u8>>),
ThreadedReceiver(Receiver<FileBuffer>),
}

// The basic idea is that in single threaded mode we get this pattern:
Expand Down Expand Up @@ -116,7 +178,7 @@ pub enum IncrementalFile {
#[derive(Debug)]
pub enum Kind {
Directory,
File(Vec<u8>),
File(FileBuffer),
IncrementalFile(IncrementalFile),
}

Expand Down Expand Up @@ -160,7 +222,7 @@ impl Item {
}
}

pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
pub fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self {
let len = content.len();
Self {
full_path,
Expand All @@ -177,7 +239,7 @@ impl Item {
full_path: PathBuf,
mode: u32,
state: IncrementalFileState,
) -> Result<(Self, Box<dyn FnMut(Vec<u8>) -> bool + 'a>)> {
) -> Result<(Self, Box<dyn FnMut(FileBuffer) -> bool + 'a>)> {
let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?;
let result = Self {
full_path,
Expand Down Expand Up @@ -210,19 +272,19 @@ impl IncrementalFileState {
&self,
path: &Path,
mode: u32,
) -> Result<(Box<dyn FnMut(Vec<u8>) -> bool>, IncrementalFile)> {
) -> Result<(Box<dyn FnMut(FileBuffer) -> bool>, IncrementalFile)> {
use std::sync::mpsc::channel;
match *self {
IncrementalFileState::Threaded => {
let (tx, rx) = channel::<Vec<u8>>();
let (tx, rx) = channel::<FileBuffer>();
let content_callback = IncrementalFile::ThreadedReceiver(rx);
let chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();
let chunk_submit = move |chunk: FileBuffer| tx.send(chunk).is_ok();
Ok((Box::new(chunk_submit), content_callback))
}
IncrementalFileState::Immediate(ref state) => {
let content_callback = IncrementalFile::ImmediateReceiver;
let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?;
let chunk_submit = move |chunk: Vec<u8>| writer.chunk_submit(chunk);
let chunk_submit = move |chunk: FileBuffer| writer.chunk_submit(chunk);
Ok((Box::new(chunk_submit), content_callback))
}
}
Expand Down Expand Up @@ -258,6 +320,14 @@ pub trait Executor {

/// Get any state needed for incremental file processing
fn incremental_file_state(&self) -> IncrementalFileState;

/// Get a disk buffer for file content.
///
/// Where the memory comes from is up to the executor: for optimized
/// situations this gets the right sized pool object, and when optimisations
/// are off it is just a plain allocation. The immediate executor returns a
/// fresh, empty `Vec` whose capacity is `len`.
fn get_buffer(&mut self, len: usize) -> FileBuffer;

/// Query the memory budget to see if a buffer of `len` bytes is available.
///
/// The immediate executor applies no limit and always answers `true`.
fn buffer_available(&self, len: usize) -> bool;
}

/// Trivial single threaded IO to be used from executors.
Expand All @@ -267,7 +337,17 @@ pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
// Files, write them.
item.result = match &mut item.kind {
Kind::Directory => create_dir(&item.full_path),
Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
Kind::File(ref mut contents) => {
contents.clear();
match contents {
FileBuffer::Immediate(ref contents) => {
write_file(&item.full_path, &contents, item.mode)
}
FileBuffer::Threaded(ref mut contents) => {
write_file(&item.full_path, &contents, item.mode)
}
}
}
Kind::IncrementalFile(incremental_file) => write_file_incremental(
&item.full_path,
incremental_file,
Expand Down Expand Up @@ -367,6 +447,7 @@ pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// Get the executor for disk IO.
pub fn get_executor<'a>(
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
ram_budget: usize,
) -> Result<Box<dyn Executor + 'a>> {
// If this gets lots of use, consider exposing via the config file.
let thread_count = match process().var("RUSTUP_IO_THREADS") {
Expand All @@ -377,6 +458,6 @@ pub fn get_executor<'a>(
};
Ok(match thread_count {
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
n => Box::new(threaded::Threaded::new(notify_handler, n)),
n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)),
})
}

0 comments on commit f88a688

Please sign in to comment.