Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push ChunkReader into SerializedPageReader (#2463) #2464

Merged
merged 1 commit into from Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 22 additions & 24 deletions parquet/src/column/writer/mod.rs
Expand Up @@ -1082,6 +1082,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {

#[cfg(test)]
mod tests {
use bytes::Bytes;
use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;
Expand All @@ -1096,7 +1097,7 @@ mod tests {
writer::SerializedPageWriter,
};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
use crate::util::{io::FileSource, test_common::rand_gen::random_numbers_range};
use crate::util::test_common::rand_gen::random_numbers_range;

use super::*;

Expand Down Expand Up @@ -1645,7 +1646,7 @@ mod tests {
)
.unwrap();

let (_, _, metadata, _, _) = writer.close().unwrap();
let (_, rows_written, metadata, _, _) = writer.close().unwrap();

let stats = metadata.statistics().unwrap();
assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
Expand All @@ -1654,10 +1655,10 @@ mod tests {
assert!(stats.distinct_count().is_none());

let reader = SerializedPageReader::new(
std::io::Cursor::new(buf),
7,
Compression::UNCOMPRESSED,
Type::INT32,
Arc::new(Bytes::from(buf)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought Bytes were already ref counted -- is there any need to wrap this in an additional Arc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signature requires `Arc<T>` where `T: ChunkReader`, because `ChunkReader` doesn't implement `Clone`.

&metadata,
rows_written as usize,
None,
)
.unwrap();

Expand Down Expand Up @@ -1690,14 +1691,14 @@ mod tests {
.write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
.unwrap();

let (_, _, metadata, _, _) = writer.close().unwrap();
let (_, rows_written, metadata, _, _) = writer.close().unwrap();
assert!(metadata.statistics().is_none());

let reader = SerializedPageReader::new(
std::io::Cursor::new(buf),
6,
Compression::UNCOMPRESSED,
Type::INT32,
Arc::new(Bytes::from(buf)),
&metadata,
rows_written as usize,
None,
)
.unwrap();

Expand Down Expand Up @@ -1818,16 +1819,15 @@ mod tests {
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(data, None, None).unwrap();
let (bytes_written, _, _, _, _) = writer.close().unwrap();
let (_, rows_written, metadata, _, _) = writer.close().unwrap();

// Read pages and check the sequence
let source = FileSource::new(&file, 0, bytes_written as usize);
let mut page_reader = Box::new(
SerializedPageReader::new(
source,
data.len() as i64,
Compression::UNCOMPRESSED,
Int32Type::get_physical_type(),
Arc::new(file),
&metadata,
rows_written as usize,
None,
)
.unwrap(),
);
Expand Down Expand Up @@ -2201,16 +2201,14 @@ mod tests {

let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
assert_eq!(values_written, values.len());
let (bytes_written, rows_written, column_metadata, _, _) =
writer.close().unwrap();
let (_, rows_written, column_metadata, _, _) = writer.close().unwrap();

let source = FileSource::new(&file, 0, bytes_written as usize);
let page_reader = Box::new(
SerializedPageReader::new(
source,
column_metadata.num_values(),
column_metadata.compression(),
T::get_physical_type(),
Arc::new(file),
&column_metadata,
rows_written as usize,
None,
)
.unwrap(),
);
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/reader.rs
Expand Up @@ -45,7 +45,7 @@ pub trait Length {
/// For an object store reader, each read can be mapped to a range request.
pub trait ChunkReader: Length + Send + Sync {
/// The reader type handed back by [`ChunkReader::get_read`]; it must be `Read + Send`.
type T: Read + Send;
/// Get a serially readable slice of the current reader.
///
/// `start` and `length` select the slice to expose.
/// NOTE(review): presumably a byte offset and a byte count; confirm against implementors.
///
/// This should fail if the slice exceeds the current bounds
fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}
Expand Down