Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update arrow-flight subcrates (#3044) #3052

Merged
merged 2 commits into from Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/arrow_flight.yml
Expand Up @@ -27,7 +27,6 @@ on:
- master
pull_request:
paths:
- arrow/**
- arrow-array/**
- arrow-buffer/**
- arrow-cast/**
tustvold marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -36,8 +35,6 @@ on:
- arrow-select/**
- arrow-flight/**
- arrow-ipc/**
- arrow-csv/**
- arrow-json/**
Comment on lines -39 to -40
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added by mistake?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the time arrow-flight had a transitive dependency on them via arrow

- .github/**

jobs:
Expand Down
5 changes: 4 additions & 1 deletion arrow-flight/Cargo.toml
Expand Up @@ -27,7 +27,10 @@ repository = "https://github.com/apache/arrow-rs"
license = "Apache-2.0"

[dependencies]
arrow = { path = "../arrow", version = "26.0.0", default-features = false, features = ["ipc"] }
arrow-array = { version = "26.0.0", path = "../arrow-array" }
arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
arrow-ipc = { version = "26.0.0", path = "../arrow-ipc" }
arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
base64 = { version = "0.13", default-features = false }
tonic = { version = "0.8", default-features = false, features = ["transport", "codegen", "prost"] }
bytes = { version = "1", default-features = false }
Expand Down
13 changes: 7 additions & 6 deletions arrow-flight/src/lib.rs
Expand Up @@ -17,17 +17,18 @@

#![allow(rustdoc::invalid_html_tags)]

use arrow::datatypes::Schema;
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
use arrow_schema::{ArrowError, Schema};

use arrow::ipc::convert::try_schema_from_ipc_buffer;
use arrow_ipc::convert::try_schema_from_ipc_buffer;
use std::{
convert::{TryFrom, TryInto},
fmt,
ops::Deref,
};

type ArrowResult<T> = std::result::Result<T, ArrowError>;

#[allow(clippy::derive_partial_eq_without_eq)]

mod gen {
Expand Down Expand Up @@ -399,8 +400,8 @@ impl<'a> SchemaAsIpc<'a> {
#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, TimeUnit};
use arrow::ipc::MetadataVersion;
use arrow_ipc::MetadataVersion;
use arrow_schema::{DataType, Field, TimeUnit};

struct TestVector(Vec<u8>, usize);

Expand Down
17 changes: 8 additions & 9 deletions arrow-flight/src/sql/mod.rs
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::error::{ArrowError, Result as ArrowResult};
use arrow_schema::ArrowError;
use prost::Message;

mod gen {
Expand Down Expand Up @@ -122,18 +122,18 @@ pub trait ProstAnyExt {
///
/// * `Ok(None)` when message type mismatch
/// * `Err` when parse failed
fn unpack<M: ProstMessageExt>(&self) -> ArrowResult<Option<M>>;
fn unpack<M: ProstMessageExt>(&self) -> Result<Option<M>, ArrowError>;
Copy link
Member

@viirya viirya Nov 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw you defined ArrowResult at arrow-flight/src/lib.rs, aren't we going to use it in arrow-flight crate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I've been trying to avoid aliasing Result, it was necessary in lib.rs due to name collisions


/// Pack any message into `prost_types::Any` value.
fn pack<M: ProstMessageExt>(message: &M) -> ArrowResult<prost_types::Any>;
fn pack<M: ProstMessageExt>(message: &M) -> Result<prost_types::Any, ArrowError>;
}

impl ProstAnyExt for prost_types::Any {
fn is<M: ProstMessageExt>(&self) -> bool {
M::type_url() == self.type_url
}

fn unpack<M: ProstMessageExt>(&self) -> ArrowResult<Option<M>> {
fn unpack<M: ProstMessageExt>(&self) -> Result<Option<M>, ArrowError> {
if !self.is::<M>() {
return Ok(None);
}
Expand All @@ -143,7 +143,7 @@ impl ProstAnyExt for prost_types::Any {
Ok(Some(m))
}

fn pack<M: ProstMessageExt>(message: &M) -> ArrowResult<prost_types::Any> {
fn pack<M: ProstMessageExt>(message: &M) -> Result<prost_types::Any, ArrowError> {
Ok(message.as_any())
}
}
Expand All @@ -165,14 +165,13 @@ mod tests {
}

#[test]
fn test_prost_any_pack_unpack() -> ArrowResult<()> {
fn test_prost_any_pack_unpack() {
let query = CommandStatementQuery {
query: "select 1".to_string(),
};
let any = prost_types::Any::pack(&query)?;
let any = prost_types::Any::pack(&query).unwrap();
assert!(any.is::<CommandStatementQuery>());
let unpack_query: CommandStatementQuery = any.unpack()?.unwrap();
let unpack_query: CommandStatementQuery = any.unpack().unwrap().unwrap();
assert_eq!(query, unpack_query);
Ok(())
}
}
2 changes: 1 addition & 1 deletion arrow-flight/src/sql/server.rs
Expand Up @@ -589,6 +589,6 @@ fn decode_error_to_status(err: prost::DecodeError) -> Status {
Status::invalid_argument(format!("{:?}", err))
}

fn arrow_error_to_status(err: arrow::error::ArrowError) -> Status {
fn arrow_error_to_status(err: arrow_schema::ArrowError) -> Status {
Status::internal(format!("{:?}", err))
}
19 changes: 8 additions & 11 deletions arrow-flight/src/utils.rs
Expand Up @@ -20,13 +20,10 @@
use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult};
use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::buffer::Buffer;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::ipc::{reader, writer, writer::IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use std::convert::TryInto;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_buffer::Buffer;
use arrow_ipc::{reader, writer, writer::IpcWriteOptions};
use arrow_schema::{ArrowError, Schema, SchemaRef};

/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries
/// and a `FlightData` representing the bytes of the batch's values
Expand All @@ -52,9 +49,9 @@ pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
) -> Result<RecordBatch> {
) -> Result<RecordBatch, ArrowError> {
// check that the data_header is a record batch message
let message = arrow::ipc::root_as_message(&data.data_header[..]).map_err(|err| {
let message = arrow_ipc::root_as_message(&data.data_header[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {:?}", err))
})?;

Expand Down Expand Up @@ -85,7 +82,7 @@ pub fn flight_data_to_arrow_batch(
pub fn flight_schema_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> Result<SchemaResult> {
) -> Result<SchemaResult, ArrowError> {
SchemaAsIpc::new(schema, options).try_into()
}

Expand All @@ -109,7 +106,7 @@ pub fn flight_data_from_arrow_schema(
pub fn ipc_message_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> Result<Vec<u8>> {
) -> Result<Vec<u8>, ArrowError> {
let message = SchemaAsIpc::new(schema, options).try_into()?;
let IpcMessage(vals) = message;
Ok(vals)
Expand Down