From 132152cb8db5085163ee0f21d24fc867716ba6d5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 11 Nov 2022 10:28:09 +1300 Subject: [PATCH] Update arrow-flight subcrates (#3044) (#3052) --- .github/workflows/arrow_flight.yml | 3 --- arrow-flight/Cargo.toml | 5 ++++- arrow-flight/src/lib.rs | 13 +++++++------ arrow-flight/src/sql/mod.rs | 17 ++++++++--------- arrow-flight/src/sql/server.rs | 2 +- arrow-flight/src/utils.rs | 19 ++++++++----------- 6 files changed, 28 insertions(+), 31 deletions(-) diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml index 2825d2400f1..ab7030b05e3 100644 --- a/.github/workflows/arrow_flight.yml +++ b/.github/workflows/arrow_flight.yml @@ -27,7 +27,6 @@ on: - master pull_request: paths: - - arrow/** - arrow-array/** - arrow-buffer/** - arrow-cast/** @@ -36,8 +35,6 @@ on: - arrow-select/** - arrow-flight/** - arrow-ipc/** - - arrow-csv/** - - arrow-json/** - .github/** jobs: diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 394fb98c3b2..085c8c50613 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -27,7 +27,10 @@ repository = "https://github.com/apache/arrow-rs" license = "Apache-2.0" [dependencies] -arrow = { path = "../arrow", version = "26.0.0", default-features = false, features = ["ipc"] } +arrow-array = { version = "26.0.0", path = "../arrow-array" } +arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" } +arrow-ipc = { version = "26.0.0", path = "../arrow-ipc" } +arrow-schema = { version = "26.0.0", path = "../arrow-schema" } base64 = { version = "0.13", default-features = false } tonic = { version = "0.8", default-features = false, features = ["transport", "codegen", "prost"] } bytes = { version = "1", default-features = false } diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs index 1f4bcc6c434..e742dbbe1a7 100644 --- a/arrow-flight/src/lib.rs +++ b/arrow-flight/src/lib.rs @@ -17,17 +17,18 @@ #![allow(rustdoc::invalid_html_tags)] -use arrow::datatypes::Schema; -use arrow::error::{ArrowError, Result as ArrowResult}; -use arrow::ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions}; +use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions}; +use arrow_schema::{ArrowError, Schema}; -use arrow::ipc::convert::try_schema_from_ipc_buffer; +use arrow_ipc::convert::try_schema_from_ipc_buffer; use std::{ convert::{TryFrom, TryInto}, fmt, ops::Deref, }; +type ArrowResult = std::result::Result; + #[allow(clippy::derive_partial_eq_without_eq)] mod gen { @@ -399,8 +400,8 @@ impl<'a> SchemaAsIpc<'a> { #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::{DataType, Field, TimeUnit}; - use arrow::ipc::MetadataVersion; + use arrow_ipc::MetadataVersion; + use arrow_schema::{DataType, Field, TimeUnit}; struct TestVector(Vec, usize); diff --git a/arrow-flight/src/sql/mod.rs b/arrow-flight/src/sql/mod.rs index 30bdcb5604f..a5d4c4c3436 100644 --- a/arrow-flight/src/sql/mod.rs +++ b/arrow-flight/src/sql/mod.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow_schema::ArrowError; use prost::Message; mod gen { @@ -122,10 +122,10 @@ pub trait ProstAnyExt { /// /// * `Ok(None)` when message type mismatch /// * `Err` when parse failed - fn unpack(&self) -> ArrowResult>; + fn unpack(&self) -> Result, ArrowError>; /// Pack any message into `prost_types::Any` value. - fn pack(message: &M) -> ArrowResult; + fn pack(message: &M) -> Result; } impl ProstAnyExt for prost_types::Any { @@ -133,7 +133,7 @@ impl ProstAnyExt for prost_types::Any { M::type_url() == self.type_url } - fn unpack(&self) -> ArrowResult> { + fn unpack(&self) -> Result, ArrowError> { if !self.is::() { return Ok(None); } @@ -143,7 +143,7 @@ impl ProstAnyExt for prost_types::Any { Ok(Some(m)) } - fn pack(message: &M) -> ArrowResult { + fn pack(message: &M) -> Result { Ok(message.as_any()) } } @@ -165,14 +165,13 @@ mod tests { } #[test] - fn test_prost_any_pack_unpack() -> ArrowResult<()> { + fn test_prost_any_pack_unpack() { let query = CommandStatementQuery { query: "select 1".to_string(), }; - let any = prost_types::Any::pack(&query)?; + let any = prost_types::Any::pack(&query).unwrap(); assert!(any.is::()); - let unpack_query: CommandStatementQuery = any.unpack()?.unwrap(); + let unpack_query: CommandStatementQuery = any.unpack().unwrap().unwrap(); assert_eq!(query, unpack_query); - Ok(()) } } diff --git a/arrow-flight/src/sql/server.rs b/arrow-flight/src/sql/server.rs index 525c721aa2b..d78474849af 100644 --- a/arrow-flight/src/sql/server.rs +++ b/arrow-flight/src/sql/server.rs @@ -589,6 +589,6 @@ fn decode_error_to_status(err: prost::DecodeError) -> Status { Status::invalid_argument(format!("{:?}", err)) } -fn arrow_error_to_status(err: arrow::error::ArrowError) -> Status { +fn arrow_error_to_status(err: arrow_schema::ArrowError) -> Status { Status::internal(format!("{:?}", err)) } diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 4a30b2d5aef..49f9c47db6d 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -20,13 +20,10 @@ use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult}; use std::collections::HashMap; -use arrow::array::ArrayRef; -use arrow::buffer::Buffer; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::error::{ArrowError, Result}; -use arrow::ipc::{reader, writer, writer::IpcWriteOptions}; -use arrow::record_batch::RecordBatch; -use std::convert::TryInto; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_buffer::Buffer; +use arrow_ipc::{reader, writer, writer::IpcWriteOptions}; +use arrow_schema::{ArrowError, Schema, SchemaRef}; /// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries /// and a `FlightData` representing the bytes of the batch's values @@ -52,9 +49,9 @@ pub fn flight_data_to_arrow_batch( data: &FlightData, schema: SchemaRef, dictionaries_by_id: &HashMap, -) -> Result { +) -> Result { // check that the data_header is a record batch message - let message = arrow::ipc::root_as_message(&data.data_header[..]).map_err(|err| { + let message = arrow_ipc::root_as_message(&data.data_header[..]).map_err(|err| { ArrowError::ParseError(format!("Unable to get root as message: {:?}", err)) })?; @@ -85,7 +82,7 @@ pub fn flight_data_to_arrow_batch( pub fn flight_schema_from_arrow_schema( schema: &Schema, options: &IpcWriteOptions, -) -> Result { +) -> Result { SchemaAsIpc::new(schema, options).try_into() } @@ -109,7 +106,7 @@ pub fn flight_data_from_arrow_schema( pub fn ipc_message_from_arrow_schema( schema: &Schema, options: &IpcWriteOptions, -) -> Result> { +) -> Result, ArrowError> { let message = SchemaAsIpc::new(schema, options).try_into()?; let IpcMessage(vals) = message; Ok(vals)