
Commit

Add the length field for Buffer and use more Buffer in IPC reader to avoid memory copy. (#2557)

* add length field

Signed-off-by: remzi <13716567376yh@gmail.com>

* fix nit

Signed-off-by: remzi <13716567376yh@gmail.com>

* More buffer in ipc reader

Signed-off-by: remzi <13716567376yh@gmail.com>

Signed-off-by: remzi <13716567376yh@gmail.com>
HaoYang670 committed Aug 23, 2022
1 parent 3430537 commit ad84176
Showing 5 changed files with 60 additions and 30 deletions.
3 changes: 2 additions & 1 deletion arrow-flight/src/utils.rs
@@ -21,6 +21,7 @@ use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult};
use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::buffer::Buffer;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::ipc::{reader, writer, writer::IpcWriteOptions};
@@ -66,7 +67,7 @@ pub fn flight_data_to_arrow_batch(
})
.map(|batch| {
reader::read_record_batch(
&data.data_body,
&Buffer::from(&data.data_body),
batch,
schema,
dictionaries_by_id,
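
Note on the arrow-flight change: reader::read_record_batch now takes a &Buffer rather than a &[u8], so the Flight message body is wrapped once before decoding. A minimal sketch of that wrapping step, assuming data_body stands in for the FlightData::data_body byte vector (the remaining read_record_batch arguments are unchanged and omitted here):

use arrow::buffer::Buffer;

// Assumed stand-in for FlightData::data_body received over gRPC.
let data_body: Vec<u8> = vec![0u8; 64];

// One copy into an Arrow Buffer; every buffer later carved out of it
// (validity bitmaps, offsets, values) is a shared slice, not a new allocation.
let body: Buffer = Buffer::from(&data_body);
assert_eq!(body.len(), data_body.len());
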
45 changes: 36 additions & 9 deletions arrow/src/buffer/immutable.rs
@@ -37,15 +37,20 @@ pub struct Buffer {

/// The offset into the buffer.
offset: usize,

/// Byte length of the buffer.
length: usize,
}

impl Buffer {
/// Auxiliary method to create a new Buffer
#[inline]
pub fn from_bytes(bytes: Bytes) -> Self {
let length = bytes.len();
Buffer {
data: Arc::new(bytes),
offset: 0,
length,
}
}

@@ -106,28 +111,32 @@ impl Buffer {
Buffer {
data: Arc::new(bytes),
offset: 0,
length: len,
}
}

/// Returns the number of bytes in the buffer
#[inline]
pub fn len(&self) -> usize {
self.data.len() - self.offset
self.length
}

/// Returns the capacity of this buffer.
/// For externally owned buffers, this returns zero
#[inline]
pub fn capacity(&self) -> usize {
self.data.capacity()
}

/// Returns whether the buffer is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.data.len() - self.offset == 0
self.length == 0
}

/// Returns the byte slice stored in this buffer
pub fn as_slice(&self) -> &[u8] {
&self.data[self.offset..]
&self.data[self.offset..(self.offset + self.length)]
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
@@ -142,6 +151,24 @@ impl Buffer {
Self {
data: self.data.clone(),
offset: self.offset + offset,
length: self.length - offset,
}
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,
/// with `length` bytes.
/// Doing so allows the same memory region to be shared between buffers.
/// # Panics
/// Panics iff `(offset + length)` is larger than the existing length.
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
assert!(
offset + length <= self.len(),
"the offset of the new Buffer cannot exceed the existing length"
);
Self {
data: self.data.clone(),
offset: self.offset + offset,
length,
}
}

@@ -319,10 +346,10 @@ mod tests {
let buf2 = Buffer::from(&[0, 1, 2, 3, 4]);
assert_eq!(buf1, buf2);

// slice with same offset should still preserve equality
// slice with same offset and same length should still preserve equality
let buf3 = buf1.slice(2);
assert_ne!(buf1, buf3);
let buf4 = buf2.slice(2);
let buf4 = buf2.slice_with_length(2, 3);
assert_eq!(buf3, buf4);

// Different capacities should still preserve equality
@@ -376,7 +403,7 @@ mod tests {
assert_eq!(3, buf2.len());
assert_eq!(unsafe { buf.as_ptr().offset(2) }, buf2.as_ptr());

let buf3 = buf2.slice(1);
let buf3 = buf2.slice_with_length(1, 2);
assert_eq!([8, 10], buf3.as_slice());
assert_eq!(2, buf3.len());
assert_eq!(unsafe { buf.as_ptr().offset(3) }, buf3.as_ptr());
@@ -386,7 +413,7 @@
assert_eq!(empty_slice, buf4.as_slice());
assert_eq!(0, buf4.len());
assert!(buf4.is_empty());
assert_eq!(buf2.slice(2).as_slice(), &[10]);
assert_eq!(buf2.slice_with_length(2, 1).as_slice(), &[10]);
}

#[test]
@@ -457,7 +484,7 @@ mod tests {
assert_eq!(
8,
Buffer::from(&[0b11111111, 0b11111111])
.slice(1)
.slice_with_length(1, 1)
.count_set_bits()
);
assert_eq!(
@@ -469,7 +496,7 @@
assert_eq!(
6,
Buffer::from(&[0b11111111, 0b01001001, 0b01010010])
.slice(1)
.slice_with_length(1, 2)
.count_set_bits()
);
assert_eq!(
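
Note on the new length field: a Buffer can now describe a bounded window over a shared allocation, which is what slice_with_length exposes. A small illustrative example of the difference between slice and slice_with_length, assuming the arrow crate API as of this commit:

use arrow::buffer::Buffer;

fn main() {
    let buf = Buffer::from(&[0u8, 1, 2, 3, 4]);

    // `slice(offset)` keeps everything from the offset to the end of the data.
    let tail = buf.slice(2);
    assert_eq!([2, 3, 4], tail.as_slice());

    // `slice_with_length(offset, length)` also bounds the view; no bytes are
    // copied, both views share the allocation owned by `buf`.
    let window = buf.slice_with_length(1, 3);
    assert_eq!([1, 2, 3], window.as_slice());
    assert_eq!(3, window.len());
    assert_eq!(unsafe { buf.as_ptr().offset(1) }, window.as_ptr());
}
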
30 changes: 15 additions & 15 deletions arrow/src/ipc/reader.rs
@@ -26,7 +26,7 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use crate::array::*;
use crate::buffer::Buffer;
use crate::buffer::{Buffer, MutableBuffer};
use crate::compute::cast;
use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef, UnionMode};
use crate::error::{ArrowError, Result};
@@ -48,12 +48,11 @@ use DataType::*;
/// compression does not yield appreciable savings.
fn read_buffer(
buf: &ipc::Buffer,
a_data: &[u8],
a_data: &Buffer,
compression_codec: &Option<CompressionCodec>,
) -> Result<Buffer> {
let start_offset = buf.offset() as usize;
let end_offset = start_offset + buf.length() as usize;
let buf_data = Buffer::from(&a_data[start_offset..end_offset]);
let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
// corner case: empty buffer
match (buf_data.is_empty(), compression_codec) {
(true, _) | (_, None) => Ok(buf_data),
@@ -74,7 +73,7 @@ fn read_buffer(
fn create_array(
nodes: &[ipc::FieldNode],
field: &Field,
data: &[u8],
data: &Buffer,
buffers: &[ipc::Buffer],
dictionaries_by_id: &HashMap<i64, ArrayRef>,
mut node_index: usize,
@@ -556,7 +555,7 @@ fn create_dictionary_array(

/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
pub fn read_record_batch(
buf: &[u8],
buf: &Buffer,
batch: ipc::RecordBatch,
schema: SchemaRef,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
@@ -642,7 +641,7 @@ pub fn read_record_batch(
/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_id` with the resulting dictionary
pub fn read_dictionary(
buf: &[u8],
buf: &Buffer,
batch: ipc::DictionaryBatch,
schema: &Schema,
dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
@@ -817,14 +816,15 @@ impl<R: Read + Seek> FileReader<R> {
let batch = message.header_as_dictionary_batch().unwrap();

// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
let mut buf =
MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;

read_dictionary(
&buf,
&buf.into(),
batch,
&schema,
&mut dictionaries_by_id,
@@ -925,14 +925,14 @@ impl<R: Read + Seek> FileReader<R> {
)
})?;
// read the block that makes up the record batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
self.reader.read_exact(&mut buf)?;

read_record_batch(
&buf,
&buf.into(),
batch,
self.schema(),
&self.dictionaries_by_id,
@@ -1121,10 +1121,10 @@ impl<R: Read> StreamReader<R> {
)
})?;
// read the block that makes up the record batch into a buffer
let mut buf = vec![0; message.bodyLength() as usize];
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.read_exact(&mut buf)?;

read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_id, self.projection.as_ref().map(|x| x.0.as_ref()), &message.version()).map(Some)
read_record_batch(&buf.into(), batch, self.schema(), &self.dictionaries_by_id, self.projection.as_ref().map(|x| x.0.as_ref()), &message.version()).map(Some)
}
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
@@ -1133,11 +1133,11 @@ impl<R: Read> StreamReader<R> {
)
})?;
// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; message.bodyLength() as usize];
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.read_exact(&mut buf)?;

read_dictionary(
&buf, batch, &self.schema, &mut self.dictionaries_by_id, &message.version()
&buf.into(), batch, &self.schema, &mut self.dictionaries_by_id, &message.version()
)?;

// read the next message until we encounter a RecordBatch
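
Note on the reader change: each IPC message body is now staged in a MutableBuffer and frozen into an immutable Buffer once, so read_buffer can hand out per-column views with slice_with_length instead of copying a_data[start..end] into a fresh allocation. A rough sketch of that pattern, assuming any std::io::Read source and ignoring compression and error details (read_body and column_bytes are illustrative helpers, not part of the crate):

use std::io::Read;

use arrow::buffer::{Buffer, MutableBuffer};

// Read `body_len` bytes into a zero-initialized MutableBuffer and freeze it.
// MutableBuffer derefs to [u8], so read_exact can fill it in place.
fn read_body<R: Read>(reader: &mut R, body_len: usize) -> std::io::Result<Buffer> {
    let mut buf = MutableBuffer::from_len_zeroed(body_len);
    reader.read_exact(&mut buf)?;
    Ok(buf.into()) // the allocation moves into the Buffer; no extra copy
}

// Each flatbuffer-described buffer then becomes a zero-copy view of the body
// (the real read_buffer additionally handles optional decompression).
fn column_bytes(body: &Buffer, offset: usize, length: usize) -> Buffer {
    body.slice_with_length(offset, length)
}
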
@@ -20,6 +20,7 @@ use std::collections::HashMap;

use arrow::{
array::ArrayRef,
buffer::Buffer,
datatypes::SchemaRef,
ipc::{self, reader, writer},
record_batch::RecordBatch,
@@ -264,7 +265,7 @@ async fn receive_batch_flight_data(

while message.header_type() == ipc::MessageHeader::DictionaryBatch {
reader::read_dictionary(
&data.data_body,
&Buffer::from(&data.data_body),
message
.header_as_dictionary_batch()
.expect("Error parsing dictionary"),
@@ -22,6 +22,7 @@ use std::sync::Arc;

use arrow::{
array::ArrayRef,
buffer::Buffer,
datatypes::Schema,
datatypes::SchemaRef,
ipc::{self, reader},
@@ -282,7 +283,7 @@ async fn send_app_metadata(

async fn record_batch_from_message(
message: ipc::Message<'_>,
data_body: &[u8],
data_body: &Buffer,
schema_ref: SchemaRef,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
) -> Result<RecordBatch, Status> {
@@ -306,7 +307,7 @@

async fn dictionary_from_message(
message: ipc::Message<'_>,
data_body: &[u8],
data_body: &Buffer,
schema_ref: SchemaRef,
dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
) -> Result<(), Status> {
@@ -354,7 +355,7 @@ async fn save_uploaded_chunks(

let batch = record_batch_from_message(
message,
&data.data_body,
&Buffer::from(data.data_body),
schema_ref.clone(),
&dictionaries_by_id,
)
Expand All @@ -365,7 +366,7 @@ async fn save_uploaded_chunks(
ipc::MessageHeader::DictionaryBatch => {
dictionary_from_message(
message,
&data.data_body,
&Buffer::from(data.data_body),
schema_ref.clone(),
&mut dictionaries_by_id,
)
