From ad84176155471b4a4a8da92165a156d591f670d6 Mon Sep 17 00:00:00 2001
From: Remzi Yang <59198230+HaoYang670@users.noreply.github.com>
Date: Wed, 24 Aug 2022 00:07:26 +0800
Subject: [PATCH] Add the `length` field for Buffer and use more `Buffer` in
 IPC reader to avoid memory copy. (#2557)

* add length field

Signed-off-by: remzi <13716567376yh@gmail.com>

* fix nit

Signed-off-by: remzi <13716567376yh@gmail.com>

* More buffer in ipc reader

Signed-off-by: remzi <13716567376yh@gmail.com>

Signed-off-by: remzi <13716567376yh@gmail.com>
---
 arrow-flight/src/utils.rs     |  3 +-
 arrow/src/buffer/immutable.rs | 45 +++++++++++++++----
 arrow/src/ipc/reader.rs       | 30 ++++++-------
 .../integration_test.rs       |  3 +-
 .../integration_test.rs       |  9 ++--
 5 files changed, 60 insertions(+), 30 deletions(-)

diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs
index dda3fc7fe3d..21a5a857224 100644
--- a/arrow-flight/src/utils.rs
+++ b/arrow-flight/src/utils.rs
@@ -21,6 +21,7 @@ use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult};
 use std::collections::HashMap;
 
 use arrow::array::ArrayRef;
+use arrow::buffer::Buffer;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result};
 use arrow::ipc::{reader, writer, writer::IpcWriteOptions};
@@ -66,7 +67,7 @@ pub fn flight_data_to_arrow_batch(
         })
         .map(|batch| {
             reader::read_record_batch(
-                &data.data_body,
+                &Buffer::from(&data.data_body),
                 batch,
                 schema,
                 dictionaries_by_id,
diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs
index 8ec5a455420..28042a3817b 100644
--- a/arrow/src/buffer/immutable.rs
+++ b/arrow/src/buffer/immutable.rs
@@ -37,15 +37,20 @@ pub struct Buffer {
 
     /// The offset into the buffer.
     offset: usize,
+
+    /// Byte length of the buffer.
+    length: usize,
 }
 
 impl Buffer {
     /// Auxiliary method to create a new Buffer
     #[inline]
     pub fn from_bytes(bytes: Bytes) -> Self {
+        let length = bytes.len();
         Buffer {
             data: Arc::new(bytes),
             offset: 0,
+            length,
         }
     }
 
@@ -106,28 +111,32 @@ impl Buffer {
         Buffer {
             data: Arc::new(bytes),
             offset: 0,
+            length: len,
         }
     }
 
     /// Returns the number of bytes in the buffer
+    #[inline]
     pub fn len(&self) -> usize {
-        self.data.len() - self.offset
+        self.length
     }
 
     /// Returns the capacity of this buffer.
     /// For externally owned buffers, this returns zero
+    #[inline]
     pub fn capacity(&self) -> usize {
         self.data.capacity()
     }
 
     /// Returns whether the buffer is empty.
+    #[inline]
     pub fn is_empty(&self) -> bool {
-        self.data.len() - self.offset == 0
+        self.length == 0
     }
 
     /// Returns the byte slice stored in this buffer
     pub fn as_slice(&self) -> &[u8] {
-        &self.data[self.offset..]
+        &self.data[self.offset..(self.offset + self.length)]
     }
 
     /// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
@@ -142,6 +151,24 @@ impl Buffer {
         Self {
             data: self.data.clone(),
             offset: self.offset + offset,
+            length: self.length - offset,
+        }
+    }
+
+    /// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,
+    /// with `length` bytes.
+    /// Doing so allows the same memory region to be shared between buffers.
+    /// # Panics
+    /// Panics iff `(offset + length)` is larger than the existing length.
+    pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
+        assert!(
+            offset + length <= self.len(),
+            "the offset of the new Buffer cannot exceed the existing length"
+        );
+        Self {
+            data: self.data.clone(),
+            offset: self.offset + offset,
+            length,
         }
     }
 
@@ -319,10 +346,10 @@ mod tests {
         let buf2 = Buffer::from(&[0, 1, 2, 3, 4]);
         assert_eq!(buf1, buf2);
 
-        // slice with same offset should still preserve equality
+        // slice with same offset and same length should still preserve equality
         let buf3 = buf1.slice(2);
         assert_ne!(buf1, buf3);
-        let buf4 = buf2.slice(2);
+        let buf4 = buf2.slice_with_length(2, 3);
         assert_eq!(buf3, buf4);
 
         // Different capacities should still preserve equality
@@ -376,7 +403,7 @@ mod tests {
         assert_eq!(3, buf2.len());
         assert_eq!(unsafe { buf.as_ptr().offset(2) }, buf2.as_ptr());
 
-        let buf3 = buf2.slice(1);
+        let buf3 = buf2.slice_with_length(1, 2);
         assert_eq!([8, 10], buf3.as_slice());
         assert_eq!(2, buf3.len());
         assert_eq!(unsafe { buf.as_ptr().offset(3) }, buf3.as_ptr());
@@ -386,7 +413,7 @@ mod tests {
         assert_eq!(empty_slice, buf4.as_slice());
         assert_eq!(0, buf4.len());
         assert!(buf4.is_empty());
-        assert_eq!(buf2.slice(2).as_slice(), &[10]);
+        assert_eq!(buf2.slice_with_length(2, 1).as_slice(), &[10]);
     }
 
     #[test]
@@ -457,7 +484,7 @@ mod tests {
         assert_eq!(
             8,
             Buffer::from(&[0b11111111, 0b11111111])
-                .slice(1)
+                .slice_with_length(1, 1)
                 .count_set_bits()
         );
         assert_eq!(
@@ -469,7 +496,7 @@ mod tests {
         assert_eq!(
             6,
             Buffer::from(&[0b11111111, 0b01001001, 0b01010010])
-                .slice(1)
+                .slice_with_length(1, 2)
                 .count_set_bits()
         );
         assert_eq!(
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index fca5fa9d678..7ffa9aa5946 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -26,7 +26,7 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use crate::array::*;
-use crate::buffer::Buffer;
+use crate::buffer::{Buffer, MutableBuffer};
 use crate::compute::cast;
 use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef, UnionMode};
 use crate::error::{ArrowError, Result};
@@ -48,12 +48,11 @@ use DataType::*;
 /// compression does not yield appreciable savings.
 fn read_buffer(
     buf: &ipc::Buffer,
-    a_data: &[u8],
+    a_data: &Buffer,
     compression_codec: &Option<CompressionCodec>,
 ) -> Result<Buffer> {
     let start_offset = buf.offset() as usize;
-    let end_offset = start_offset + buf.length() as usize;
-    let buf_data = Buffer::from(&a_data[start_offset..end_offset]);
+    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
     // corner case: empty buffer
     match (buf_data.is_empty(), compression_codec) {
         (true, _) | (_, None) => Ok(buf_data),
@@ -74,7 +73,7 @@ fn read_buffer(
 fn create_array(
     nodes: &[ipc::FieldNode],
     field: &Field,
-    data: &[u8],
+    data: &Buffer,
     buffers: &[ipc::Buffer],
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     mut node_index: usize,
@@ -556,7 +555,7 @@ fn create_dictionary_array(
 
 /// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
 pub fn read_record_batch(
-    buf: &[u8],
+    buf: &Buffer,
     batch: ipc::RecordBatch,
     schema: SchemaRef,
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
@@ -642,7 +641,7 @@ pub fn read_record_batch(
 /// Read the dictionary from the buffer and provided metadata,
 /// updating the `dictionaries_by_id` with the resulting dictionary
 pub fn read_dictionary(
-    buf: &[u8],
+    buf: &Buffer,
     batch: ipc::DictionaryBatch,
     schema: &Schema,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
@@ -817,14 +816,15 @@ impl<R: Read + Seek> FileReader<R> {
                     let batch = message.header_as_dictionary_batch().unwrap();
 
                     // read the block that makes up the dictionary batch into a buffer
-                    let mut buf = vec![0; block.bodyLength() as usize];
+                    let mut buf =
+                        MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                     reader.seek(SeekFrom::Start(
                         block.offset() as u64 + block.metaDataLength() as u64,
                     ))?;
                     reader.read_exact(&mut buf)?;
 
                     read_dictionary(
-                        &buf,
+                        &buf.into(),
                         batch,
                         &schema,
                         &mut dictionaries_by_id,
@@ -925,14 +925,14 @@ impl<R: Read + Seek> FileReader<R> {
                 )
             })?;
            // read the block that makes up the record batch into a buffer
-            let mut buf = vec![0; block.bodyLength() as usize];
+            let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
            self.reader.seek(SeekFrom::Start(
                block.offset() as u64 + block.metaDataLength() as u64,
            ))?;
            self.reader.read_exact(&mut buf)?;
 
            read_record_batch(
-                &buf,
+                &buf.into(),
                batch,
                self.schema(),
                &self.dictionaries_by_id,
@@ -1121,10 +1121,10 @@ impl<R: Read> StreamReader<R> {
                    )
                })?;
                // read the block that makes up the record batch into a buffer
-                let mut buf = vec![0; message.bodyLength() as usize];
+                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                self.reader.read_exact(&mut buf)?;
 
-                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_id, self.projection.as_ref().map(|x| x.0.as_ref()), &message.version()).map(Some)
+                read_record_batch(&buf.into(), batch, self.schema(), &self.dictionaries_by_id, self.projection.as_ref().map(|x| x.0.as_ref()), &message.version()).map(Some)
            }
            ipc::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
@@ -1133,11 +1133,11 @@ impl<R: Read> StreamReader<R> {
                    )
                })?;
                // read the block that makes up the dictionary batch into a buffer
-                let mut buf = vec![0; message.bodyLength() as usize];
+                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                self.reader.read_exact(&mut buf)?;
 
                read_dictionary(
-                    &buf, batch, &self.schema, &mut self.dictionaries_by_id, &message.version()
+                    &buf.into(), batch, &self.schema, &mut self.dictionaries_by_id, &message.version()
                )?;
 
                // read the next message until we encounter a RecordBatch
diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs
index 62fe2b85d26..c01baa09a1f 100644
--- a/integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
 
 use arrow::{
     array::ArrayRef,
+    buffer::Buffer,
     datatypes::SchemaRef,
     ipc::{self, reader, writer},
     record_batch::RecordBatch,
@@ -264,7 +265,7 @@ async fn receive_batch_flight_data(
 
     while message.header_type() == ipc::MessageHeader::DictionaryBatch {
         reader::read_dictionary(
-            &data.data_body,
+            &Buffer::from(&data.data_body),
            message
                .header_as_dictionary_batch()
                .expect("Error parsing dictionary"),
diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs
index 7ad3d18eb5b..dee2fda3be3 100644
--- a/integration-testing/src/flight_server_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 
 use arrow::{
     array::ArrayRef,
+    buffer::Buffer,
     datatypes::Schema,
     datatypes::SchemaRef,
     ipc::{self, reader},
@@ -282,7 +283,7 @@ async fn send_app_metadata(
 
 async fn record_batch_from_message(
     message: ipc::Message<'_>,
-    data_body: &[u8],
+    data_body: &Buffer,
     schema_ref: SchemaRef,
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
 ) -> Result<RecordBatch, Status> {
@@ -306,7 +307,7 @@ async fn record_batch_from_message(
 
 async fn dictionary_from_message(
     message: ipc::Message<'_>,
-    data_body: &[u8],
+    data_body: &Buffer,
     schema_ref: SchemaRef,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
 ) -> Result<(), Status> {
@@ -354,7 +355,7 @@ async fn save_uploaded_chunks(
 
                     let batch = record_batch_from_message(
                         message,
-                        &data.data_body,
+                        &Buffer::from(data.data_body),
                        schema_ref.clone(),
                        &dictionaries_by_id,
                    )
@@ -365,7 +366,7 @@ async fn save_uploaded_chunks(
                ipc::MessageHeader::DictionaryBatch => {
                    dictionary_from_message(
                        message,
-                        &data.data_body,
+                        &Buffer::from(data.data_body),
                        schema_ref.clone(),
                        &mut dictionaries_by_id,
                    )
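
A minimal usage sketch of the new `slice_with_length` API introduced by this patch, assuming a build of the `arrow` crate that contains it. Unlike `Buffer::from(&data[start..end])`, which copies the selected bytes into a fresh allocation, `slice_with_length` clones the `Arc` holding the underlying bytes and only adjusts the `offset` and `length` fields:

use arrow::buffer::Buffer;

fn main() {
    // Build a buffer over five bytes; `len()` is now served by the new `length` field.
    let buf = Buffer::from(&[0u8, 1, 2, 3, 4]);
    assert_eq!(buf.len(), 5);

    // Zero-copy view of bytes 1..4: the allocation is shared, only offset/length change.
    let sliced = buf.slice_with_length(1, 3);
    assert_eq!(sliced.as_slice(), &[1, 2, 3]);
    assert_eq!(sliced.len(), 3);

    // The slice points into the same underlying memory region as the original buffer.
    assert_eq!(unsafe { buf.as_ptr().add(1) }, sliced.as_ptr());

    // Requesting more bytes than remain panics, per the documented contract:
    // buf.slice_with_length(3, 5); // would panic
}

The IPC readers follow the same pattern: the message body is read into a `MutableBuffer`, converted into a `Buffer` with `.into()`, and each flatbuffer-described buffer is then carved out of it with `slice_with_length` instead of being copied out of a `&[u8]`.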