Skip to content

Commit

Permalink
Download compressed gz image from remote source
Browse files Browse the repository at this point in the history
Accept url arguments for remote image download and copy.
Implements async support for use of reqwest. Bmap file is searched as in
local option in the current file with same name and the extension ".bmap"

Signed-off-by: Rafael Garcia Ruiz <rafael.garcia@collabora.com>
  • Loading branch information
Razaloc committed Oct 28, 2022
1 parent 7b6d784 commit 6726fa1
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 13 deletions.
6 changes: 6 additions & 0 deletions bmap-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ anyhow = "1.0.66"
nix = "0.25.0"
flate2 = "1.0.24"
clap = { version = "4.0.18", features = ["derive"] }
async-compression = { version = "0.3.15", features = ["gzip", "futures-io"] }
tokio = { version = "1.21.2", features = ["rt", "macros", "fs", "rt-multi-thread"] }
reqwest = { version = "0.11.12", features = ["stream"] }
tokio-util= { version = "0.7.4", features = ["compat"] }
bytes = "1.2.1"
futures = "0.3.25"
67 changes: 61 additions & 6 deletions bmap-rs/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use anyhow::{anyhow, bail, Context, Result};
use bmap::{Bmap, Discarder, SeekForward};
use bmap::{AsyncDiscarder, Bmap, Discarder, SeekForward};
use clap::Parser;
use flate2::read::GzDecoder;
use futures::TryStreamExt;
use nix::unistd::ftruncate;
use reqwest::{Response as Reqwest_Response, Url};
use std::ffi::OsStr;
use std::fs::File;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use tokio_util::compat::TokioAsyncReadCompatExt;

#[derive(Parser, Debug)]
struct Copy {
Expand Down Expand Up @@ -76,7 +79,7 @@ impl SeekForward for Decoder {
}
}

fn setup_input(path: &Path) -> Result<Decoder> {
fn setup_local_input(path: &Path) -> Result<Decoder> {
let f = File::open(path)?;
match path.extension().and_then(OsStr::to_str) {
Some("gz") => {
Expand All @@ -87,7 +90,22 @@ fn setup_input(path: &Path) -> Result<Decoder> {
}
}

fn copy(c: Copy) -> Result<()> {
async fn setup_remote_input(path: &Path) -> Reqwest_Response {
if path.extension().unwrap() != "gz" {
panic!("Image file format not implemented")
}
let url = Url::parse(path.to_str().unwrap()).unwrap();
reqwest::get(url).await.unwrap()
}

async fn copy(c: Copy) -> Result<()> {
match Url::parse(c.image.to_str().unwrap()) {
Ok(_) => copy_remote_input(c).await,
Err(_) => copy_local_input(c),
}
}

fn copy_local_input(c: Copy) -> Result<()> {
if !c.image.exists() {
bail!("Image file doesn't exist")
}
Expand All @@ -110,18 +128,55 @@ fn copy(c: Copy) -> Result<()> {
.context("Failed to truncate file")?;
}

let mut input = setup_input(&c.image)?;
let mut input = setup_local_input(&c.image)?;
bmap::copy(&mut input, &mut output, &bmap)?;
println!("Done: Syncing...");
output.sync_all().expect("Sync failure");

Ok(())
}

fn main() -> Result<()> {
async fn copy_remote_input(c: Copy) -> Result<()> {
let bmap = find_bmap(Path::new(&c.image.file_name().unwrap()))
.ok_or_else(|| anyhow!("Couldn't find bmap file"))?;
println!("Found bmap file: {}", bmap.display());

let mut b = File::open(&bmap).context("Failed to open bmap file")?;
let mut xml = String::new();
b.read_to_string(&mut xml)?;

let bmap = Bmap::from_xml(&xml)?;
let mut output = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.open(c.dest)
.await?;

if output.metadata().await?.is_file() {
ftruncate(output.as_raw_fd(), bmap.image_size() as i64)
.context("Failed to truncate file")?;
}

let res = setup_remote_input(&c.image).await;
let stream = res
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.into_async_read();
let reader = async_compression::futures::bufread::GzipDecoder::new(stream);
let mut input = AsyncDiscarder::new(reader);
bmap::copy_async(&mut input, &mut (&mut output).compat(), &bmap).await?;

println!("Done: Syncing...");
output.sync_all().await.expect("Sync failure");

Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
let opts = Opts::parse();

match opts.command {
Command::Copy(c) => copy(c),
Command::Copy(c) => copy(c).await,
}
}
1 change: 0 additions & 1 deletion bmap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,5 @@ sha2 = { version = "0.10.6", features = [ "asm" ] }
strum = { version = "0.24.1", features = [ "derive"] }
digest = "0.10.5"
flate2 = "1.0.20"
tokio = { version = "1.21.2", features = ["full"] }
async-trait = "0.1.58"
futures = "0.3.25"
63 changes: 60 additions & 3 deletions bmap/src/discarder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::SeekForward;
use std::io::Read;
use std::io::Result as IOResult;
use crate::{AsyncSeekForward, SeekForward};
use async_trait::async_trait;
use futures::executor::block_on;
use futures::io::{AsyncRead, AsyncReadExt};
use std::io::{Read, Result as IOResult};
use std::pin::Pin;
use std::task::{Context, Poll};

/// Adaptor that implements SeekForward on types only implementing Read by discarding data
pub struct Discarder<R: Read> {
Expand Down Expand Up @@ -36,6 +40,59 @@ impl<R: Read> SeekForward for Discarder<R> {
}
}

//Async implementation

pub struct AsyncDiscarder<R: AsyncRead> {
reader: R,
}

impl<R: AsyncRead> AsyncDiscarder<R> {
pub fn new(reader: R) -> Self {
Self { reader }
}

pub fn into_inner(self) -> R {
self.reader
}
}

impl<R: AsyncRead + Unpin> AsyncRead for AsyncDiscarder<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}

fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [std::io::IoSliceMut<'_>],
) -> Poll<IOResult<usize>> {
for b in bufs {
if !b.is_empty() {
return self.poll_read(cx, b);
}
}

Pin::new(&mut self.reader).poll_read(cx, &mut [])
}
}
#[async_trait(?Send)]
impl<R: AsyncRead + Unpin> AsyncSeekForward for AsyncDiscarder<R> {
async fn async_seek_forward(&mut self, forward: u64) -> IOResult<()> {
let mut buf = [0; 4096];
let mut left = forward as usize;
while left > 0 {
let toread = left.min(buf.len());
let r = block_on(self.read(&mut buf[0..toread]))?;
left -= r;
}
Ok(())
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
5 changes: 2 additions & 3 deletions bmap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ pub use crate::bmap::*;
mod discarder;
pub use crate::discarder::*;
use async_trait::async_trait;
use futures::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use futures::TryFutureExt;
use sha2::{Digest, Sha256};
use std::io::Result as IOResult;
use std::io::{Read, Seek, SeekFrom, Write};
use std::io::{Read, Result as IOResult, Seek, SeekFrom, Write};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};

/// Trait that can only seek further forwards
pub trait SeekForward {
Expand Down

0 comments on commit 6726fa1

Please sign in to comment.