Merge pull request #622 from cgwalters/no-async-decompress3
container: Drop async_compression + support zstd:chunked
cgwalters committed Apr 29, 2024
2 parents 3014069 + 62c4564 commit 689e793
Showing 3 changed files with 93 additions and 26 deletions.
2 changes: 1 addition & 1 deletion lib/Cargo.toml
@@ -12,7 +12,6 @@ rust-version = "1.74.0"
[dependencies]
anyhow = "1.0"
containers-image-proxy = "0.5.5"
async-compression = { version = "0.4", features = ["gzip", "tokio", "zstd"] }
camino = "1.0.4"
chrono = "0.4.19"
olpc-cjson = "0.1.1"
@@ -43,6 +42,7 @@ tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1
tokio-util = { features = ["io-util"], version = "0.7" }
tokio-stream = { features = ["sync"], version = "0.1.8" }
tracing = "0.1"
zstd = "0.13.1"

indoc = { version = "2", optional = true }
xshell = { version = "0.2", optional = true }
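
The dependency swap above replaces the async-compression crate with the synchronous flate2 and zstd decoders, which the new bridge code below drives from a worker thread. A minimal sketch of the two decoder APIs involved (not part of this commit; note that GzDecoder construction is infallible while zstd's Decoder::new returns a Result):

use std::io::Read;

// Decompress an in-memory buffer with either codec; this mirrors the
// dispatch used in decompress_bridge() below.
fn decompress_all(compressed: &[u8], is_zstd: bool) -> anyhow::Result<Vec<u8>> {
    let bufr = std::io::BufReader::new(std::io::Cursor::new(compressed));
    let mut src: Box<dyn Read> = if is_zstd {
        Box::new(zstd::stream::read::Decoder::new(bufr)?)
    } else {
        Box::new(flate2::bufread::GzDecoder::new(bufr))
    };
    let mut out = Vec::new();
    src.read_to_end(&mut out)?;
    Ok(out)
}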
88 changes: 72 additions & 16 deletions lib/src/container/unencapsulate.rs
@@ -36,7 +36,7 @@ use crate::container::store::LayerProgress;
use super::*;
use containers_image_proxy::{ImageProxy, OpenedImage};
use fn_error_context::context;
use futures_util::{Future, FutureExt};
use futures_util::{Future, FutureExt, TryFutureExt as _};
use oci_spec::image as oci_image;
use std::sync::{Arc, Mutex};
use tokio::{
@@ -189,22 +189,76 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
importer.unencapsulate().await
}

/// Take an AsyncBufRead and handle decompression for it, returning
/// a wrapped AsyncBufRead implementation.
/// This is implemented with a background thread using a pipe-to-self,
/// and so there is an additional Future object returned that is a "driver"
/// task and must also be checked for errors.
pub(crate) fn decompress_bridge<'a>(
src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
is_zstd: bool,
) -> Result<(
// This one is the input reader
impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
// And this represents the worker thread doing copying
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
// We use a plain unix pipe() because it's just a very convenient
// way to bridge arbitrarily between sync and async with a worker
// thread. Yes, it involves going through the kernel, but
// eventually we'll replace all this logic with podman anyways.
let (tx, rx) = tokio::net::unix::pipe::pipe()?;
let task = tokio::task::spawn_blocking(move || -> Result<()> {
// Convert the write half of the pipe() into a regular blocking file descriptor
let tx = tx.into_blocking_fd()?;
let mut tx = std::fs::File::from(tx);
// Convert the async input back to synchronous.
let src = tokio_util::io::SyncIoBridge::new(src);
let bufr = std::io::BufReader::new(src);
// Wrap the input in a decompressor; I originally tried to make
// this function take a function pointer, but yeah that was painful
// with the type system.
let mut src: Box<dyn std::io::Read> = if is_zstd {
Box::new(zstd::stream::read::Decoder::new(bufr)?)
} else {
Box::new(flate2::bufread::GzDecoder::new(bufr))
};
// We don't care about the number of bytes copied
let _n: u64 = std::io::copy(&mut src, &mut tx)?;
Ok(())
})
// Flatten the nested Result<Result<>>
.map(crate::tokio_util::flatten_anyhow);
// And return the pair of futures
Ok((tokio::io::BufReader::new(rx), task))
}
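
The helper crate::tokio_util::flatten_anyhow is not shown in this diff; presumably it folds the nested result from spawn_blocking into a single one, along these lines (a sketch under that assumption):

// Assumed shape of crate::tokio_util::flatten_anyhow: a JoinError (panic
// or cancellation) and an error returned by the task body both collapse
// into one anyhow::Error.
fn flatten_anyhow<T>(
    r: std::result::Result<anyhow::Result<T>, tokio::task::JoinError>,
) -> anyhow::Result<T> {
    match r {
        Ok(Ok(v)) => Ok(v),
        Ok(Err(e)) => Err(e),
        Err(e) => Err(anyhow::anyhow!("Worker task failed: {e}")),
    }
}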

/// Create a decompressor for this MIME type, given a stream of input.
fn new_async_decompressor<'a>(
media_type: &oci_image::MediaType,
src: impl AsyncBufRead + Send + Unpin + 'a,
) -> Result<Box<dyn AsyncBufRead + Send + Unpin + 'a>> {
match media_type {
oci_image::MediaType::ImageLayerGzip => Ok(Box::new(tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(src),
))),
oci_image::MediaType::ImageLayerZstd => Ok(Box::new(tokio::io::BufReader::new(
async_compression::tokio::bufread::ZstdDecoder::new(src),
))),
oci_image::MediaType::ImageLayer => Ok(Box::new(src)),
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Ok(Box::new(src)),
o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)),
}
src: impl AsyncBufRead + Send + Unpin + 'static,
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
let r: (
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>,
) = match media_type {
m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd);
let (r, driver) = decompress_bridge(src, is_zstd)?;
(Box::new(r), Box::new(driver) as _)
}
oci_image::MediaType::ImageLayer => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
o => anyhow::bail!("Unhandled layer type: {}", o),
};
Ok(r)
}
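
Both returned values must be polled together: the pipe's kernel buffer is bounded, so the copy thread stalls once it fills, and the reader only makes progress while the driver future is also being awaited. A minimal consumption sketch (hypothetical caller, not part of this commit; assumes a tokio runtime):

async fn read_all_decompressed(
    media_type: &oci_image::MediaType,
    src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
) -> anyhow::Result<Vec<u8>> {
    use tokio::io::AsyncReadExt;
    let (mut reader, driver) = new_async_decompressor(media_type, src)?;
    let mut out = Vec::new();
    // Poll the copy-thread driver and the read concurrently; awaiting
    // them sequentially could deadlock once the pipe buffer fills.
    tokio::try_join!(driver, async {
        reader.read_to_end(&mut out).await?;
        anyhow::Ok(())
    })?;
    Ok(out)
}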

/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
@@ -262,11 +316,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
progress.send_replace(Some(status));
}
};
let reader = new_async_decompressor(media_type, readprogress)?;
let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?;
let driver = driver.and_then(|()| compression_driver);
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
Ok((reader, Either::Left(driver)))
} else {
let blob = new_async_decompressor(media_type, blob)?;
let (blob, compression_driver) = new_async_decompressor(media_type, blob)?;
let driver = driver.and_then(|()| compression_driver);
Ok((blob, Either::Right(driver)))
}
}
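
The driver wiring above leans on two combinators: TryFutureExt::and_then sequences the proxy driver with the decompression driver, short-circuiting on the first error, while future::join polls the progress future and the combined driver concurrently and keeps only the latter's result. A standalone sketch of those semantics (illustrative values only, not part of this commit):

use futures_util::{future, FutureExt, TryFutureExt as _};

async fn combinator_demo() -> anyhow::Result<()> {
    let first = future::ready(anyhow::Ok(()));
    let second = future::ready(anyhow::Ok(()));
    // Run `second` only if `first` succeeded; an Err short-circuits.
    let sequenced = first.and_then(|()| second);
    // Poll a side future to completion alongside `sequenced`, keeping
    // only the second result -- the same shape as
    // `join(readproxy, driver).map(|r| r.1)` above.
    let side = future::ready(());
    future::join(side, sequenced).map(|r| r.1).await
}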
29 changes: 20 additions & 9 deletions lib/tests/it/main.rs
@@ -401,10 +401,11 @@ async fn test_tar_write() -> Result<()> {
#[tokio::test]
async fn test_tar_write_tar_layer() -> Result<()> {
let fixture = Fixture::new_v1()?;
let uncompressed_tar = tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(EXAMPLE_TAR_LAYER),
);
ostree_ext::tar::write_tar(fixture.destrepo(), uncompressed_tar, "test", None).await?;
let mut v = Vec::new();
let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
let _n = std::io::copy(&mut dec, &mut v)?;
let r = tokio::io::BufReader::new(std::io::Cursor::new(v));
ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?;
Ok(())
}

@@ -1261,10 +1262,8 @@ async fn test_container_write_derive() -> Result<()> {
Ok(())
}

/// Test for zstd
/// We need to handle the case of modified hardlinks into /sysroot
#[tokio::test]
async fn test_container_zstd() -> Result<()> {
/// Implementation of a test case for non-gzip (i.e. zstd or zstd:chunked) compression
async fn test_non_gzip(format: &str) -> Result<()> {
let fixture = Fixture::new_v1()?;
let baseimg = &fixture.export_container().await?.0;
let basepath = &match baseimg.transport {
@@ -1276,7 +1275,7 @@ async fn test_container_zstd() -> Result<()> {
let st = tokio::process::Command::new("skopeo")
.args([
"copy",
"--dest-compress-format=zstd",
&format!("--dest-compress-format={format}"),
baseimg_ref.as_str(),
&format!("oci:{zstd_image_path}"),
])
@@ -1302,6 +1301,18 @@ async fn test_container_zstd() -> Result<()> {
Ok(())
}

/// Test for zstd
#[tokio::test]
async fn test_container_zstd() -> Result<()> {
test_non_gzip("zstd").await
}

/// Test for zstd:chunked
#[tokio::test]
async fn test_container_zstd_chunked() -> Result<()> {
test_non_gzip("zstd:chunked").await
}
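
Both tests can share one code path because, to my understanding, a zstd:chunked image layer is still a valid zstd stream (the chunk index lives in skippable frames that an ordinary zstd decoder ignores), so the decompression path above needs no special casing for it.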

/// Test for https://github.com/ostreedev/ostree-rs-ext/issues/405
/// We need to handle the case of modified hardlinks into /sysroot
#[tokio::test]
