Cleanup ChunkReader (#4118) #4156

Merged 5 commits on May 2, 2023
39 changes: 22 additions & 17 deletions parquet/src/arrow/async_reader/mod.rs
@@ -744,6 +744,25 @@ enum ColumnChunkData {
    Dense { offset: usize, data: Bytes },
}

+impl ColumnChunkData {
+    fn get(&self, start: u64) -> Result<Bytes> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+                .map(|idx| data[idx].1.clone())
+                .map_err(|_| {
+                    ParquetError::General(format!(
+                        "Invalid offset in sparse column chunk data: {start}"
+                    ))
+                }),
+            ColumnChunkData::Dense { offset, data } => {
+                let start = start as usize - *offset;
+                Ok(data.slice(start..))
+            }
+        }
+    }
+}
+
impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
@@ -756,26 +775,12 @@ impl Length for ColumnChunkData {
impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        Ok(self.get_bytes(start, length)?.reader())
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        match &self {
-            ColumnChunkData::Sparse { data, .. } => data
-                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
-                .map(|idx| data[idx].1.slice(0..length))
-                .map_err(|_| {
-                    ParquetError::General(format!(
-                        "Invalid offset in sparse column chunk data: {start}"
-                    ))
-                }),
-            ColumnChunkData::Dense { offset, data } => {
-                let start = start as usize - *offset;
-                let end = start + length;
-                Ok(data.slice(start..end))
-            }
-        }
+        Ok(self.get(start)?.slice(..length))
    }
}
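For illustration, the `Sparse` lookup added above can be reproduced standalone: the pre-fetched `(offset, Bytes)` ranges are kept sorted by offset, so a read succeeds only if it starts exactly at a fetched offset. A minimal sketch (illustrative names, not crate code):

use bytes::Bytes;

// Stand-in for the ColumnChunkData::Sparse lookup: `ranges` is sorted by
// offset, and a read must begin exactly at one of the fetched offsets.
fn sparse_get(ranges: &[(usize, Bytes)], start: u64) -> Result<Bytes, String> {
    ranges
        .binary_search_by_key(&start, |(offset, _)| *offset as u64)
        .map(|idx| ranges[idx].1.clone())
        .map_err(|_| format!("Invalid offset in sparse column chunk data: {start}"))
}

fn main() {
    let ranges = [(0, Bytes::from_static(b"abc")), (10, Bytes::from_static(b"xyz"))];
    assert_eq!(sparse_get(&ranges, 10).unwrap(), Bytes::from_static(b"xyz"));
    assert!(sparse_get(&ranges, 5).is_err()); // 5 is not the start of a fetched range
}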

3 changes: 1 addition & 2 deletions parquet/src/bin/parquet-layout.rs
@@ -175,8 +175,7 @@ fn read_page_header<C: ChunkReader>(
        }
    }

-    let len = reader.len().checked_sub(offset).unwrap() as usize;
-    let input = reader.get_read(offset, len)?;
+    let input = reader.get_read(offset)?;
    let mut tracked = TrackedRead(input, 0);
    let mut prot = TCompactInputProtocol::new(&mut tracked);
    let header = PageHeader::read_from_in_protocol(&mut prot)?;
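With the length parameter gone, `read_page_header` relies on `TrackedRead` to learn how many bytes the thrift decoder actually consumed. A minimal sketch of such a byte-counting wrapper (`TrackedRead` itself is a private helper of this binary; this is an illustrative equivalent):

use std::io::Read;

// Counts bytes as they are read, so the caller can recover the encoded
// header length once thrift decoding finishes.
struct CountingRead<R: Read>(R, usize);

impl<R: Read> Read for CountingRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.0.read(buf)?;
        self.1 += n; // accumulate bytes consumed
        Ok(n)
    }
}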
2 changes: 1 addition & 1 deletion parquet/src/file/footer.rs
@@ -46,7 +46,7 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat

    let mut footer = [0_u8; 8];
    chunk_reader
-        .get_read(file_size - 8, 8)?
+        .get_read(file_size - 8)?
        .read_exact(&mut footer)?;

    let metadata_len = decode_footer(&footer)?;
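For context: in the standard Parquet layout, those last 8 bytes hold a 4-byte little-endian metadata length followed by the 4-byte `PAR1` magic, which is why a fixed-size read suffices to bootstrap. A sketch of the check `decode_footer` performs (the crate's actual error handling may differ):

// Sketch of Parquet footer decoding: [metadata_len: u32 LE][b"PAR1"].
fn decode_footer_sketch(footer: &[u8; 8]) -> Result<usize, String> {
    if footer[4..] != *b"PAR1" {
        return Err("Invalid Parquet file. Corrupt footer".to_owned());
    }
    let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap());
    Ok(metadata_len as usize)
}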
68 changes: 59 additions & 9 deletions parquet/src/file/reader.rs
@@ -19,7 +19,9 @@
//! readers to read individual column chunks, or access record
//! iterator.

-use bytes::Bytes;
+use bytes::{Buf, Bytes};
+use std::fs::File;
+use std::io::{BufReader, Seek, SeekFrom};
use std::{boxed::Box, io::Read, sync::Arc};

use crate::bloom_filter::Sbbf;
@@ -44,19 +46,47 @@ pub trait Length {
}

/// The ChunkReader trait generates readers of chunks of a source.
/// For a file system reader, each chunk might contain a clone of File bounded on a given range.
/// For an object store reader, each read can be mapped to a range request.
+///
+/// For more information see [`File::try_clone`]
pub trait ChunkReader: Length + Send + Sync {
-    type T: Read + Send;
-    /// Get a serially readable slice of the current reader
-    /// This should fail if the slice exceeds the current bounds
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
+    type T: Read;
+
+    /// Get a [`Read`] starting at the provided file offset
+    ///
+    /// Subsequent or concurrent calls to [`Self::get_read`] or [`Self::get_bytes`] may
Review comment from @tustvold (Contributor, Author), Apr 28, 2023:

FileSource provided protection against subsequent calls to get_read by calling Seek on every read, but provided no protection against concurrent access. I think it is less risky to clearly not support non-serial usage than to break only on concurrent usage.

To be clear, there are no safety implications to not synchronising this access: you will just get interleaved reads, which is no different from reading gibberish.

One option would be to add a Mutex to synchronise access; however, this solution is necessarily incomplete, as a user can just call File::try_clone. Ultimately there is no reliable way to synchronise file IO. I think if no synchronisation is fine for the standard library, it is fine for the parquet crate.
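The Mutex option mentioned and rejected above would look roughly like the following sketch (illustrative only, not crate code). It serialises reads through one wrapper, but, as the comment notes, it cannot stop a caller from cloning the underlying File and bypassing the lock:

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Mutex;

// Illustrative: serialise access by taking a lock around every seek + read.
struct SyncFile(Mutex<File>);

impl SyncFile {
    fn read_at(&self, start: u64, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut file = self.0.lock().unwrap();
        file.seek(SeekFrom::Start(start))?;
        file.read(buf)
    }
}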

+    /// side-effect on previously returned [`Self::T`]. Care should be taken to avoid this
+    ///
+    /// See [`File::try_clone`] for more information
+    fn get_read(&self, start: u64) -> Result<Self::T>;
+
    /// Get a range as bytes
    /// This should fail if the exact number of bytes cannot be read
+    ///
+    /// Concurrent calls to [`Self::get_bytes`] may result in interleaved output
+    ///
+    /// See [`File::try_clone`] for more information
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
+}

+impl Length for File {
+    fn len(&self) -> u64 {
+        self.metadata().map(|m| m.len()).unwrap_or(0u64)
+    }
+}
+
+impl ChunkReader for File {
+    type T = BufReader<File>;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        let mut reader = self.try_clone()?;
+        reader.seek(SeekFrom::Start(start))?;
+        Ok(BufReader::new(self.try_clone()?))
+    }
+
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(length);
-        let read = self.get_read(start, length)?.read_to_end(&mut buffer)?;
+        let mut reader = self.try_clone()?;
+        reader.seek(SeekFrom::Start(start))?;
+        let read = reader.take(length as _).read_to_end(&mut buffer)?;

        if read != length {
            return Err(eof_err!(
@@ -69,6 +99,26 @@ pub trait ChunkReader: Length + Send + Sync {
    }
}

+impl Length for Bytes {
+    fn len(&self) -> u64 {
+        self.len() as u64
+    }
+}
+
+impl ChunkReader for Bytes {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        let start = start as usize;
+        Ok(self.slice(start..).reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
+        let start = start as usize;
+        Ok(self.slice(start..start + length))
+    }
+}
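With `Length` and `ChunkReader` now implemented here for both `File` and `Bytes`, any in-memory buffer can feed the readers directly. A usage sketch mirroring the `Bytes` impl above, written against plain `bytes` calls so it runs without the parquet crate:

use bytes::{Buf, Bytes};
use std::io::Read;

fn main() -> std::io::Result<()> {
    let data = Bytes::from_static(b"hello parquet");

    // Equivalent of get_bytes(6, 7): a bounded, zero-copy range
    let range = data.slice(6..6 + 7);
    assert_eq!(&range[..], b"parquet");

    // Equivalent of get_read(6): an unbounded reader from the offset
    let mut out = String::new();
    data.slice(6..).reader().read_to_string(&mut out)?;
    assert_eq!(out, "parquet");
    Ok(())
}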

// ----------------------------------------------------------------------
// APIs for file & row group readers

63 changes: 5 additions & 58 deletions parquet/src/file/serialized_reader.rs
@@ -40,60 +40,8 @@ use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
-use crate::util::{io::TryClone, memory::ByteBufferPtr};
-use bytes::{Buf, Bytes};
+use crate::util::memory::ByteBufferPtr;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
-// export `SliceableCursor` and `FileSource` publicly so clients can
-// re-use the logic in their own ParquetFileWriter wrappers
-pub use crate::util::io::FileSource;
-
-// ----------------------------------------------------------------------
-// Implementations of traits facilitating the creation of a new reader
-
-impl Length for File {
-    fn len(&self) -> u64 {
-        self.metadata().map(|m| m.len()).unwrap_or(0u64)
-    }
-}
-
-impl TryClone for File {
-    fn try_clone(&self) -> std::io::Result<Self> {
-        self.try_clone()
-    }
-}
-
-impl ChunkReader for File {
-    type T = FileSource<File>;
-
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        Ok(FileSource::new(self, start, length))
-    }
-}
-
-impl Length for Bytes {
-    fn len(&self) -> u64 {
-        self.len() as u64
-    }
-}
-
-impl TryClone for Bytes {
-    fn try_clone(&self) -> std::io::Result<Self> {
-        Ok(self.clone())
-    }
-}
-
-impl ChunkReader for Bytes {
-    type T = bytes::buf::Reader<Bytes>;
-
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        Ok(self.get_bytes(start, length)?.reader())
-    }
-
-    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        let start = start as usize;
-        Ok(self.slice(start..start + length))
-    }
-}

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;
@@ -662,7 +610,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
            return Ok(None);
        }

-        let mut read = self.reader.get_read(*offset as u64, *remaining)?;
+        let mut read = self.reader.get_read(*offset as u64)?;
        let header = if let Some(header) = next_page_header.take() {
            *header
        } else {
@@ -752,8 +700,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                    continue;
                }
            } else {
-                let mut read =
-                    self.reader.get_read(*offset as u64, *remaining_bytes)?;
+                let mut read = self.reader.get_read(*offset as u64)?;
                let (header_len, header) = read_page_header_len(&mut read)?;
                *offset += header_len;
                *remaining_bytes -= header_len;
@@ -807,8 +754,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
-                    let mut read =
-                        self.reader.get_read(*offset as u64, *remaining_bytes)?;
+                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
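Both skip paths above peek the page header from an unbounded reader and then only advance in-memory cursors rather than reading the page data. A sketch of that accounting for the non-buffered path, assuming `header_len` comes from a counting reader like the one sketched earlier:

// Advance past one page: the thrift header plus `compressed_page_size` bytes
// of page data, shrinking the remaining column-chunk budget to match.
fn skip_page(offset: &mut usize, remaining_bytes: &mut usize, header_len: usize, data_page_size: usize) {
    *offset += header_len + data_page_size;
    *remaining_bytes -= header_len + data_page_size;
}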
@@ -827,6 +773,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

#[cfg(test)]
mod tests {
+    use bytes::Bytes;
    use std::sync::Arc;

    use crate::format::BoundaryOrder;