SchemaResult in IPC deviates from other implementations #2445

Closed
skejserjensen opened this issue Aug 14, 2022 · 18 comments · Fixed by #2586
Labels: arrow (Changes to the arrow crate), arrow-flight (Changes to the arrow-flight crate), bug, help wanted

Comments

@skejserjensen

Describe the bug

The SchemaResult produced by SchemaAsIpc can be converted to a Schema by the Rust implementation of Apache Arrow Flight but not other implementations of Apache Arrow Flight (tested the Go, Java, and Python implementations).

To Reproduce

For the Rust server, implement FlightService.get_schema() as:

async fn get_schema(
    &self,
    _request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
    let tid = Field::new("tid", DataType::Int32, false);
    let timestamp = Field::new("timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), false);
    let value = Field::new("value", DataType::Float32, false);
    let schema = Schema::new(vec![tid, timestamp, value]);

    let options = IpcWriteOptions::default();
    let schema_as_ipc = SchemaAsIpc::new(&schema, &options);
    let schema_result: SchemaResult = schema_as_ipc.into();

    Ok(Response::new(schema_result))
}

Attempt to retrieve and print the Schema using the following Rust code:

use arrow::ipc::convert;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::FlightDescriptor;
use tokio::runtime::Runtime;
use tonic::Request;

fn main() {
    let tokio = Runtime::new().unwrap();

    tokio.block_on(async {
        let mut flight_service_client = FlightServiceClient::connect("grpc://127.0.0.1:9999").await.unwrap();
        let flight_descriptor = FlightDescriptor::new_path(vec!["".to_owned()]);
        let request = Request::new(flight_descriptor);
        let schema_result = flight_service_client.get_schema(request).await.unwrap().into_inner();
        let schema = convert::schema_from_bytes(&schema_result.schema).unwrap();
        dbg!(schema);
    });
}

Attempt to retrieve and print the Schema using the following Python code:

from pyarrow import flight
client = flight.FlightClient('grpc://127.0.0.1:9999')
descriptor = flight.FlightDescriptor.for_path("")
schema_result = client.get_schema(descriptor)
print(schema_result.schema)

Expected behavior

Both clients should retrieve and print the Schema. In practice, the Rust code succeeds while the Python code fails with the following OSError:

Traceback (most recent call last):
  File "get_schema.py", line 5, in <module>
    print(schema_result.schema)
  File "pyarrow/_flight.pyx", line 720, in pyarrow._flight.SchemaResult.schema.__get__
  File "pyarrow/_flight.pyx", line 80, in pyarrow._flight.check_flight_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: Invalid flatbuffers message.

Additional context

As the issue was first discovered using a client written in Go, we first raised apache/arrow#13853 as we believed the problem was in the Go implementation of Apache Arrow Flight. But from the comments provided by @zeroshade on that issue, it seems that the Rust implementation of Apache Arrow Flight deviates from the other implementations in how a Schema is serialized. For example, both the C++ and Go implementations include the continuation indicator (0xFFFFFFFF) followed by the message length as a 32-bit integer before the Schema:

use arrow_flight::{SchemaAsIpc, SchemaResult};
use arrow::datatypes::{Schema, TimeUnit, Field, DataType};
use arrow::ipc::writer::IpcWriteOptions;

fn main() {
    let tid = Field::new("tid", DataType::Int32, false);
    let timestamp = Field::new("timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), false);
    let value = Field::new("value", DataType::Float32, false);
    let schema = Schema::new(vec![tid, timestamp, value]);

    let options = IpcWriteOptions::default();
    let schema_as_ipc = SchemaAsIpc::new(&schema, &options);
    let schema_result: SchemaResult = schema_as_ipc.into();
    dbg!(schema_result);
}

16 0 0 0 0 0 10 0 14 0 12 0 11 0 4 0 10 0 0 0 20 0 0 0 0 0 0 1 4 0 10 0 12 0 0 0 8 0 4 0 10 0 0 0 8 0 0 0 8 0 0 0 0 0 0 0 3 0 0 0 136 0 0 0 52 0 0 0 4 0 0 0 148 255 255 255 16 0 0 0 20 0 0 0 0 0 0 3 16 0 0 0 206 255 255 255 0 0 1 0 0 0 0 0 5 0 0 0 118 97 108 117 101 0 0 0 192 255 255 255 28 0 0 0 12 0 0 0 0 0 0 10 32 0 0 0 0 0 0 0 0 0 6 0 8 0 6 0 6 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 0 0 16 0 20 0 16 0 0 0 15 0 4 0 0 0 8 0 16 0 0 0 24 0 0 0 32 0 0 0 0 0 0 2 28 0 0 0 8 0 12 0 4 0 11 0 8 0 0 0 32 0 0 0 0 0 0 1 0 0 0 0 3 0 0 0 116 105 100 0

#include <iostream>

#include "arrow/type.h"
#include "arrow/buffer.h"
#include "arrow/ipc/writer.h"

int main() {
  std::shared_ptr<arrow::Field> tid = arrow::field("tid", arrow::int32());
  std::shared_ptr<arrow::Field> timestamp = arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::MILLI));
  std::shared_ptr<arrow::Field> value = arrow::field("value", arrow::float32());

  std::shared_ptr<arrow::Schema> schema_ptr = arrow::schema({tid, timestamp, value});
  arrow::Schema schema = *schema_ptr.get();
  std::shared_ptr<arrow::Buffer> serialized_schema = arrow::ipc::SerializeSchema(schema).ValueOrDie();

  size_t serialized_schema_size = serialized_schema->size();
  for (int index = 0; index < serialized_schema_size; index++) {
    std::cout << unsigned((*serialized_schema)[index]) << ' ';
  }
  std::cout << std::endl;
}

255 255 255 255 224 0 0 0 16 0 0 0 0 0 10 0 12 0 6 0 5 0 8 0 10 0 0 0 0 1 4 0 12 0 0 0 8 0 8 0 0 0 4 0 8 0 0 0 4 0 0 0 3 0 0 0 124 0 0 0 52 0 0 0 4 0 0 0 160 255 255 255 0 0 1 3 16 0 0 0 24 0 0 0 4 0 0 0 0 0 0 0 5 0 0 0 118 97 108 117 101 0 0 0 210 255 255 255 0 0 1 0 204 255 255 255 0 0 1 10 16 0 0 0 32 0 0 0 4 0 0 0 0 0 0 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 6 0 8 0 6 0 6 0 0 0 0 0 1 0 16 0 20 0 8 0 6 0 7 0 12 0 0 0 16 0 16 0 0 0 0 0 1 2 16 0 0 0 28 0 0 0 4 0 0 0 0 0 0 0 3 0 0 0 116 105 100 0 8 0 12 0 8 0 7 0 8 0 0 0 0 0 0 1 32 0 0 0

package main

import (
	"fmt"
	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/flight"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "tid", Type: arrow.PrimitiveTypes.Int32},
			{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ms},
			{Name: "value", Type: arrow.PrimitiveTypes.Float32},
		},
		nil,
	)
	serialized_schema := flight.SerializeSchema(schema, memory.DefaultAllocator)
	fmt.Println(serialized_schema)
}

255 255 255 255 248 0 0 0 16 0 0 0 0 0 10 0 12 0 10 0 9 0 4 0 10 0 0 0 16 0 0 0 0 1 4 0 8 0 8 0 0 0 4 0 8 0 0 0 4 0 0 0 3 0 0 0 148 0 0 0 60 0 0 0 4 0 0 0 136 255 255 255 16 0 0 0 24 0 0 0 0 0 0 3 24 0 0 0 0 0 0 0 0 0 6 0 8 0 6 0 6 0 0 0 0 0 1 0 5 0 0 0 118 97 108 117 101 0 0 0 188 255 255 255 16 0 0 0 24 0 0 0 0 0 0 10 36 0 0 0 0 0 0 0 8 0 12 0 10 0 4 0 8 0 0 0 8 0 0 0 0 0 1 0 3 0 0 0 85 84 67 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 0 0 16 0 20 0 16 0 0 0 15 0 8 0 0 0 4 0 16 0 0 0 16 0 0 0 24 0 0 0 0 0 0 2 28 0 0 0 0 0 0 0 8 0 12 0 8 0 7 0 8 0 0 0 0 0 0 1 32 0 0 0 3 0 0 0 116 105 100 0 255 255 255 255 0 0 0 0
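
To make the difference between the dumps above concrete, here is a minimal Rust sketch that checks the first eight bytes of each output for the continuation indicator and reads the length that follows it. The helper name `prefixed_length` is hypothetical and not part of arrow-rs, and the little-endian byte order of the length is an assumption based on the dumps (`224 0 0 0` and `248 0 0 0`).

```rust
/// Continuation indicator quoted in the issue (0xFFFFFFFF).
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Hypothetical helper: returns the 32-bit length stored after the
/// continuation indicator, or `None` if the buffer does not start with it.
/// Assumption: the length is encoded as a little-endian integer.
fn prefixed_length(bytes: &[u8]) -> Option<u32> {
    if bytes.len() < 8 || !bytes.starts_with(&CONTINUATION_MARKER) {
        return None;
    }
    let len = [bytes[4], bytes[5], bytes[6], bytes[7]];
    Some(u32::from_le_bytes(len))
}

fn main() {
    // First eight bytes of the Rust, C++, and Go dumps from the issue.
    let heads: [(&str, [u8; 8]); 3] = [
        ("Rust", [16, 0, 0, 0, 0, 0, 10, 0]),
        ("C++", [255, 255, 255, 255, 224, 0, 0, 0]),
        ("Go", [255, 255, 255, 255, 248, 0, 0, 0]),
    ];
    for (name, head) in heads.iter() {
        match prefixed_length(head) {
            Some(len) => println!("{}: prefix present, length {}", name, len),
            None => println!("{}: no continuation prefix", name),
        }
    }
}
```

Only the Rust dump lacks the prefix, which matches the observation that the other implementations reject it.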

@alamb
Contributor

alamb commented Aug 17, 2022

@skejserjensen thank you for the great ticket and writeup ❤️

@avantgardnerio or @liukun4515 this might be relevant to you -- is there any chance one of you has some time to fix this issue?

alamb added the arrow-flight and help wanted labels on Aug 17, 2022
alamb changed the title from "SchemaResult deviates from other implementations" to "SchemaResult in IPC deviates from other implementations" on Aug 17, 2022
@liukun4515
Contributor

@skejserjensen thank you for the great ticket and writeup ❤️

@avantgardnerio or @liukun4515 this might be relevant to you -- is there any chance one of you has some time to fix this issue?

I will pick this up, but I may not get to it until later this week.
I will assign this issue to myself. @alamb

@liukun4515
Contributor

liukun4515 commented Aug 23, 2022

From the

impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
    type Error = ArrowError;

    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
        let pair = *schema_ipc;
        // schema as data
        let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);

        let mut schema = vec![];
   
        arrow::ipc::writer::write_message(&mut schema, encoded_data, pair.1)?;
        Ok(IpcMessage(schema))
    }
}

The IpcMessage conversion is implemented with write_continuation, which adds the prefix const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; and the length.

impl From<SchemaAsIpc<'_>> for SchemaResult {
    fn from(schema_ipc: SchemaAsIpc) -> Self {
        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
        SchemaResult { schema: vals }
    }
}

Converting the schema to the other type, SchemaResult, uses a different implementation.

@liukun4515
Contributor

liukun4515 commented Aug 23, 2022

@alamb I think @skejserjensen used the wrong method to decode or deserialize the bytes returned by the Rust get_schema.
The SchemaResult contains only the bytes of the schema; there is no CONTINUATION_MARKER or length data.

The Flight or IPC message for the schema may contain the CONTINUATION_MARKER and the length.

@liukun4515
Contributor

The input of the Go DeserializeSchema API (https://github.com/apache/arrow/blob/072ae55dc8172bb1a898fda5d5a83ec063b05a6d/go/arrow/flight/record_batch_reader.go#L135) is Flight data.
In Go, you need to convert the SchemaResult to a Schema by another method.

@liukun4515
Contributor

package main

import (
    "fmt"
    "github.com/apache/arrow/go/arrow"
    "github.com/apache/arrow/go/arrow/flight"
    "github.com/apache/arrow/go/arrow/memory"
)

func main() {
     schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "tid", Type: arrow.PrimitiveTypes.Int32},
			{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ms},
			{Name: "value", Type: arrow.PrimitiveTypes.Float32},
		},
		nil,
	)
    serialized_schema := flight.SerializeSchema(schema,memory.DefaultAllocator)
    fmt.Println(serialized_schema)
}

255 255 255 255 248 0 0 0 16 0 0 0 0 0 10 0 12 0 10 0 9 0 4 0 10 0 0 0 16 0 0 0 0 1 4 0 8 0 8 0 0 0 4 0 8 0 0 0 4 0 0 0 3 0 0 0 148 0 0 0 60 0 0 0 4 0 0 0 136 255 255 255 16 0 0 0 24 0 0 0 0 0 0 3 24 0 0 0 0 0 0 0 0 0 6 0 8 0 6 0 6 0 0 0 0 0 1 0 5 0 0 0 118 97 108 117 101 0 0 0 188 255 255 255 16 0 0 0 24 0 0 0 0 0 0 10 36 0 0 0 0 0 0 0 8 0 12 0 10 0 4 0 8 0 0 0 8 0 0 0 0 0 1 0 3 0 0 0 85 84 67 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 0 0 16 0 20 0 16 0 0 0 15 0 8 0 0 0 4 0 16 0 0 0 16 0 0 0 24 0 0 0 0 0 0 2 28 0 0 0 0 0 0 0 8 0 12 0 8 0 7 0 8 0 0 0 0 0 0 1 32 0 0 0 3 0 0 0 116 105 100 0 255 255 255 255 0 0 0 0

The APIs you called in Rust and Go are not the same.
In Go, https://pkg.go.dev/github.com/apache/arrow/go/v10/arrow/flight#SerializeSchema converts the schema to Flight data.

In Rust, SchemaAsIpc.into() just produces the SchemaResult.

@liukun4515
Contributor

liukun4515 commented Aug 23, 2022

This is caused by the two sides having different implementations; the detailed reason is given in apache/arrow#13853 (comment).
@alamb @skejserjensen

@liukun4515
Contributor

@skejserjensen, you can implement GetSchema by wrapping the schema in the IPC message format on the server side.

You can follow the code below, which is from arrow-rs.

impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
    type Error = ArrowError;

    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
        let pair = *schema_ipc;
        let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);

        let mut schema = vec![];
        arrow::ipc::writer::write_message(&mut schema, encoded_data, pair.1)?;
        Ok(IpcMessage(schema))
    }
}

@liukun4515
Contributor

The definition of GetSchema is rpc GetSchema(FlightDescriptor) returns (SchemaResult) {} and the SchemaResult is

/*
 * Wrap the result of a getSchema call
 */
message SchemaResult {
  // schema of the dataset as described in Schema.fbs::Schema.
  bytes schema = 1;
}

On the server side, you can wrap any data as the bytes of the schema, but on the client side you must use the corresponding decoder to decode the data.

If you want to use the utilities or APIs on the C++/Go side, you should wrap the data in the format that the client API expects.

@skejserjensen I think this is not a bug, just a question of how to implement the server API and how to use the client API.

@liukun4515
Contributor

You can change your server code to

async fn get_schema(
    &self,
    _request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
    let tid = Field::new("tid", DataType::Int32, false);
    let timestamp = Field::new("timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), false);
    let value = Field::new("value", DataType::Float32, false);
    let schema = Schema::new(vec![tid, timestamp, value]);

    let options = IpcWriteOptions::default();
    let schema_as_ipc = SchemaAsIpc::new(&schema, &options);
    // convert the schema to an IPC message
    let ipc_message: IpcMessage = IpcMessage::try_from(schema_as_ipc).unwrap();
    // wrap the IPC message bytes in a `SchemaResult`
    let schema_result: SchemaResult = SchemaResult {
        schema: ipc_message.0,
    };

    Ok(Response::new(schema_result))
}

@skejserjensen

@zeroshade
Member

@liukun4515 That seems out of date, the current definition in Flight.proto for Schema Result shows:

/*
 * Wrap the result of a getSchema call
 */
message SchemaResult {
  // The schema of the dataset in its IPC form:
  //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
  //   4 bytes - the byte length of the payload
  //   a flatbuffer Message whose header is the Schema
  bytes schema = 1;
}

Defining that it should be the IPC message
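
The layout described in that comment can be sketched in plain Rust as follows. This is only an illustration of the framing, not the arrow-rs implementation: the helper name `frame_ipc_message` is hypothetical, the payload is a placeholder rather than a real flatbuffer, and the little-endian length and the zero padding to an 8-byte boundary are assumptions (the lengths 224 and 248 in the dumps earlier in this issue are consistent with them).

```rust
/// IPC continuation token (0xFFFFFFFF).
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Hypothetical helper: frames `flatbuffer` as
///   4 bytes - continuation token
///   4 bytes - byte length of the payload (assumed little-endian)
///   payload - the flatbuffer bytes, zero padded (assumed 8-byte alignment)
fn frame_ipc_message(flatbuffer: &[u8]) -> Vec<u8> {
    const PREFIX: usize = 8;
    const ALIGN: usize = 8;
    // Round the total size (prefix + payload) up to a multiple of ALIGN.
    let total = (PREFIX + flatbuffer.len() + ALIGN - 1) / ALIGN * ALIGN;
    // The stored length covers the payload plus its padding.
    let body_len = (total - PREFIX) as u32;

    let mut out = Vec::with_capacity(total);
    out.extend_from_slice(&CONTINUATION_MARKER);
    out.extend_from_slice(&body_len.to_le_bytes());
    out.extend_from_slice(flatbuffer);
    out.resize(total, 0);
    out
}

fn main() {
    // Placeholder payload standing in for the encoded Schema message.
    let framed = frame_ipc_message(&[1, 2, 3, 4, 5]);
    println!("{:?}", framed);
}
```

A server whose serializer emits only the bare flatbuffer would need a step like this before placing the bytes in `SchemaResult.schema`, if clients expect the framed form.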

@liukun4515
Contributor

liukun4515 commented Aug 24, 2022

@liukun4515 That seems out of date, the current definition in Flight.proto for Schema Result shows:

/*
 * Wrap the result of a getSchema call
 */
message SchemaResult {
  // The schema of the dataset in its IPC form:
  //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
  //   4 bytes - the byte length of the payload
  //   a flatbuffer Message whose header is the Schema
  bytes schema = 1;
}

Defining that it should be the IPC message

Thanks @zeroshade
I think the Rust Flight.proto is out of date.

On the Rust side, the definition is

/*
 * Wrap the result of a getSchema call
 */
message SchemaResult {
  // schema of the dataset as described in Schema.fbs::Schema.
  bytes schema = 1;
}

@alamb Maybe we should update the .proto on the Rust side, then update the method that converts the schema to SchemaResult.

@liukun4515
Contributor

liukun4515 commented Aug 24, 2022

@liukun4515 That seems out of date, the current definition in Flight.proto for Schema Result shows:

/*
 * Wrap the result of a getSchema call
 */
message SchemaResult {
  // The schema of the dataset in its IPC form:
  //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
  //   4 bytes - the byte length of the payload
  //   a flatbuffer Message whose header is the Schema
  bytes schema = 1;
}

Defining that it should be the IPC message

@zeroshade I will try to fix this on the Rust side, but the fix will only be released in the next version, 22.0.0 or later.

You can work around it on your side temporarily.

@skejserjensen
Author

You can change your server code to

async fn get_schema(
    &self,
    _request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
    let tid = Field::new("tid", DataType::Int32, false);
    let timestamp = Field::new("timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), false);
    let value = Field::new("value", DataType::Float32, false);
    let schema = Schema::new(vec![tid, timestamp, value]);

    let options = IpcWriteOptions::default();
    let schema_as_ipc = SchemaAsIpc::new(&schema, &options);
    // convert the schema to an IPC message
    let ipc_message: IpcMessage = IpcMessage::try_from(schema_as_ipc).unwrap();
    // wrap the IPC message bytes in a `SchemaResult`
    let schema_result: SchemaResult = SchemaResult {
        schema: ipc_message.0,
    };

    Ok(Response::new(schema_result))
}

@skejserjensen

@liukun4515 and @zeroshade thank you for looking into this issue.

Based on @liukun4515's previous comment, I have also been looking into encoding the Schema as an IpcMessage using SchemaAsIpc and then returning the bytes from get_schema() as a SchemaResult to match the method's return type. I have successfully decoded these SchemaResults containing the bytes from IpcMessages using the Go, Java, Python, and Rust implementations of Apache Arrow Flight.

@liukun4515
Contributor

I created an issue to fix the bug: #2571

alamb added the development-process and arrow labels on Sep 16, 2022
@alamb
Contributor

alamb commented Sep 16, 2022

Automatically added labels {'arrow'} from #2586

@alamb
Contributor

alamb commented Sep 16, 2022

Automatically added labels {'api-change'} from #2586

alamb added the api-change label and removed the development-process and api-change labels on Sep 16, 2022
@alamb
Contributor

alamb commented Sep 16, 2022

The api-change label was added accidentally by an automated script. The api-change label should only be on the PRs that introduce the changes.
