Skip to content

Commit

Permalink
Merge #41
Browse files Browse the repository at this point in the history
41: Download compressed gz image from remote source r=obbardc a=Razaloc

Accept url arguments for remote image download and copy.
Implements async support for use of reqwest. Bmap file is searched as in
local option in the current file with same name and the extension ".bmap"
Closes: #9 
Closes: #46 
Closes: #8 

Signed-off-by: Rafael Garcia Ruiz <rafael.garcia@collabora.com>

Co-authored-by: Rafael Garcia Ruiz <rafael.garcia@collabora.com>
  • Loading branch information
bors[bot] and Razaloc committed Dec 16, 2022
2 parents 9e31765 + 5a6e346 commit a588fa7
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 33 deletions.
9 changes: 7 additions & 2 deletions bmap-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,10 @@ bmap = { path = "../bmap" }
anyhow = "1.0.66"
nix = "0.26.1"
flate2 = "1.0.24"
clap = { version = "4.0.18", features = ["derive"] }
indicatif = "0.17.1"
clap = { version = "4.0.18", features = ["cargo"] }
indicatif = { version = "0.17.1", features = ["tokio"] }
async-compression = { version = "0.3.15", features = ["gzip", "futures-io"] }
tokio = { version = "1.21.2", features = ["rt", "macros", "fs", "rt-multi-thread"] }
reqwest = { version = "0.11.12", features = ["stream"] }
tokio-util = { version = "0.7.4", features = ["compat"] }
futures = "0.3.25"
173 changes: 143 additions & 30 deletions bmap-rs/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,75 @@
use anyhow::{anyhow, bail, Context, Result};
use bmap::{Bmap, Discarder, SeekForward};
use clap::Parser;
use anyhow::{anyhow, bail, ensure, Context, Result};
use async_compression::futures::bufread::GzipDecoder;
use bmap::{AsyncDiscarder, Bmap, Discarder, SeekForward};
use clap::{arg, command, Command};
use flate2::read::GzDecoder;
use futures::TryStreamExt;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use nix::unistd::ftruncate;
use reqwest::{Response, Url};
use std::ffi::OsStr;
use std::fmt::Write;
use std::fs::File;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use tokio_util::compat::TokioAsyncReadCompatExt;

#[derive(Parser, Debug)]
#[derive(Debug)]
enum Image {
Path(PathBuf),
Url(Url),
}

#[derive(Debug)]
struct Copy {
image: PathBuf,
image: Image,
dest: PathBuf,
}

#[derive(Parser, Debug)]
#[derive(Debug)]

enum Command {
enum Subcommand {
Copy(Copy),
}

#[derive(Parser, Debug)]
#[derive(Debug)]
struct Opts {
#[command(subcommand)]
command: Command,
command: Subcommand,
}

impl Opts {
fn parser() -> Opts {
let matches = command!()
.propagate_version(true)
.subcommand_required(true)
.arg_required_else_help(true)
.subcommand(
Command::new("copy")
.about("Copy image to block device or file")
.arg(arg!([IMAGE]).required(true))
.arg(arg!([DESTINATION]).required(true)),
)
.get_matches();
match matches.subcommand() {
Some(("copy", sub_matches)) => Opts {
command: Subcommand::Copy({
Copy {
image: match Url::parse(sub_matches.get_one::<String>("IMAGE").unwrap()) {
Ok(url) => Image::Url(url),
Err(_) => Image::Path(PathBuf::from(
sub_matches.get_one::<String>("IMAGE").unwrap(),
)),
},
dest: PathBuf::from(sub_matches.get_one::<String>("DESTINATION").unwrap()),
}
}),
},
_ => unreachable!(
"Exhausted list of subcommands and subcommand_required prevents `None`"
),
}
}
}

fn append(path: PathBuf) -> PathBuf {
Expand All @@ -51,6 +94,13 @@ fn find_bmap(img: &Path) -> Option<PathBuf> {
}
}

fn find_remote_bmap(mut url: Url) -> Result<Url> {
let mut path = PathBuf::from(url.path());
path.set_extension("bmap");
url.set_path(path.to_str().unwrap());
Ok(url)
}

trait ReadSeekForward: SeekForward + Read {}
impl<T: Read + SeekForward> ReadSeekForward for T {}

Expand Down Expand Up @@ -78,7 +128,7 @@ impl SeekForward for Decoder {
}
}

fn setup_input(path: &Path) -> Result<Decoder> {
fn setup_local_input(path: &Path) -> Result<Decoder> {
let f = File::open(path)?;
match path.extension().and_then(OsStr::to_str) {
Some("gz") => {
Expand All @@ -89,12 +139,44 @@ fn setup_input(path: &Path) -> Result<Decoder> {
}
}

fn copy(c: Copy) -> Result<()> {
if !c.image.exists() {
bail!("Image file doesn't exist")
async fn setup_remote_input(url: Url) -> Result<Response> {
match PathBuf::from(url.path())
.extension()
.and_then(OsStr::to_str)
{
Some("gz") => reqwest::get(url).await.map_err(anyhow::Error::new),
None => bail!("No file extension found"),
_ => bail!("Image file format not implemented"),
}
}

let bmap = find_bmap(&c.image).ok_or_else(|| anyhow!("Couldn't find bmap file"))?;
fn setup_progress_bar(bmap: &Bmap) -> ProgressBar {
let pb = ProgressBar::new(bmap.total_mapped_size());
pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
.unwrap()
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
.progress_chars("#>-"));
pb
}

fn setup_output<T: AsRawFd>(output: &T, bmap: &Bmap, metadata: std::fs::Metadata) -> Result<()> {
if metadata.is_file() {
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
.context("Failed to truncate file")?;
}
Ok(())
}

async fn copy(c: Copy) -> Result<()> {
match c.image {
Image::Path(path) => copy_local_input(path, c.dest),
Image::Url(url) => copy_remote_input(url, c.dest).await,
}
}

fn copy_local_input(source: PathBuf, destination: PathBuf) -> Result<()> {
ensure!(source.exists(), "Image file doesn't exist");
let bmap = find_bmap(&source).ok_or_else(|| anyhow!("Couldn't find bmap file"))?;
println!("Found bmap file: {}", bmap.display());

let mut b = File::open(&bmap).context("Failed to open bmap file")?;
Expand All @@ -105,32 +187,63 @@ fn copy(c: Copy) -> Result<()> {
let output = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(c.dest)?;
.open(destination)?;

if output.metadata()?.is_file() {
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
.context("Failed to truncate file")?;
}
setup_output(&output, &bmap, output.metadata()?)?;

let mut input = setup_input(&c.image)?;
let pb = ProgressBar::new(bmap.total_mapped_size());
pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
.unwrap()
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
.progress_chars("#>-"));
let mut input = setup_local_input(&source)?;
let pb = setup_progress_bar(&bmap);
bmap::copy(&mut input, &mut pb.wrap_write(&output), &bmap)?;
pb.finish_and_clear();

println!("Done: Syncing...");
output.sync_all().expect("Sync failure");
output.sync_all()?;

Ok(())
}

async fn copy_remote_input(source: Url, destination: PathBuf) -> Result<()> {
let bmap_url = find_remote_bmap(source.clone())?;

let xml = reqwest::get(bmap_url.clone()).await?.text().await?;
println!("Found bmap file: {}", bmap_url);

let bmap = Bmap::from_xml(&xml)?;
let mut output = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.open(destination)
.await?;

setup_output(&output, &bmap, output.metadata().await?)?;

let res = setup_remote_input(source).await?;
let stream = res
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.into_async_read();
let reader = GzipDecoder::new(stream);
let mut input = AsyncDiscarder::new(reader);
let pb = setup_progress_bar(&bmap);
bmap::copy_async(
&mut input,
&mut pb.wrap_async_write(&mut output).compat(),
&bmap,
)
.await?;
pb.finish_and_clear();

println!("Done: Syncing...");
output.sync_all().await?;

Ok(())
}

fn main() -> Result<()> {
let opts = Opts::parse();
#[tokio::main]
async fn main() -> Result<()> {
let opts = Opts::parser();

match opts.command {
Command::Copy(c) => copy(c),
Subcommand::Copy(c) => copy(c).await,
}
}
2 changes: 2 additions & 0 deletions bmap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ sha2 = { version = "0.10.6", features = [ "asm" ] }
strum = { version = "0.24.1", features = [ "derive"] }
digest = "0.10.5"
flate2 = "1.0.20"
async-trait = "0.1.58"
futures = "0.3.25"
44 changes: 43 additions & 1 deletion bmap/src/discarder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::SeekForward;
use crate::{AsyncSeekForward, SeekForward};
use async_trait::async_trait;
use futures::io::{AsyncRead, AsyncReadExt};
use std::io::Read;
use std::io::Result as IOResult;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Adaptor that implements SeekForward on types only implementing Read by discarding data
pub struct Discarder<R: Read> {
Expand Down Expand Up @@ -36,6 +40,44 @@ impl<R: Read> SeekForward for Discarder<R> {
}
}

pub struct AsyncDiscarder<R: AsyncRead> {
reader: R,
}

impl<R: AsyncRead> AsyncDiscarder<R> {
pub fn new(reader: R) -> Self {
Self { reader }
}

pub fn into_inner(self) -> R {
self.reader
}
}

impl<R: AsyncRead + Unpin> AsyncRead for AsyncDiscarder<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<IOResult<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}

#[async_trait(?Send)]
impl<R: AsyncRead + Unpin> AsyncSeekForward for AsyncDiscarder<R> {
async fn async_seek_forward(&mut self, forward: u64) -> IOResult<()> {
let mut buf = [0; 4096];
let mut left = forward as usize;
while left > 0 {
let toread = left.min(buf.len());
let r = self.read(&mut buf[0..toread]).await?;
left -= r;
}
Ok(())
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit a588fa7

Please sign in to comment.