diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index c76469b39ce..1cf47ab75fb 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -217,20 +217,20 @@ pub mod flight_service_client { impl FlightServiceClient { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, + where + D: std::convert::TryInto, + D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; Ok(Self::new(conn)) } } impl FlightServiceClient - where - T: tonic::client::GrpcService, - T::Error: Into, - T::ResponseBody: Body + Send + 'static, - ::Error: Into + Send, + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -240,18 +240,18 @@ pub mod flight_service_client { inner: T, interceptor: F, ) -> FlightServiceClient> - where - F: tonic::service::Interceptor, - T::ResponseBody: Default, - T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, >, - >, - , - >>::Error: Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { FlightServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -530,10 +530,10 @@ pub mod flight_service_server { pub trait FlightService: Send + Sync + 'static { ///Server streaming response type for the Handshake method. type HandshakeStream: futures_core::Stream< - Item = Result, - > - + Send - + 'static; + Item = Result, + > + + Send + + 'static; /// /// Handshake between client and server. Depending on the server, the /// handshake may be required to determine the token that should be used for @@ -545,10 +545,10 @@ pub mod flight_service_server { ) -> Result, tonic::Status>; ///Server streaming response type for the ListFlights method. type ListFlightsStream: futures_core::Stream< - Item = Result, - > - + Send - + 'static; + Item = Result, + > + + Send + + 'static; /// /// Get a list of available streams given a particular criteria. Most flight /// services will expose one or more streams that are readily available for @@ -586,10 +586,10 @@ pub mod flight_service_server { ) -> Result, tonic::Status>; ///Server streaming response type for the DoGet method. type DoGetStream: futures_core::Stream< - Item = Result, - > - + Send - + 'static; + Item = Result, + > + + Send + + 'static; /// /// Retrieve a single stream associated with a particular descriptor /// associated with the referenced ticket. A Flight can be composed of one or @@ -601,10 +601,10 @@ pub mod flight_service_server { ) -> Result, tonic::Status>; ///Server streaming response type for the DoPut method. type DoPutStream: futures_core::Stream< - Item = Result, - > - + Send - + 'static; + Item = Result, + > + + Send + + 'static; /// /// Push a stream to the flight service associated with a particular /// flight stream. This allows a client of a flight service to upload a stream @@ -618,10 +618,10 @@ pub mod flight_service_server { ) -> Result, tonic::Status>; ///Server streaming response type for the DoExchange method. type DoExchangeStream: futures_core::Stream< - Item = Result, - > - + Send - + 'static; + Item = Result, + > + + Send + + 'static; /// /// Open a bidirectional data channel for a given descriptor. This /// allows clients to send and receive arbitrary Arrow data and @@ -634,10 +634,10 @@ pub mod flight_service_server { ) -> Result, tonic::Status>; ///Server streaming response type for the DoAction method. type DoActionStream: futures_core::Stream< - Item = Result, - > - + Send - + 'static; + Item = Result, + > + + Send + + 'static; /// /// Flight services can support an arbitrary number of simple actions in /// addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut @@ -651,10 +651,10 @@ pub mod flight_service_server { ) -> Result, tonic::Status>; ///Server streaming response type for the ListActions method. type ListActionsStream: futures_core::Stream< - Item = Result, - > - + Send - + 'static; + Item = Result, + > + + Send + + 'static; /// /// A flight service exposes all of the available action types that it has /// along with descriptions. This allows different flight consumers to @@ -692,17 +692,17 @@ pub mod flight_service_server { inner: T, interceptor: F, ) -> InterceptedService - where - F: tonic::service::Interceptor, + where + F: tonic::service::Interceptor, { InterceptedService::new(Self::new(inner), interceptor) } } impl tonic::codegen::Service> for FlightServiceServer - where - T: FlightService, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, + where + T: FlightService, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 7ec29de0173..ad8fe16ad8b 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -55,8 +55,10 @@ use crate::schema::types::{ pub struct ParquetMetaData { file_metadata: FileMetaData, row_groups: Vec, - page_indexes: Option>, - offset_indexes: Option>>, + /// Page index for all pages in each column chunk + page_indexes: Option>>, + /// Offset index for all pages in each column chunk + offset_indexes: Option>>>, } impl ParquetMetaData { @@ -74,8 +76,8 @@ impl ParquetMetaData { pub fn new_with_page_index( file_metadata: FileMetaData, row_groups: Vec, - page_indexes: Option>, - offset_indexes: Option>>, + page_indexes: Option>>, + offset_indexes: Option>>>, ) -> Self { ParquetMetaData { file_metadata, @@ -107,12 +109,12 @@ impl ParquetMetaData { } /// Returns page indexes in this file. - pub fn page_indexes(&self) -> Option<&Vec> { + pub fn page_indexes(&self) -> Option<&Vec>> { self.page_indexes.as_ref() } /// Returns offset indexes in this file. - pub fn offset_indexes(&self) -> Option<&Vec>> { + pub fn offset_indexes(&self) -> Option<&Vec>>> { self.offset_indexes.as_ref() } } diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index e97826c63b4..a932ede79bc 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -56,6 +56,7 @@ pub enum Index { DOUBLE(NativeIndex), BYTE_ARRAY(ByteArrayIndex), FIXED_LEN_BYTE_ARRAY(ByteArrayIndex), + EMPTY_ARRAY(), } /// An index of a column of [`Type`] physical representation diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index 8414480903f..5621e59940a 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -101,13 +101,7 @@ fn get_index_offset_and_lengths( .iter() .map(|x| x.column_index_length()) .map(|maybe_length| { - let index_length = maybe_length.ok_or_else(|| { - ParquetError::General( - "The column_index_length must exist if offset_index_offset exists" - .to_string(), - ) - })?; - + let index_length = maybe_length.unwrap_or(0); Ok(index_length.try_into().unwrap()) }) .collect::, ParquetError>>()?; @@ -143,6 +137,9 @@ fn deserialize_column_index( data: &[u8], column_type: Type, ) -> Result { + if data.is_empty() { + return Ok(Index::EMPTY_ARRAY()); + } let mut d = Cursor::new(data); let mut prot = TCompactInputProtocol::new(&mut d); diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 6ff73e041e8..bd0a8f2f3d0 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -248,13 +248,17 @@ impl SerializedFileReader { } if options.enable_page_index { - //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup - //support multi after create multi-RG test data. - let cols = metadata.row_group(0); - let columns_indexes = - index_reader::read_columns_indexes(&chunk_reader, cols.columns())?; - let pages_locations = - index_reader::read_pages_locations(&chunk_reader, cols.columns())?; + let mut columns_indexes = vec![]; + let mut offset_indexes = vec![]; + + for rg in &filtered_row_groups { + let column_index = + index_reader::read_columns_indexes(&chunk_reader, rg.columns())?; + let offset_index = + index_reader::read_pages_locations(&chunk_reader, rg.columns())?; + columns_indexes.push(column_index); + offset_indexes.push(offset_index); + } Ok(Self { chunk_reader: Arc::new(chunk_reader), @@ -262,7 +266,7 @@ impl SerializedFileReader { metadata.file_metadata().clone(), filtered_row_groups, Some(columns_indexes), - Some(pages_locations), + Some(offset_indexes), ), }) } else { @@ -561,10 +565,13 @@ impl PageReader for SerializedPageReader { mod tests { use super::*; use crate::basic::{self, ColumnOrder}; - use crate::file::page_index::index::Index; + use crate::data_type::private::ParquetValueType; + use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex}; use crate::record::RowAccessor; use crate::schema::parser::parse_message_type; + use crate::util::bit_util::from_le_slice; use crate::util::test_common::{get_test_file, get_test_path}; + use arrow::datatypes::ToByteSlice; use parquet_format::BoundaryOrder; use std::sync::Arc; @@ -1077,7 +1084,7 @@ mod tests { // only one row group assert_eq!(page_indexes.len(), 1); - let index = if let Index::BYTE_ARRAY(index) = page_indexes.get(0).unwrap() { + let index = if let Index::BYTE_ARRAY(index) = &page_indexes[0][0] { index } else { unreachable!() @@ -1089,7 +1096,7 @@ mod tests { //only one page group assert_eq!(index_in_pages.len(), 1); - let page0 = index_in_pages.get(0).unwrap(); + let page0 = &index_in_pages[0]; let min = page0.min.as_ref().unwrap(); let max = page0.max.as_ref().unwrap(); assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap()); @@ -1098,11 +1105,292 @@ mod tests { let offset_indexes = metadata.offset_indexes().unwrap(); // only one row group assert_eq!(offset_indexes.len(), 1); - let offset_index = offset_indexes.get(0).unwrap(); - let page_offset = offset_index.get(0).unwrap(); + let offset_index = &offset_indexes[0]; + let page_offset = &offset_index[0][0]; assert_eq!(4, page_offset.offset); assert_eq!(152, page_offset.compressed_page_size); assert_eq!(0, page_offset.first_row_index); } + + #[test] + fn test_page_index_reader_all_type() { + let test_file = get_test_file("alltypes_tiny_pages_plain.parquet"); + let builder = ReadOptionsBuilder::new(); + //enable read page index + let options = builder.with_page_index().build(); + let reader_result = SerializedFileReader::new_with_options(test_file, options); + let reader = reader_result.unwrap(); + + // Test contents in Parquet metadata + let metadata = reader.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + + let page_indexes = metadata.page_indexes().unwrap(); + let row_group_offset_indexes = &metadata.offset_indexes().unwrap()[0]; + + // only one row group + assert_eq!(page_indexes.len(), 1); + let row_group_metadata = metadata.row_group(0); + + //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0] + if let Index::INT32(index) = &page_indexes[0][0] { + check_native_page_index( + index, + 325, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Unordered, + ); + assert_eq!(row_group_offset_indexes[0].len(), 325); + } else { + unreachable!() + }; + //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0] + if let Index::BOOLEAN(index) = &page_indexes[0][1] { + assert_eq!(index.indexes.len(), 82); + assert_eq!(row_group_offset_indexes[1].len(), 82); + } else { + unreachable!() + }; + //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0] + if let Index::INT32(index) = &page_indexes[0][2] { + check_native_page_index( + index, + 325, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Ascending, + ); + assert_eq!(row_group_offset_indexes[2].len(), 325); + } else { + unreachable!() + }; + //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0] + if let Index::INT32(index) = &page_indexes[0][3] { + check_native_page_index( + index, + 325, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Ascending, + ); + assert_eq!(row_group_offset_indexes[3].len(), 325); + } else { + unreachable!() + }; + //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0] + if let Index::INT32(index) = &page_indexes[0][4] { + check_native_page_index( + index, + 325, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Ascending, + ); + assert_eq!(row_group_offset_indexes[4].len(), 325); + } else { + unreachable!() + }; + //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0] + if let Index::INT64(index) = &page_indexes[0][5] { + //Todo row_group_metadata.column(0).statistics().unwrap().min_bytes() only return 4 bytes + check_native_page_index( + index, + 528, + 0_i64.to_byte_slice(), + 100_i64.to_byte_slice(), + BoundaryOrder::Unordered, + ); + assert_eq!(row_group_offset_indexes[5].len(), 528); + } else { + unreachable!() + }; + //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0] + if let Index::FLOAT(index) = &page_indexes[0][6] { + check_native_page_index( + index, + 325, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Ascending, + ); + assert_eq!(row_group_offset_indexes[6].len(), 325); + } else { + unreachable!() + }; + //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0] + if let Index::DOUBLE(index) = &page_indexes[0][7] { + check_native_page_index( + index, + 528, + 0_i64.to_byte_slice(), + 100_i64.to_byte_slice(), + BoundaryOrder::Unordered, + ); + assert_eq!(row_group_offset_indexes[7].len(), 528); + } else { + unreachable!() + }; + //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0] + if let Index::BYTE_ARRAY(index) = &page_indexes[0][8] { + check_bytes_page_index( + index, + 974, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Unordered, + ); + assert_eq!(row_group_offset_indexes[8].len(), 974); + } else { + unreachable!() + }; + //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0] + if let Index::BYTE_ARRAY(index) = &page_indexes[0][9] { + check_bytes_page_index( + index, + 352, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Ascending, + ); + assert_eq!(row_group_offset_indexes[9].len(), 352); + } else { + unreachable!() + }; + //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined] + if let Index::EMPTY_ARRAY() = &page_indexes[0][10] { + assert_eq!(row_group_offset_indexes[10].len(), 974); + } else { + unreachable!() + }; + //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0] + if let Index::INT32(index) = &page_indexes[0][11] { + check_native_page_index( + index, + 325, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Ascending, + ); + assert_eq!(row_group_offset_indexes[11].len(), 325); + } else { + unreachable!() + }; + //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0] + if let Index::INT32(index) = &page_indexes[0][12] { + check_native_page_index( + index, + 325, + row_group_metadata + .column(0) + .statistics() + .unwrap() + .min_bytes(), + row_group_metadata + .column(0) + .statistics() + .unwrap() + .max_bytes(), + BoundaryOrder::Unordered, + ); + assert_eq!(row_group_offset_indexes[12].len(), 325); + } else { + unreachable!() + }; + } + + fn check_native_page_index( + row_group_index: &NativeIndex, + page_size: usize, + min_value: &[u8], + max_value: &[u8], + boundary_order: BoundaryOrder, + ) { + assert_eq!(row_group_index.indexes.len(), page_size); + assert_eq!(row_group_index.boundary_order, boundary_order); + row_group_index.indexes.iter().all(|x| { + x.min.as_ref().unwrap() >= &from_le_slice::(min_value) + && x.max.as_ref().unwrap() <= &from_le_slice::(max_value) + }); + } + + fn check_bytes_page_index( + row_group_index: &ByteArrayIndex, + page_size: usize, + min_value: &[u8], + max_value: &[u8], + boundary_order: BoundaryOrder, + ) { + assert_eq!(row_group_index.indexes.len(), page_size); + assert_eq!(row_group_index.boundary_order, boundary_order); + row_group_index.indexes.iter().all(|x| { + x.min.as_ref().unwrap().as_slice() >= min_value + && x.max.as_ref().unwrap().as_slice() <= max_value + }); + } }