Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add page index reader test and support empty index. #2011

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 53 additions & 53 deletions arrow-flight/src/arrow.flight.protocol.rs
Expand Up @@ -217,20 +217,20 @@ pub mod flight_service_client {
impl FlightServiceClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> FlightServiceClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Expand All @@ -240,18 +240,18 @@ pub mod flight_service_client {
inner: T,
interceptor: F,
) -> FlightServiceClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
FlightServiceClient::new(InterceptedService::new(inner, interceptor))
}
Expand Down Expand Up @@ -530,10 +530,10 @@ pub mod flight_service_server {
pub trait FlightService: Send + Sync + 'static {
///Server streaming response type for the Handshake method.
type HandshakeStream: futures_core::Stream<
Item = Result<super::HandshakeResponse, tonic::Status>,
>
+ Send
+ 'static;
Item = Result<super::HandshakeResponse, tonic::Status>,
>
+ Send
+ 'static;
///
/// Handshake between client and server. Depending on the server, the
/// handshake may be required to determine the token that should be used for
Expand All @@ -545,10 +545,10 @@ pub mod flight_service_server {
) -> Result<tonic::Response<Self::HandshakeStream>, tonic::Status>;
///Server streaming response type for the ListFlights method.
type ListFlightsStream: futures_core::Stream<
Item = Result<super::FlightInfo, tonic::Status>,
>
+ Send
+ 'static;
Item = Result<super::FlightInfo, tonic::Status>,
>
+ Send
+ 'static;
///
/// Get a list of available streams given a particular criteria. Most flight
/// services will expose one or more streams that are readily available for
Expand Down Expand Up @@ -586,10 +586,10 @@ pub mod flight_service_server {
) -> Result<tonic::Response<super::SchemaResult>, tonic::Status>;
///Server streaming response type for the DoGet method.
type DoGetStream: futures_core::Stream<
Item = Result<super::FlightData, tonic::Status>,
>
+ Send
+ 'static;
Item = Result<super::FlightData, tonic::Status>,
>
+ Send
+ 'static;
///
/// Retrieve a single stream associated with a particular descriptor
/// associated with the referenced ticket. A Flight can be composed of one or
Expand All @@ -601,10 +601,10 @@ pub mod flight_service_server {
) -> Result<tonic::Response<Self::DoGetStream>, tonic::Status>;
///Server streaming response type for the DoPut method.
type DoPutStream: futures_core::Stream<
Item = Result<super::PutResult, tonic::Status>,
>
+ Send
+ 'static;
Item = Result<super::PutResult, tonic::Status>,
>
+ Send
+ 'static;
///
/// Push a stream to the flight service associated with a particular
/// flight stream. This allows a client of a flight service to upload a stream
Expand All @@ -618,10 +618,10 @@ pub mod flight_service_server {
) -> Result<tonic::Response<Self::DoPutStream>, tonic::Status>;
///Server streaming response type for the DoExchange method.
type DoExchangeStream: futures_core::Stream<
Item = Result<super::FlightData, tonic::Status>,
>
+ Send
+ 'static;
Item = Result<super::FlightData, tonic::Status>,
>
+ Send
+ 'static;
///
/// Open a bidirectional data channel for a given descriptor. This
/// allows clients to send and receive arbitrary Arrow data and
Expand All @@ -634,10 +634,10 @@ pub mod flight_service_server {
) -> Result<tonic::Response<Self::DoExchangeStream>, tonic::Status>;
///Server streaming response type for the DoAction method.
type DoActionStream: futures_core::Stream<
Item = Result<super::Result, tonic::Status>,
>
+ Send
+ 'static;
Item = Result<super::Result, tonic::Status>,
>
+ Send
+ 'static;
///
/// Flight services can support an arbitrary number of simple actions in
/// addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
Expand All @@ -651,10 +651,10 @@ pub mod flight_service_server {
) -> Result<tonic::Response<Self::DoActionStream>, tonic::Status>;
///Server streaming response type for the ListActions method.
type ListActionsStream: futures_core::Stream<
Item = Result<super::ActionType, tonic::Status>,
>
+ Send
+ 'static;
Item = Result<super::ActionType, tonic::Status>,
>
+ Send
+ 'static;
///
/// A flight service exposes all of the available action types that it has
/// along with descriptions. This allows different flight consumers to
Expand Down Expand Up @@ -692,17 +692,17 @@ pub mod flight_service_server {
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for FlightServiceServer<T>
where
T: FlightService,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
where
T: FlightService,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
Expand Down
14 changes: 8 additions & 6 deletions parquet/src/file/metadata.rs
Expand Up @@ -55,8 +55,10 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
/// Page index for all pages in each column chunk
page_indexes: Option<Vec<Vec<Index>>>,
/// Offset index for all pages in each column chunk
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
}

impl ParquetMetaData {
Expand All @@ -74,8 +76,8 @@ impl ParquetMetaData {
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
page_indexes: Option<Vec<Vec<Index>>>,
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
) -> Self {
ParquetMetaData {
file_metadata,
Expand Down Expand Up @@ -107,12 +109,12 @@ impl ParquetMetaData {
}

/// Returns page indexes in this file.
pub fn page_indexes(&self) -> Option<&Vec<Index>> {
pub fn page_indexes(&self) -> Option<&Vec<Vec<Index>>> {
self.page_indexes.as_ref()
}

/// Returns offset indexes in this file.
pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
pub fn offset_indexes(&self) -> Option<&Vec<Vec<Vec<PageLocation>>>> {
self.offset_indexes.as_ref()
}
}
Expand Down
1 change: 1 addition & 0 deletions parquet/src/file/page_index/index.rs
Expand Up @@ -56,6 +56,7 @@ pub enum Index {
DOUBLE(NativeIndex<f64>),
BYTE_ARRAY(ByteArrayIndex),
FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
EMPTY_ARRAY(),
}

/// An index of a column of [`Type`] physical representation
Expand Down
11 changes: 4 additions & 7 deletions parquet/src/file/page_index/index_reader.rs
Expand Up @@ -101,13 +101,7 @@ fn get_index_offset_and_lengths(
.iter()
.map(|x| x.column_index_length())
.map(|maybe_length| {
let index_length = maybe_length.ok_or_else(|| {
ParquetError::General(
"The column_index_length must exist if offset_index_offset exists"
.to_string(),
)
})?;

let index_length = maybe_length.unwrap_or(0);
Ok(index_length.try_into().unwrap())
})
.collect::<Result<Vec<_>, ParquetError>>()?;
Expand Down Expand Up @@ -143,6 +137,9 @@ fn deserialize_column_index(
data: &[u8],
column_type: Type,
) -> Result<Index, ParquetError> {
if data.is_empty() {
return Ok(Index::EMPTY_ARRAY());
}
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);

Expand Down