Skip to content

Commit

Permalink
Download compressed gz image from remote source
Browse files Browse the repository at this point in the history
Accept url arguments for remote image download and copy.
Implements async support for use of reqwest. Bmap file is searched as in
local option in the current file with same name and the extension ".bmap"

Signed-off-by: Rafael Garcia Ruiz <rafael.garcia@collabora.com>
  • Loading branch information
Razaloc committed Oct 27, 2022
1 parent 7b6d784 commit 1d8a0f8
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 21 deletions.
6 changes: 6 additions & 0 deletions bmap-rs/Cargo.toml
Expand Up @@ -12,3 +12,9 @@ anyhow = "1.0.66"
nix = "0.25.0"
flate2 = "1.0.24"
clap = { version = "4.0.18", features = ["derive"] }
async-compression = { version = "0.3.15", features = ["gzip", "futures-io", "tokio"] }
tokio = { version = "1.21.2", features = ["full"] }
reqwest = { version = "0.11.12", features = ["stream"]}
tokio-util= { version = "0.7.4" }
bytes = "1.2.1"
futures = "0.3.25"
111 changes: 92 additions & 19 deletions bmap-rs/src/main.rs
@@ -1,20 +1,34 @@
use anyhow::{anyhow, bail, Context, Result};
use bmap::{Bmap, Discarder, SeekForward};
use bmap::{AsyncDiscarder, Bmap, Discarder, SeekForward};
use clap::Parser;
use flate2::read::GzDecoder;
use futures::TryStreamExt;
use nix::unistd::ftruncate;
use reqwest::{Response as Reqwest_Response, Url};
use std::ffi::OsStr;
use std::fs::File;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use tokio_util::io::StreamReader;

#[derive(Parser, Debug)]
struct Copy {
image: PathBuf,
dest: PathBuf,
}

#[derive(PartialEq)]
enum InputType {
Local,
Remote,
}

enum Output {
Sync(std::fs::File),
Async(tokio::fs::File),
}

#[derive(Parser, Debug)]

enum Command {
Expand Down Expand Up @@ -76,7 +90,7 @@ impl SeekForward for Decoder {
}
}

fn setup_input(path: &Path) -> Result<Decoder> {
fn setup_local_input(path: &Path) -> Result<Decoder> {
let f = File::open(path)?;
match path.extension().and_then(OsStr::to_str) {
Some("gz") => {
Expand All @@ -87,41 +101,100 @@ fn setup_input(path: &Path) -> Result<Decoder> {
}
}

fn copy(c: Copy) -> Result<()> {
if !c.image.exists() {
async fn setup_remote_input(path: &Path) -> Reqwest_Response {
if path.extension().unwrap() != "gz" {
panic!("Image file format not implemented")
}
let url = Url::parse(path.to_str().unwrap()).unwrap();
reqwest::get(url).await.unwrap()
}

async fn copy(c: Copy) -> Result<()> {
let input_type = match Url::parse(c.image.to_str().unwrap()) {
Ok(_) => InputType::Remote,
Err(_) => InputType::Local,
};
if !c.image.exists() && input_type == InputType::Local {
bail!("Image file doesn't exist")
}

let bmap = find_bmap(&c.image).ok_or_else(|| anyhow!("Couldn't find bmap file"))?;
let bmap = match input_type {
InputType::Local => {
find_bmap(&c.image).ok_or_else(|| anyhow!("Couldn't find bmap file"))?
}
InputType::Remote => find_bmap(Path::new(&c.image.file_name().unwrap()))
.ok_or_else(|| anyhow!("Couldn't find bmap file"))?,
};
println!("Found bmap file: {}", bmap.display());

let mut b = File::open(&bmap).context("Failed to open bmap file")?;
let mut xml = String::new();
b.read_to_string(&mut xml)?;

let bmap = Bmap::from_xml(&xml)?;
let mut output = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(c.dest)?;

if output.metadata()?.is_file() {
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
.context("Failed to truncate file")?;
let output = match input_type {
InputType::Local => {
let file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(c.dest)?;
Output::Sync(file)
}
InputType::Remote => {
let file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.open(c.dest)
.await?;
Output::Async(file)
}
};

match output {
Output::Sync(ref output) => {
if output.metadata()?.is_file() {
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
.context("Failed to truncate file")?;
}
}
Output::Async(ref output) => {
if output.metadata().await?.is_file() {
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
.context("Failed to truncate file")?;
}
}
}

let mut input = setup_input(&c.image)?;
bmap::copy(&mut input, &mut output, &bmap)?;
println!("Done: Syncing...");
output.sync_all().expect("Sync failure");
match output {
Output::Sync(mut output) => {
let mut input = setup_local_input(&c.image)?;
bmap::copy(&mut input, &mut output, &bmap)?;
println!("Done: Syncing...");
output.sync_all().expect("Sync failure");
}
Output::Async(mut output) => {
let res = setup_remote_input(&c.image).await;
let stream = res
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.into_stream();
let read = StreamReader::new(stream);
let reader = async_compression::tokio::bufread::GzipDecoder::new(read);
let mut input = AsyncDiscarder::new(reader);
bmap::copy_async(&mut input, &mut output, &bmap).await?;
println!("Done: Syncing...");
output.sync_all().await.expect("Sync failure");
}
};

Ok(())
}

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let opts = Opts::parse();

match opts.command {
Command::Copy(c) => copy(c),
Command::Copy(c) => copy(c).await,
}
}
2 changes: 1 addition & 1 deletion bmap/Cargo.toml
Expand Up @@ -15,6 +15,6 @@ sha2 = { version = "0.10.6", features = [ "asm" ] }
strum = { version = "0.24.1", features = [ "derive"] }
digest = "0.10.5"
flate2 = "1.0.20"
tokio = { version = "1.21.2", features = ["full"] }
tokio = { version = "1.21.2" }
async-trait = "0.1.58"
futures = "0.3.25"
47 changes: 46 additions & 1 deletion bmap/src/discarder.rs
@@ -1,6 +1,12 @@
use crate::SeekForward;
use crate::{AsyncSeekForward, SeekForward};
use async_trait::async_trait;
use futures::executor::block_on;
use std::io::Read;
use std::io::Result as IOResult;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io::{AsyncRead, AsyncReadExt};

/// Adaptor that implements SeekForward on types only implementing Read by discarding data
pub struct Discarder<R: Read> {
Expand Down Expand Up @@ -36,6 +42,45 @@ impl<R: Read> SeekForward for Discarder<R> {
}
}

//Async implementation

pub struct AsyncDiscarder<R: AsyncRead> {
reader: R,
}

impl<R: AsyncRead> AsyncDiscarder<R> {
pub fn new(reader: R) -> Self {
Self { reader }
}

pub fn into_inner(self) -> R {
self.reader
}
}

impl<R: AsyncRead + Unpin> AsyncRead for AsyncDiscarder<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}
#[async_trait(?Send)]
impl<R: AsyncRead + Unpin> AsyncSeekForward for AsyncDiscarder<R> {
async fn async_seek_forward(&mut self, forward: u64) -> IOResult<()> {
let mut buf = [0; 4096];
let mut left = forward as usize;
while left > 0 {
let toread = left.min(buf.len());
let r = block_on(self.read(&mut buf[0..toread]))?;
left -= r;
}
Ok(())
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit 1d8a0f8

Please sign in to comment.