Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally require alignment when reading IPC, respect alignment when writing #5554

Merged
merged 14 commits into from
Apr 4, 2024
1 change: 0 additions & 1 deletion arrow-flight/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ impl FlightDataDecoder {
&state.schema,
&mut state.dictionaries_by_field,
&message.version(),
false,
)
.map_err(|e| {
FlightError::DecodeError(format!("Error decoding ipc dictionary: {e}"))
Expand Down
1 change: 0 additions & 1 deletion arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,6 @@ pub fn arrow_data_from_flight_data(
&dictionaries_by_field,
None,
&ipc_message.version(),
false,
)?;
Ok(ArrowFlightData::RecordBatch(record_batch))
}
Expand Down
1 change: 0 additions & 1 deletion arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ pub fn flight_data_to_arrow_batch(
dictionaries_by_id,
None,
&message.version(),
false,
)
})?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ async fn receive_batch_flight_data(
&schema,
dictionaries_by_id,
&message.version(),
false,
)
.expect("Error reading dictionary");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ async fn record_batch_from_message(
dictionaries_by_id,
None,
&message.version(),
false,
);

arrow_batch_result
Expand All @@ -331,7 +330,6 @@ async fn dictionary_from_message(
&schema_ref,
dictionaries_by_id,
&message.version(),
false,
);
dictionary_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {e:?}")))
Expand Down
47 changes: 38 additions & 9 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,35 @@ impl<'a> ArrayReader<'a> {
}
}

/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the
/// `Schema`.
///
/// This is a thin public wrapper: every argument is forwarded unchanged to
/// `read_record_batch2`, with that function's trailing `require_alignment` flag fixed to
/// `false`. Callers therefore never opt in to strict alignment checking through this entry
/// point.
///
/// # Errors
///
/// Returns any [`ArrowError`] produced by `read_record_batch2`.
pub fn read_record_batch(
    buf: &Buffer,
    batch: crate::RecordBatch,
    schema: SchemaRef,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
    projection: Option<&[usize]>,
    metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
    // Name the flag so the call site does not end in an anonymous `false`.
    let require_alignment = false;
    read_record_batch2(
        buf,
        batch,
        schema,
        dictionaries_by_id,
        projection,
        metadata,
        require_alignment,
    )
}

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_id` with the resulting dictionary.
///
/// This is a thin public wrapper: every argument is forwarded unchanged to
/// `read_dictionary2`, with that function's trailing `require_alignment` flag fixed to
/// `false`.
///
/// # Errors
///
/// Returns any [`ArrowError`] produced by `read_dictionary2`.
pub fn read_dictionary(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
) -> Result<(), ArrowError> {
    // Name the flag so the call site does not end in an anonymous `false`.
    let require_alignment = false;
    read_dictionary2(
        buf,
        batch,
        schema,
        dictionaries_by_id,
        metadata,
        require_alignment,
    )
}

/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
///
/// If `require_alignment` is true, this function will return an error if any array data in the
Expand All @@ -495,7 +524,7 @@ impl<'a> ArrayReader<'a> {
/// and copy over the data if any array data in the input `buf` is not properly aligned.
/// (Properly aligned array data will remain zero-copy.)
/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
pub fn read_record_batch(
tustvold marked this conversation as resolved.
Show resolved Hide resolved
fn read_record_batch2(
buf: &Buffer,
batch: crate::RecordBatch,
schema: SchemaRef,
Expand Down Expand Up @@ -564,7 +593,7 @@ pub fn read_record_batch(

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_id` with the resulting dictionary
pub fn read_dictionary(
fn read_dictionary2(
buf: &Buffer,
batch: crate::DictionaryBatch,
schema: &Schema,
Expand Down Expand Up @@ -593,7 +622,7 @@ pub fn read_dictionary(
let value = value_type.as_ref().clone();
let schema = Schema::new(vec![Field::new("", value, true)]);
// Read a single column
let record_batch = read_record_batch(
let record_batch = read_record_batch2(
buf,
batch.data().unwrap(),
Arc::new(schema),
Expand Down Expand Up @@ -781,7 +810,7 @@ impl FileDecoder {
match message.header_type() {
crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();
read_dictionary(
read_dictionary2(
&buf.slice(block.metaDataLength() as _),
batch,
&self.schema,
Expand Down Expand Up @@ -812,7 +841,7 @@ impl FileDecoder {
ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
read_record_batch(
read_record_batch2(
&buf.slice(block.metaDataLength() as _),
batch,
self.schema.clone(),
Expand Down Expand Up @@ -1255,7 +1284,7 @@ impl<R: Read> StreamReader<R> {
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.read_exact(&mut buf)?;

read_record_batch(
read_record_batch2(
&buf.into(),
batch,
self.schema(),
Expand All @@ -1276,7 +1305,7 @@ impl<R: Read> StreamReader<R> {
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.read_exact(&mut buf)?;

read_dictionary(
read_dictionary2(
&buf.into(),
batch,
&self.schema,
Expand Down Expand Up @@ -2048,7 +2077,7 @@ mod tests {
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();
let roundtrip = read_record_batch(
let roundtrip = read_record_batch2(
&b,
ipc_batch,
batch.schema(),
Expand Down Expand Up @@ -2085,7 +2114,7 @@ mod tests {
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();
let result = read_record_batch(
let result = read_record_batch2(
&b,
ipc_batch,
batch.schema(),
Expand Down
29 changes: 24 additions & 5 deletions arrow-ipc/src/reader/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer};
use arrow_schema::{ArrowError, SchemaRef};

use crate::convert::MessageBuffer;
use crate::reader::{read_dictionary, read_record_batch};
use crate::reader::{read_dictionary2, read_record_batch2};
use crate::{MessageHeader, CONTINUATION_MARKER};

/// A low-level interface for reading [`RecordBatch`] data from a stream of bytes
Expand All @@ -40,6 +40,8 @@ pub struct StreamDecoder {
state: DecoderState,
/// A scratch buffer when a read is split across multiple `Buffer`
buf: MutableBuffer,
/// Whether or not array data in input buffers is required to be properly aligned
require_alignment: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -83,6 +85,23 @@ impl StreamDecoder {
Self::default()
}

/// Sets whether array data in input buffers must already be properly aligned, returning the
/// updated decoder so calls can be chained builder-style.
///
/// * `require_alignment == true`: the decoder returns an error if any array data in the
///   input `buf` is not properly aligned. Under the hood it will use
///   [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
/// * `require_alignment == false` (the default): the decoder automatically allocates a new
///   aligned buffer and copies the data over if any array data in the input `buf` is not
///   properly aligned. (Properly aligned array data will remain zero-copy.) Under the hood it
///   will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
///   [`arrow_data::ArrayData`].
pub fn with_require_alignment(self, require_alignment: bool) -> Self {
    let mut decoder = self;
    decoder.require_alignment = require_alignment;
    decoder
}

/// Try to read the next [`RecordBatch`] from the provided [`Buffer`]
///
/// [`Buffer::advance`] will be called on `buffer` for any consumed bytes.
Expand Down Expand Up @@ -192,14 +211,14 @@ impl StreamDecoder {
let schema = self.schema.clone().ok_or_else(|| {
ArrowError::IpcError("Missing schema".to_string())
})?;
let batch = read_record_batch(
let batch = read_record_batch2(
&body,
batch,
schema,
&self.dictionaries,
None,
&version,
false,
self.require_alignment,
)?;
self.state = DecoderState::default();
return Ok(Some(batch));
Expand All @@ -209,13 +228,13 @@ impl StreamDecoder {
let schema = self.schema.as_deref().ok_or_else(|| {
ArrowError::IpcError("Missing schema".to_string())
})?;
read_dictionary(
read_dictionary2(
&body,
dictionary,
schema,
&mut self.dictionaries,
&version,
false,
self.require_alignment,
)?;
self.state = DecoderState::default();
}
Expand Down