Skip to content

Commit

Permalink
Push ChunkReader into SerializedPageReader (#2463) (#2464)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 17, 2022
1 parent ecc6210 commit 63ab69e
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 315 deletions.
46 changes: 22 additions & 24 deletions parquet/src/column/writer/mod.rs
Expand Up @@ -1082,6 +1082,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {

#[cfg(test)]
mod tests {
use bytes::Bytes;
use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;
Expand All @@ -1096,7 +1097,7 @@ mod tests {
writer::SerializedPageWriter,
};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
use crate::util::{io::FileSource, test_common::rand_gen::random_numbers_range};
use crate::util::test_common::rand_gen::random_numbers_range;

use super::*;

Expand Down Expand Up @@ -1645,7 +1646,7 @@ mod tests {
)
.unwrap();

let (_, _, metadata, _, _) = writer.close().unwrap();
let (_, rows_written, metadata, _, _) = writer.close().unwrap();

let stats = metadata.statistics().unwrap();
assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
Expand All @@ -1654,10 +1655,10 @@ mod tests {
assert!(stats.distinct_count().is_none());

let reader = SerializedPageReader::new(
std::io::Cursor::new(buf),
7,
Compression::UNCOMPRESSED,
Type::INT32,
Arc::new(Bytes::from(buf)),
&metadata,
rows_written as usize,
None,
)
.unwrap();

Expand Down Expand Up @@ -1690,14 +1691,14 @@ mod tests {
.write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
.unwrap();

let (_, _, metadata, _, _) = writer.close().unwrap();
let (_, rows_written, metadata, _, _) = writer.close().unwrap();
assert!(metadata.statistics().is_none());

let reader = SerializedPageReader::new(
std::io::Cursor::new(buf),
6,
Compression::UNCOMPRESSED,
Type::INT32,
Arc::new(Bytes::from(buf)),
&metadata,
rows_written as usize,
None,
)
.unwrap();

Expand Down Expand Up @@ -1818,16 +1819,15 @@ mod tests {
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(data, None, None).unwrap();
let (bytes_written, _, _, _, _) = writer.close().unwrap();
let (_, rows_written, metadata, _, _) = writer.close().unwrap();

// Read pages and check the sequence
let source = FileSource::new(&file, 0, bytes_written as usize);
let mut page_reader = Box::new(
SerializedPageReader::new(
source,
data.len() as i64,
Compression::UNCOMPRESSED,
Int32Type::get_physical_type(),
Arc::new(file),
&metadata,
rows_written as usize,
None,
)
.unwrap(),
);
Expand Down Expand Up @@ -2201,16 +2201,14 @@ mod tests {

let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
assert_eq!(values_written, values.len());
let (bytes_written, rows_written, column_metadata, _, _) =
writer.close().unwrap();
let (_, rows_written, column_metadata, _, _) = writer.close().unwrap();

let source = FileSource::new(&file, 0, bytes_written as usize);
let page_reader = Box::new(
SerializedPageReader::new(
source,
column_metadata.num_values(),
column_metadata.compression(),
T::get_physical_type(),
Arc::new(file),
&column_metadata,
rows_written as usize,
None,
)
.unwrap(),
);
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/reader.rs
Expand Up @@ -45,7 +45,7 @@ pub trait Length {
/// For an object store reader, each read can be mapped to a range request.
pub trait ChunkReader: Length + Send + Sync {
type T: Read + Send;
/// get a serialy readeable slice of the current reader
/// Get a serially readable slice of the current reader
/// This should fail if the slice exceeds the current bounds
fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}
Expand Down

0 comments on commit 63ab69e

Please sign in to comment.