Skip to content

Commit

Permalink
Update arrow-flight subcrates (#3044) (#3052)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Nov 10, 2022
1 parent 8d364fe commit 132152c
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 31 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/arrow_flight.yml
Expand Up @@ -27,7 +27,6 @@ on:
- master
pull_request:
paths:
- arrow/**
- arrow-array/**
- arrow-buffer/**
- arrow-cast/**
Expand All @@ -36,8 +35,6 @@ on:
- arrow-select/**
- arrow-flight/**
- arrow-ipc/**
- arrow-csv/**
- arrow-json/**
- .github/**

jobs:
Expand Down
5 changes: 4 additions & 1 deletion arrow-flight/Cargo.toml
Expand Up @@ -27,7 +27,10 @@ repository = "https://github.com/apache/arrow-rs"
license = "Apache-2.0"

[dependencies]
arrow = { path = "../arrow", version = "26.0.0", default-features = false, features = ["ipc"] }
arrow-array = { version = "26.0.0", path = "../arrow-array" }
arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
arrow-ipc = { version = "26.0.0", path = "../arrow-ipc" }
arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
base64 = { version = "0.13", default-features = false }
tonic = { version = "0.8", default-features = false, features = ["transport", "codegen", "prost"] }
bytes = { version = "1", default-features = false }
Expand Down
13 changes: 7 additions & 6 deletions arrow-flight/src/lib.rs
Expand Up @@ -17,17 +17,18 @@

#![allow(rustdoc::invalid_html_tags)]

use arrow::datatypes::Schema;
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
use arrow_schema::{ArrowError, Schema};

use arrow::ipc::convert::try_schema_from_ipc_buffer;
use arrow_ipc::convert::try_schema_from_ipc_buffer;
use std::{
convert::{TryFrom, TryInto},
fmt,
ops::Deref,
};

type ArrowResult<T> = std::result::Result<T, ArrowError>;

#[allow(clippy::derive_partial_eq_without_eq)]

mod gen {
Expand Down Expand Up @@ -399,8 +400,8 @@ impl<'a> SchemaAsIpc<'a> {
#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, TimeUnit};
use arrow::ipc::MetadataVersion;
use arrow_ipc::MetadataVersion;
use arrow_schema::{DataType, Field, TimeUnit};

struct TestVector(Vec<u8>, usize);

Expand Down
17 changes: 8 additions & 9 deletions arrow-flight/src/sql/mod.rs
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::error::{ArrowError, Result as ArrowResult};
use arrow_schema::ArrowError;
use prost::Message;

mod gen {
Expand Down Expand Up @@ -122,18 +122,18 @@ pub trait ProstAnyExt {
///
/// * `Ok(None)` when message type mismatch
/// * `Err` when parse failed
fn unpack<M: ProstMessageExt>(&self) -> ArrowResult<Option<M>>;
fn unpack<M: ProstMessageExt>(&self) -> Result<Option<M>, ArrowError>;

/// Pack any message into `prost_types::Any` value.
fn pack<M: ProstMessageExt>(message: &M) -> ArrowResult<prost_types::Any>;
fn pack<M: ProstMessageExt>(message: &M) -> Result<prost_types::Any, ArrowError>;
}

impl ProstAnyExt for prost_types::Any {
fn is<M: ProstMessageExt>(&self) -> bool {
M::type_url() == self.type_url
}

fn unpack<M: ProstMessageExt>(&self) -> ArrowResult<Option<M>> {
fn unpack<M: ProstMessageExt>(&self) -> Result<Option<M>, ArrowError> {
if !self.is::<M>() {
return Ok(None);
}
Expand All @@ -143,7 +143,7 @@ impl ProstAnyExt for prost_types::Any {
Ok(Some(m))
}

fn pack<M: ProstMessageExt>(message: &M) -> ArrowResult<prost_types::Any> {
fn pack<M: ProstMessageExt>(message: &M) -> Result<prost_types::Any, ArrowError> {
Ok(message.as_any())
}
}
Expand All @@ -165,14 +165,13 @@ mod tests {
}

#[test]
fn test_prost_any_pack_unpack() -> ArrowResult<()> {
fn test_prost_any_pack_unpack() {
let query = CommandStatementQuery {
query: "select 1".to_string(),
};
let any = prost_types::Any::pack(&query)?;
let any = prost_types::Any::pack(&query).unwrap();
assert!(any.is::<CommandStatementQuery>());
let unpack_query: CommandStatementQuery = any.unpack()?.unwrap();
let unpack_query: CommandStatementQuery = any.unpack().unwrap().unwrap();
assert_eq!(query, unpack_query);
Ok(())
}
}
2 changes: 1 addition & 1 deletion arrow-flight/src/sql/server.rs
Expand Up @@ -589,6 +589,6 @@ fn decode_error_to_status(err: prost::DecodeError) -> Status {
Status::invalid_argument(format!("{:?}", err))
}

fn arrow_error_to_status(err: arrow::error::ArrowError) -> Status {
fn arrow_error_to_status(err: arrow_schema::ArrowError) -> Status {
Status::internal(format!("{:?}", err))
}
19 changes: 8 additions & 11 deletions arrow-flight/src/utils.rs
Expand Up @@ -20,13 +20,10 @@
use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult};
use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::buffer::Buffer;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::ipc::{reader, writer, writer::IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use std::convert::TryInto;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_buffer::Buffer;
use arrow_ipc::{reader, writer, writer::IpcWriteOptions};
use arrow_schema::{ArrowError, Schema, SchemaRef};

/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries
/// and a `FlightData` representing the bytes of the batch's values
Expand All @@ -52,9 +49,9 @@ pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
) -> Result<RecordBatch> {
) -> Result<RecordBatch, ArrowError> {
// check that the data_header is a record batch message
let message = arrow::ipc::root_as_message(&data.data_header[..]).map_err(|err| {
let message = arrow_ipc::root_as_message(&data.data_header[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {:?}", err))
})?;

Expand Down Expand Up @@ -85,7 +82,7 @@ pub fn flight_data_to_arrow_batch(
pub fn flight_schema_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> Result<SchemaResult> {
) -> Result<SchemaResult, ArrowError> {
SchemaAsIpc::new(schema, options).try_into()
}

Expand All @@ -109,7 +106,7 @@ pub fn flight_data_from_arrow_schema(
pub fn ipc_message_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> Result<Vec<u8>> {
) -> Result<Vec<u8>, ArrowError> {
let message = SchemaAsIpc::new(schema, options).try_into()?;
let IpcMessage(vals) = message;
Ok(vals)
Expand Down

0 comments on commit 132152c

Please sign in to comment.