Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slabs #2756

Merged
merged 2 commits into from
May 10, 2021
Merged

Slabs #2756

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ chrono = "0.4"
clap = "2"
download = {path = "download", default-features = false}
effective-limits = "0.5.2"
enum-map = "1.1.0"
flate2 = "1"
git-testament = "0.1.4"
home = {git = "https://github.com/rbtcollins/home", rev = "a243ee2fbee6022c57d56f5aa79aefe194eabe53"}
Expand All @@ -52,6 +53,7 @@ scopeguard = "1"
semver = "0.11"
serde = {version = "1.0", features = ["derive"]}
sha2 = "0.9"
sharded-slab = "0.1.1"
strsim = "0.10"
tar = "0.4.26"
tempfile = "3.1"
Expand Down
3 changes: 3 additions & 0 deletions src/cli/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ impl NotifyOnConsole {
NotificationLevel::Error => {
err!("{}", n);
}
NotificationLevel::Debug => {
debug!("{}", n);
}
}
}
}
Expand Down
23 changes: 20 additions & 3 deletions src/diskio/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
time::Instant,
};

use super::{CompletedIo, Executor, Item};
use super::{CompletedIo, Executor, FileBuffer, Item};

#[derive(Debug)]
pub struct _IncrementalFileState {
Expand Down Expand Up @@ -70,7 +70,11 @@ impl Executor for ImmediateUnpacker {
item.result = match &mut item.kind {
super::Kind::Directory => super::create_dir(&item.full_path),
super::Kind::File(ref contents) => {
super::write_file(&item.full_path, &contents, item.mode)
if let super::FileBuffer::Immediate(ref contents) = &contents {
super::write_file(&item.full_path, contents, item.mode)
} else {
unreachable!()
}
}
super::Kind::IncrementalFile(_incremental_file) => {
return {
Expand Down Expand Up @@ -124,6 +128,14 @@ impl Executor for ImmediateUnpacker {
super::IncrementalFileState::Immediate(self.incremental_state.clone())
}
}

fn get_buffer(&mut self, capacity: usize) -> super::FileBuffer {
super::FileBuffer::Immediate(Vec::with_capacity(capacity))
}

fn buffer_available(&self, _len: usize) -> bool {
true
}
}

/// The non-shared state for writing a file incrementally
Expand Down Expand Up @@ -160,10 +172,15 @@ impl IncrementalFileWriter {
})
}

pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
pub fn chunk_submit(&mut self, chunk: FileBuffer) -> bool {
if (self.state.lock().unwrap()).is_none() {
return false;
}
let chunk = if let FileBuffer::Immediate(v) = chunk {
v
} else {
unreachable!()
};
match self.write(chunk) {
Ok(v) => v,
Err(e) => {
Expand Down
101 changes: 91 additions & 10 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod test;
pub mod threaded;

use std::io::{self, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};
Expand All @@ -66,12 +67,73 @@ use anyhow::{Context, Result};

use crate::process;
use crate::utils::notifications::Notification;
use threaded::PoolReference;

/// Carries the implementation specific data for complete file transfers into the executor.
#[derive(Debug)]
pub enum FileBuffer {
Immediate(Vec<u8>),
// A reference to the object in the pool, and a handle to write to it
Threaded(PoolReference),
}

impl FileBuffer {
/// All the buffers space to be re-used when the last reference to it is dropped.
pub(crate) fn clear(&mut self) {
if let FileBuffer::Threaded(ref mut contents) = self {
contents.clear()
}
}

pub(crate) fn len(&self) -> usize {
match self {
FileBuffer::Immediate(ref vec) => vec.len(),
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned.len(),
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable.len(),
}
}

pub(crate) fn finished(self) -> Self {
match self {
FileBuffer::Threaded(PoolReference::Mut(mutable, pool)) => {
FileBuffer::Threaded(PoolReference::Owned(mutable.downgrade(), pool))
}
_ => self,
}
}
}

impl Deref for FileBuffer {
type Target = Vec<u8>;

fn deref(&self) -> &Self::Target {
match self {
FileBuffer::Immediate(ref vec) => &vec,
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned,
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
}
}
}

impl DerefMut for FileBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
FileBuffer::Immediate(ref mut vec) => vec,
FileBuffer::Threaded(PoolReference::Owned(_, _)) => {
unimplemented!()
}
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
}
}
}

pub(crate) const IO_CHUNK_SIZE: usize = 16_777_216;

/// Carries the implementation specific channel data into the executor.
#[derive(Debug)]
pub enum IncrementalFile {
ImmediateReceiver,
ThreadedReceiver(Receiver<Vec<u8>>),
ThreadedReceiver(Receiver<FileBuffer>),
}

// The basic idea is that in single threaded mode we get this pattern:
Expand Down Expand Up @@ -116,7 +178,7 @@ pub enum IncrementalFile {
#[derive(Debug)]
pub enum Kind {
Directory,
File(Vec<u8>),
File(FileBuffer),
IncrementalFile(IncrementalFile),
}

Expand Down Expand Up @@ -160,7 +222,7 @@ impl Item {
}
}

pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
pub fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self {
let len = content.len();
Self {
full_path,
Expand All @@ -177,7 +239,7 @@ impl Item {
full_path: PathBuf,
mode: u32,
state: IncrementalFileState,
) -> Result<(Self, Box<dyn FnMut(Vec<u8>) -> bool + 'a>)> {
) -> Result<(Self, Box<dyn FnMut(FileBuffer) -> bool + 'a>)> {
let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?;
let result = Self {
full_path,
Expand Down Expand Up @@ -210,19 +272,19 @@ impl IncrementalFileState {
&self,
path: &Path,
mode: u32,
) -> Result<(Box<dyn FnMut(Vec<u8>) -> bool>, IncrementalFile)> {
) -> Result<(Box<dyn FnMut(FileBuffer) -> bool>, IncrementalFile)> {
use std::sync::mpsc::channel;
match *self {
IncrementalFileState::Threaded => {
let (tx, rx) = channel::<Vec<u8>>();
let (tx, rx) = channel::<FileBuffer>();
let content_callback = IncrementalFile::ThreadedReceiver(rx);
let chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();
let chunk_submit = move |chunk: FileBuffer| tx.send(chunk).is_ok();
Ok((Box::new(chunk_submit), content_callback))
}
IncrementalFileState::Immediate(ref state) => {
let content_callback = IncrementalFile::ImmediateReceiver;
let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?;
let chunk_submit = move |chunk: Vec<u8>| writer.chunk_submit(chunk);
let chunk_submit = move |chunk: FileBuffer| writer.chunk_submit(chunk);
Ok((Box::new(chunk_submit), content_callback))
}
}
Expand Down Expand Up @@ -258,6 +320,14 @@ pub trait Executor {

/// Get any state needed for incremental file processing
fn incremental_file_state(&self) -> IncrementalFileState;

/// Get a disk buffer E.g. this gets the right sized pool object for
/// optimized situations, or just a malloc when optimisations are off etc
/// etc.
fn get_buffer(&mut self, len: usize) -> FileBuffer;

/// Query the memory budget to see if a particular size buffer is available
fn buffer_available(&self, len: usize) -> bool;
}

/// Trivial single threaded IO to be used from executors.
Expand All @@ -267,7 +337,17 @@ pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
// Files, write them.
item.result = match &mut item.kind {
Kind::Directory => create_dir(&item.full_path),
Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
Kind::File(ref mut contents) => {
contents.clear();
match contents {
FileBuffer::Immediate(ref contents) => {
write_file(&item.full_path, &contents, item.mode)
}
FileBuffer::Threaded(ref mut contents) => {
write_file(&item.full_path, &contents, item.mode)
}
}
}
Kind::IncrementalFile(incremental_file) => write_file_incremental(
&item.full_path,
incremental_file,
Expand Down Expand Up @@ -367,6 +447,7 @@ pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// Get the executor for disk IO.
pub fn get_executor<'a>(
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
ram_budget: usize,
) -> Result<Box<dyn Executor + 'a>> {
// If this gets lots of use, consider exposing via the config file.
let thread_count = match process().var("RUSTUP_IO_THREADS") {
Expand All @@ -377,6 +458,6 @@ pub fn get_executor<'a>(
};
Ok(match thread_count {
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
n => Box::new(threaded::Threaded::new(notify_handler, n)),
n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)),
})
}