Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally require alignment when reading IPC, respect alignment when writing #5554

Merged
merged 14 commits into from
Apr 4, 2024
2 changes: 1 addition & 1 deletion arrow-buffer/src/buffer/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<T: ArrowNativeType> ScalarBuffer<T> {
/// This method will panic if
///
/// * `offset` or `len` would result in overflow
/// * `buffer` is not aligned to a multiple of `std::mem::size_of::<T>`
/// * `buffer` is not aligned to a multiple of `std::mem::align_of::<T>`
/// * `bytes` is not large enough for the requested slice
pub fn new(buffer: Buffer, offset: usize, len: usize) -> Self {
let size = std::mem::size_of::<T>();
Expand Down
1 change: 1 addition & 0 deletions arrow-flight/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ impl FlightDataDecoder {
&state.schema,
&mut state.dictionaries_by_field,
&message.version(),
false,
)
.map_err(|e| {
FlightError::DecodeError(format!("Error decoding ipc dictionary: {e}"))
Expand Down
6 changes: 5 additions & 1 deletion arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ mod tests {
use arrow_array::{cast::downcast_array, types::*};
use arrow_buffer::Buffer;
use arrow_cast::pretty::pretty_format_batches;
use arrow_ipc::MetadataVersion;
use arrow_schema::UnionMode;
use std::collections::HashMap;

Expand All @@ -638,7 +639,8 @@ mod tests {
/// ensure only the batch's used data (not the allocated data) is sent
/// <https://github.com/apache/arrow-rs/issues/208>
fn test_encode_flight_data() {
let options = IpcWriteOptions::default();
// use 8-byte alignment - default alignment is 64 which produces bigger ipc data
let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);

let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
Expand Down Expand Up @@ -1343,6 +1345,8 @@ mod tests {

let mut stream = FlightDataEncoderBuilder::new()
.with_max_flight_data_size(max_flight_data_size)
// use 8-byte alignment - default alignment is 64 which produces bigger ipc data
.with_options(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap())
.build(futures::stream::iter([Ok(batch.clone())]));

let mut i = 0;
Expand Down
1 change: 1 addition & 0 deletions arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ pub fn arrow_data_from_flight_data(
&dictionaries_by_field,
None,
&ipc_message.version(),
false,
)?;
Ok(ArrowFlightData::RecordBatch(record_batch))
}
Expand Down
1 change: 1 addition & 0 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub fn flight_data_to_arrow_batch(
dictionaries_by_id,
None,
&message.version(),
false,
)
})?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ async fn receive_batch_flight_data(
&schema,
dictionaries_by_id,
&message.version(),
false,
)
.expect("Error reading dictionary");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ async fn record_batch_from_message(
dictionaries_by_id,
None,
&message.version(),
false,
hzuo marked this conversation as resolved.
Show resolved Hide resolved
);

arrow_batch_result
Expand All @@ -330,6 +331,7 @@ async fn dictionary_from_message(
&schema_ref,
dictionaries_by_id,
&message.version(),
false,
);
dictionary_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {e:?}")))
Expand Down