Skip to content

Commit

Permalink
First attempt to get progress information from stat worker. (#470)
Browse files Browse the repository at this point in the history
But for some reason, the counter stays at 0 despite the stat counter being
shared with the workers.
  • Loading branch information
Byron committed Sep 20, 2022
1 parent 0871a96 commit 0947c70
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 22 deletions.
59 changes: 57 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -86,7 +86,7 @@ git-repository = { version = "^0.24.0", path = "git-repository", default-feature
git-transport-for-configuration-only = { package = "git-transport", optional = true, version = "^0.20.0", path = "git-transport" }

clap = { version = "3.2.5", features = ["derive", "cargo"] }
prodash = { version = "20.1.0", optional = true, default-features = false }
prodash = { version = "20.1.1", optional = true, default-features = false }
atty = { version = "0.2.14", optional = true, default-features = false }
env_logger = { version = "0.9.0", default-features = false }
crosstermion = { version = "0.10.1", optional = true, default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion git-features/Cargo.toml
Expand Up @@ -115,7 +115,7 @@ crc32fast = { version = "1.2.1", optional = true }
sha1 = { version = "0.10.0", optional = true }

# progress
prodash = { version = "20.1.0", optional = true, default-features = false, features = ["unit-bytes", "unit-human"] }
prodash = { version = "20.1.1", optional = true, default-features = false, features = ["unit-bytes", "unit-human"] }

# pipe
bytes = { version = "1.0.0", optional = true }
Expand Down
5 changes: 4 additions & 1 deletion gitoxide-core/Cargo.toml
Expand Up @@ -18,7 +18,7 @@ default = []
## Discover all git repositories within a directory. Particularly useful with [skim](https://github.com/lotabout/skim).
organize = ["git-url", "jwalk"]
## Derive the amount of time invested into a git repository akin to [git-hours](https://github.com/kimmobrunfeldt/git-hours).
estimate-hours = ["itertools", "fs-err"]
estimate-hours = ["itertools", "fs-err", "num_cpus", "flume"]

#! ### Mutually Exclusive Networking
#! If both are set, _blocking-client_ will take precedence, allowing `--all-features` to be used.
Expand Down Expand Up @@ -59,8 +59,11 @@ blocking = { version = "1.0.2", optional = true }
git-url = { version = "^0.8.0", path = "../git-url", optional = true }
jwalk = { version = "0.6.0", optional = true }

# for 'hours'
itertools = { version = "0.10.1", optional = true }
fs-err = { version = "2.6.0", optional = true }
num_cpus = { version = "1.13.1", optional = true }
flume = { version = "0.10.14", optional = true }

document-features = { version = "0.2.0", optional = true }

Expand Down
146 changes: 129 additions & 17 deletions gitoxide-core/src/hours.rs
@@ -1,4 +1,6 @@
use std::collections::BTreeSet;
use std::convert::Infallible;
use std::sync::atomic::Ordering;
use std::{
collections::{hash_map::Entry, HashMap},
io,
Expand All @@ -9,7 +11,7 @@ use std::{
use anyhow::{anyhow, bail};
use git_repository as git;
use git_repository::bstr::BStr;
use git_repository::{actor, bstr::ByteSlice, interrupt, objs, prelude::*, progress, Progress};
use git_repository::{actor, bstr::ByteSlice, interrupt, prelude::*, progress, Progress};
use itertools::Itertools;

/// Additional configuration for the hours estimation functionality.
Expand Down Expand Up @@ -40,7 +42,7 @@ pub fn estimate<W, P>(
Context {
show_pii,
ignore_bots,
stats: _,
stats,
omit_unify_identities,
mut out,
}: Context<W>,
Expand All @@ -53,18 +55,25 @@ where
let commit_id = repo.rev_parse_single(rev_spec)?.detach();
let mut string_heap = BTreeSet::<&'static [u8]>::new();

let (all_commits, is_shallow) = {
let mut progress = progress.add_child("Traverse commit graph");
let (commit_authors, is_shallow) = {
let stat_progress = stats.then(|| progress.add_child("extract stats")).map(|mut p| {
p.init(None, progress::count("commits"));
p
});
let stat_counter = stat_progress.as_ref().and_then(|p| p.counter());

let mut progress = progress.add_child("traverse commit graph");
progress.init(None, progress::count("commits"));

std::thread::scope(|scope| -> anyhow::Result<(Vec<actor::SignatureRef<'static>>, bool)> {
let start = Instant::now();
progress.init(None, progress::count("commits"));
let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
let mailmap = repo.open_mailmap();

let handle = scope.spawn(move || -> anyhow::Result<Vec<actor::SignatureRef<'static>>> {
let commit_thread = scope.spawn(move || -> anyhow::Result<Vec<actor::SignatureRef<'static>>> {
let mut out = Vec::new();
for commit_data in rx {
if let Some(author) = objs::CommitRefIter::from_bytes(&commit_data)
if let Some(author) = git::objs::CommitRefIter::from_bytes(&commit_data)
.author()
.map(|author| mailmap.resolve_cow(author.trim()))
.ok()
Expand Down Expand Up @@ -101,12 +110,89 @@ where
Ok(out)
});

let (tx_tree_id, stat_threads) = stats
.then(|| {
let num_threads = num_cpus::get().saturating_sub(1 /*main thread*/).max(1);
let (tx, rx) = flume::unbounded::<(u32, Option<git::hash::ObjectId>, git::hash::ObjectId)>();
let stat_workers = (0..num_threads)
.map(|_| {
scope.spawn({
let counter = stat_counter.clone();
let mut repo = repo.clone();
repo.object_cache_size_if_unset(4 * 1024 * 1024);
let rx = rx.clone();
move || -> Result<_, git::object::tree::diff::Error> {
let mut out = Vec::new();
for (commit_idx, parent_commit, commit) in rx {
if let Some(c) = counter.as_ref() {
c.fetch_add(1, Ordering::SeqCst);
}
let mut stat = Stats::default();
let from = match parent_commit {
Some(id) => {
match repo.find_object(id).ok().and_then(|c| c.peel_to_tree().ok()) {
Some(tree) => tree,
None => continue,
}
}
None => repo
.find_object(git::hash::ObjectId::empty_tree(repo.object_hash()))
.expect("always present")
.into_tree(),
};
let to = match repo.find_object(commit).ok().and_then(|c| c.peel_to_tree().ok())
{
Some(c) => c,
None => continue,
};
from.changes().for_each_to_obtain_tree(&to, |change| {
use git::object::tree::diff::change::Event::*;
match change.event {
Addition { entry_mode, .. } => {
if entry_mode.is_no_tree() {
stat.added += 1
}
}
Deletion { entry_mode, .. } => {
if entry_mode.is_no_tree() {
stat.removed += 1
}
}
Modification { entry_mode, .. } => {
if entry_mode.is_no_tree() {
stat.modified += 1;
}
}
}
Ok::<_, Infallible>(Default::default())
})?;
out.push((commit_idx, stat));
}
Ok(out)
}
})
})
.collect::<Vec<_>>();
(Some(tx), stat_workers)
})
.unwrap_or_else(Default::default);

let mut commit_idx = 0_u32;
let commit_iter = interrupt::Iter::new(
commit_id.ancestors(|oid, buf| {
progress.inc();
repo.objects.find(oid, buf).map(|o| {
tx.send(o.data.to_owned()).ok();
objs::CommitRefIter::from_bytes(o.data)
if let Some((tx_tree, first_parent, commit)) = tx_tree_id.as_ref().and_then(|tx| {
git::objs::CommitRefIter::from_bytes(o.data)
.parent_ids()
.next()
.map(|first_parent| (tx, Some(first_parent), oid.to_owned()))
}) {
tx_tree.send((commit_idx, first_parent, commit)).ok();
}
commit_idx += 1;
git::objs::CommitRefIter::from_bytes(o.data)
})
}),
|| anyhow!("Cancelled by user"),
Expand All @@ -123,23 +209,38 @@ where
};
}
drop(tx);
drop(tx_tree_id);
progress.show_throughput(start);
Ok((handle.join().expect("no panic")?, is_shallow))

let _stats_by_commit_idx = match stat_progress {
Some(mut progress) => {
progress.init(Some(commit_idx as usize), progress::count("commits"));
let mut stats = Vec::new();
for handle in stat_threads {
stats.extend(handle.join().expect("no panic")?);
}
progress.show_throughput(start);
stats
}
None => Vec::new(),
};

Ok((commit_thread.join().expect("no panic")?, is_shallow))
})?
};

if all_commits.is_empty() {
if commit_authors.is_empty() {
bail!("No commits to process");
}

let start = Instant::now();
let mut current_email = &all_commits[0].email;
let mut current_email = &commit_authors[0].email;
let mut slice_start = 0;
let mut results_by_hours = Vec::new();
let mut ignored_bot_commits = 0_u32;
for (idx, elm) in all_commits.iter().enumerate() {
for (idx, elm) in commit_authors.iter().enumerate() {
if elm.email != *current_email {
let estimate = estimate_hours(&all_commits[slice_start..idx]);
let estimate = estimate_hours(&commit_authors[slice_start..idx]);
slice_start = idx;
current_email = &elm.email;
if ignore_bots && estimate.name.contains_str(b"[bot]") {
Expand All @@ -149,7 +250,7 @@ where
results_by_hours.push(estimate);
}
}
if let Some(commits) = all_commits.get(slice_start..) {
if let Some(commits) = commit_authors.get(slice_start..) {
results_by_hours.push(estimate_hours(commits));
}

Expand All @@ -167,9 +268,9 @@ where
let elapsed = start.elapsed();
progress.done(format!(
"Extracted and organized data from {} commits in {:?} ({:0.0} commits/s)",
all_commits.len(),
commit_authors.len(),
elapsed,
all_commits.len() as f32 / elapsed.as_secs_f32()
commit_authors.len() as f32 / elapsed.as_secs_f32()
));

let num_unique_authors = results_by_hours.len();
Expand Down Expand Up @@ -207,7 +308,7 @@ where
}
assert_eq!(
total_commits,
all_commits.len() as u32 - ignored_bot_commits,
commit_authors.len() as u32 - ignored_bot_commits,
"need to get all commits"
);
Ok(())
Expand Down Expand Up @@ -328,3 +429,14 @@ struct WorkByEmail {
hours: f32,
num_commits: u32,
}

/// Per-commit tally of changed files, split by the kind of change.
///
/// All counters start at zero via `Default`. The stat workers only increment them
/// for entries that are not trees, so directories themselves are never counted.
#[derive(Default, Debug)]
struct Stats {
    /// Number of files that were added.
    added: usize,
    /// Number of files that were removed.
    removed: usize,
    /// Number of files that were modified.
    modified: usize,
}

0 comments on commit 0947c70

Please sign in to comment.