diff --git a/crates/ostree-ext/src/container/store.rs b/crates/ostree-ext/src/container/store.rs index 6ffa684d..8a23b3dd 100644 --- a/crates/ostree-ext/src/container/store.rs +++ b/crates/ostree-ext/src/container/store.rs @@ -7,7 +7,7 @@ use super::*; use crate::chunking::{self, Chunk}; -use crate::container::Decompressor; +use crate::generic_decompress::Decompressor; use crate::logging::system_repo_journal_print; use crate::refescape; use crate::sysroot::SysrootLock; diff --git a/crates/ostree-ext/src/container/unencapsulate.rs b/crates/ostree-ext/src/container/unencapsulate.rs index 81eb90c7..27efe7ea 100644 --- a/crates/ostree-ext/src/container/unencapsulate.rs +++ b/crates/ostree-ext/src/container/unencapsulate.rs @@ -38,7 +38,6 @@ use containers_image_proxy::{ImageProxy, OpenedImage}; use fn_error_context::context; use futures_util::{Future, FutureExt}; use oci_spec::image::{self as oci_image, Digest}; -use std::io::Read; use std::sync::{Arc, Mutex}; use tokio::{ io::{AsyncBufRead, AsyncRead}, @@ -46,11 +45,6 @@ use tokio::{ }; use tracing::instrument; -/// The legacy MIME type returned by the skopeo/(containers/storage) code -/// when we have local uncompressed docker-formatted image. -/// TODO: change the skopeo code to shield us from this correctly -const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar"; - type Progress = tokio::sync::watch::Sender; /// A read wrapper that updates the download progress. @@ -191,89 +185,6 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) - importer.unencapsulate().await } -pub(crate) struct Decompressor { - inner: Box, - finished: bool, -} - -impl Read for Decompressor { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.inner.read(buf) - } -} - -impl Drop for Decompressor { - fn drop(&mut self) { - if self.finished { - return; - } - - // We really should not get here; users are required to call - // `finish()` to clean up the stream. But we'll give - // best-effort to clean things up nonetheless. If things go - // wrong, then panic, because we're in a bad state and it's - // likely that we end up with a broken pipe error or a - // deadlock. - self._finish().expect("Decompressor::finish MUST be called") - } -} - -impl Decompressor { - /// Create a decompressor for this MIME type, given a stream of input. - pub(crate) fn new( - media_type: &oci_image::MediaType, - src: impl Read + Send + 'static, - ) -> Result { - let r: Box = match media_type { - oci_image::MediaType::ImageLayerZstd => { - Box::new(zstd::stream::read::Decoder::new(src)?) - } - oci_image::MediaType::ImageLayerGzip => Box::new(flate2::bufread::GzDecoder::new( - std::io::BufReader::new(src), - )), - oci_image::MediaType::ImageLayer => Box::new(src), - oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src), - o => anyhow::bail!("Unhandled layer type: {}", o), - }; - Ok(Self { - inner: r, - finished: false, - }) - } - - pub(crate) fn finish(mut self) -> Result<()> { - self._finish() - } - - fn _finish(&mut self) -> Result<()> { - self.finished = true; - - // We need to make sure to flush out the decompressor and/or - // tar stream here. For tar, we might not read through the - // entire stream, because the archive has zero-block-markers - // at the end; or possibly because the final entry is filtered - // in filter_tar so we don't advance to read the data. For - // decompressor, zstd:chunked layers will have - // metadata/skippable frames at the end of the stream. That - // data isn't relevant to the tar stream, but if we don't read - // it here then on the skopeo proxy we'll block trying to - // write the end of the stream. That in turn will block our - // client end trying to call FinishPipe, and we end up - // deadlocking ourselves through skopeo. - // - // https://github.com/bootc-dev/bootc/issues/1204 - - let mut sink = std::io::sink(); - let n = std::io::copy(&mut self.inner, &mut sink)?; - - if n > 0 { - tracing::debug!("Read extra {n} bytes at end of decompressor stream"); - } - - Ok(()) - } -} - /// A wrapper for [`get_blob`] which fetches a layer and decompresses it. pub(crate) async fn fetch_layer<'a>( proxy: &'a ImageProxy, @@ -335,31 +246,3 @@ pub(crate) async fn fetch_layer<'a>( Ok((Box::new(blob), Either::Right(driver), media_type)) } } - -#[cfg(test)] -mod tests { - use super::*; - - struct BrokenPipe; - - impl Read for BrokenPipe { - fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { - std::io::Result::Err(std::io::ErrorKind::BrokenPipe.into()) - } - } - - #[test] - #[should_panic(expected = "Decompressor::finish MUST be called")] - fn test_drop_decompressor_with_finish_error_should_panic() { - let broken = BrokenPipe; - let d = Decompressor::new(&oci_image::MediaType::ImageLayer, broken).unwrap(); - drop(d) - } - - #[test] - fn test_drop_decompressor_with_successful_finish() { - let empty = std::io::empty(); - let d = Decompressor::new(&oci_image::MediaType::ImageLayer, empty).unwrap(); - drop(d) - } -} diff --git a/crates/ostree-ext/src/generic_decompress.rs b/crates/ostree-ext/src/generic_decompress.rs new file mode 100644 index 00000000..9d2f32de --- /dev/null +++ b/crates/ostree-ext/src/generic_decompress.rs @@ -0,0 +1,231 @@ +//! This module primarily contains the `Decompressor` struct which is +//! used to decompress a stream based on its OCI media type. +//! +//! It also contains the `ReadWithGetInnerMut` trait and related +//! concrete implementations thereof. These provide a means for each +//! specific decompressor to give mutable access to the inner reader. +//! +//! For example, the GzipDecompressor would give the underlying +//! compressed stream. +//! +//! We need a common way to access this stream so that we can flush +//! the data during cleanup. +//! +//! See: + +use std::io::Read; + +use crate::oci_spec::image as oci_image; + +/// The legacy MIME type returned by the skopeo/(containers/storage) code +/// when we have local uncompressed docker-formatted image. +/// TODO: change the skopeo code to shield us from this correctly +const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar"; + +/// Extends the `Read` trait with another method to get mutable access to the inner reader +trait ReadWithGetInnerMut: Read + Send + 'static { + fn get_inner_mut(&mut self) -> &mut (dyn Read); +} + +// TransparentDecompressor + +struct TransparentDecompressor(R); + +impl Read for TransparentDecompressor { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.0.read(buf) + } +} + +impl ReadWithGetInnerMut for TransparentDecompressor { + fn get_inner_mut(&mut self) -> &mut (dyn Read) { + &mut self.0 + } +} + +// GzipDecompressor + +struct GzipDecompressor(flate2::bufread::GzDecoder); + +impl Read for GzipDecompressor { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.0.read(buf) + } +} + +impl ReadWithGetInnerMut for GzipDecompressor { + fn get_inner_mut(&mut self) -> &mut (dyn Read) { + self.0.get_mut() + } +} + +// ZstdDecompressor + +struct ZstdDecompressor<'a, R: std::io::BufRead>(zstd::stream::read::Decoder<'a, R>); + +impl<'a: 'static, R: std::io::BufRead + Send + 'static> Read for ZstdDecompressor<'a, R> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.0.read(buf) + } +} + +impl<'a: 'static, R: std::io::BufRead + Send + 'static> ReadWithGetInnerMut + for ZstdDecompressor<'a, R> +{ + fn get_inner_mut(&mut self) -> &mut (dyn Read) { + self.0.get_mut() + } +} + +pub(crate) struct Decompressor { + inner: Box, + finished: bool, +} + +impl Read for Decompressor { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.inner.read(buf) + } +} + +impl Drop for Decompressor { + fn drop(&mut self) { + if self.finished { + return; + } + + // Ideally we should not get here; users should call + // `finish()` to clean up the stream. But in reality there's + // codepaths that can and will short-circuit error out while + // processing the stream, and the Decompressor will get + // dropped before it's finished in those cases. We'll give + // best-effort to clean things up nonetheless. If things go + // wrong, then panic, because we're in a bad state and it's + // likely that we end up with a broken pipe error or a + // deadlock. + self._finish() + .expect("Failed to flush pipe while dropping Decompressor") + } +} + +impl Decompressor { + /// Create a decompressor for this MIME type, given a stream of input. + pub(crate) fn new( + media_type: &oci_image::MediaType, + src: impl Read + Send + 'static, + ) -> anyhow::Result { + let r: Box = match media_type { + oci_image::MediaType::ImageLayerZstd => { + Box::new(ZstdDecompressor(zstd::stream::read::Decoder::new(src)?)) + } + oci_image::MediaType::ImageLayerGzip => Box::new(GzipDecompressor( + flate2::bufread::GzDecoder::new(std::io::BufReader::new(src)), + )), + oci_image::MediaType::ImageLayer => Box::new(TransparentDecompressor(src)), + oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => { + Box::new(TransparentDecompressor(src)) + } + o => anyhow::bail!("Unhandled layer type: {}", o), + }; + Ok(Self { + inner: r, + finished: false, + }) + } + + pub(crate) fn finish(mut self) -> anyhow::Result<()> { + self._finish() + } + + fn _finish(&mut self) -> anyhow::Result<()> { + self.finished = true; + + // We need to make sure to flush out the decompressor and/or + // tar stream here. For tar, we might not read through the + // entire stream, because the archive has zero-block-markers + // at the end; or possibly because the final entry is filtered + // in filter_tar so we don't advance to read the data. For + // decompressor, zstd:chunked layers will have + // metadata/skippable frames at the end of the stream. That + // data isn't relevant to the tar stream, but if we don't read + // it here then on the skopeo proxy we'll block trying to + // write the end of the stream. That in turn will block our + // client end trying to call FinishPipe, and we end up + // deadlocking ourselves through skopeo. + // + // https://github.com/bootc-dev/bootc/issues/1204 + + let mut sink = std::io::sink(); + let n = std::io::copy(self.inner.get_inner_mut(), &mut sink)?; + + if n > 0 { + tracing::debug!("Read extra {n} bytes at end of decompressor stream"); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct BrokenPipe; + + impl Read for BrokenPipe { + fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { + std::io::Result::Err(std::io::ErrorKind::BrokenPipe.into()) + } + } + + #[test] + #[should_panic(expected = "Failed to flush pipe while dropping Decompressor")] + fn test_drop_decompressor_with_finish_error_should_panic() { + let broken = BrokenPipe; + let d = Decompressor::new(&oci_image::MediaType::ImageLayer, broken).unwrap(); + drop(d) + } + + #[test] + fn test_drop_decompressor_with_successful_finish() { + let empty = std::io::empty(); + let d = Decompressor::new(&oci_image::MediaType::ImageLayer, empty).unwrap(); + drop(d) + } + + #[test] + fn test_drop_decompressor_with_incomplete_gzip_data() { + let empty = std::io::empty(); + let d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, empty).unwrap(); + drop(d) + } + + #[test] + fn test_drop_decompressor_with_incomplete_zstd_data() { + let empty = std::io::empty(); + let d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, empty).unwrap(); + drop(d) + } + + #[test] + fn test_gzip_decompressor_with_garbage_input() { + let garbage = b"This is not valid gzip data"; + let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, &garbage[..]).unwrap(); + let mut buf = [0u8; 32]; + let e = d.read(&mut buf).unwrap_err(); + assert!(matches!(e.kind(), std::io::ErrorKind::InvalidInput)); + assert_eq!(e.to_string(), "invalid gzip header".to_string()); + drop(d) + } + + #[test] + fn test_zstd_decompressor_with_garbage_input() { + let garbage = b"This is not valid zstd data"; + let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, &garbage[..]).unwrap(); + let mut buf = [0u8; 32]; + let e = d.read(&mut buf).unwrap_err(); + assert!(matches!(e.kind(), std::io::ErrorKind::Other)); + assert_eq!(e.to_string(), "Unknown frame descriptor".to_string()); + drop(d) + } +} diff --git a/crates/ostree-ext/src/lib.rs b/crates/ostree-ext/src/lib.rs index 53f8267c..7889a022 100644 --- a/crates/ostree-ext/src/lib.rs +++ b/crates/ostree-ext/src/lib.rs @@ -37,6 +37,7 @@ pub mod cli; pub mod container; pub mod container_utils; pub mod diff; +pub(crate) mod generic_decompress; pub mod ima; pub mod keyfileext; pub(crate) mod logging; diff --git a/crates/ostree-ext/src/tar/write.rs b/crates/ostree-ext/src/tar/write.rs index 255418aa..9b25705c 100644 --- a/crates/ostree-ext/src/tar/write.rs +++ b/crates/ostree-ext/src/tar/write.rs @@ -7,7 +7,7 @@ //! In the future, this may also evolve into parsing the tar //! stream in Rust, not in C. -use crate::container::Decompressor; +use crate::generic_decompress::Decompressor; use crate::Result; use anyhow::{anyhow, Context}; use camino::{Utf8Component, Utf8Path, Utf8PathBuf};