Skip to content

Commit

Permalink
Add Raw JSON Reader (~2.5x faster) (#3479)
Browse files Browse the repository at this point in the history
* Add Raw JSON Reader

* Custom tape decoder

* RAT

* Cleanup

* More columns in benchmark

* CI fixes

* Tweaks

* Add List support

* Add support for nested nulls

* Remove unnecessary dependency

* Add RawDecoder

* Clippy

* Fix List

* Fix buffering

* More tests

* Add Send bounds

* Fix variance

* Review feedback

* Add deprecation notices

* Build RawDecoder with builder

* Improve field estimate

* Format

* Handle unicode split over strings

* Improve detection of invalid UTF-8 sequences
  • Loading branch information
tustvold committed Jan 26, 2023
1 parent 902a17d commit 0f1a92a
Show file tree
Hide file tree
Showing 12 changed files with 1,916 additions and 27 deletions.
1 change: 1 addition & 0 deletions arrow-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ indexmap = { version = "1.9", default-features = false, features = ["std"] }
num = { version = "0.4", default-features = false, features = ["std"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
lexical-core = { version = "0.8", default-features = false }

[dev-dependencies]
tempfile = "3.3"
Expand Down
6 changes: 4 additions & 2 deletions arrow-json/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
pub mod reader;
pub mod writer;

pub use self::reader::Reader;
pub use self::reader::ReaderBuilder;
mod raw;

pub use self::raw::{RawDecoder, RawReader, RawReaderBuilder};
pub use self::reader::{Reader, ReaderBuilder};
pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer};
use half::f16;
use serde_json::{Number, Value};
Expand Down
43 changes: 43 additions & 0 deletions arrow-json/src/raw/boolean_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::BooleanBuilder;
use arrow_array::Array;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;

use crate::raw::tape::{Tape, TapeElement};
use crate::raw::{tape_error, ArrayDecoder};

#[derive(Default)]
pub struct BooleanArrayDecoder {}

impl ArrayDecoder for BooleanArrayDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let mut builder = BooleanBuilder::with_capacity(pos.len());
for p in pos {
match tape.get(*p) {
TapeElement::Null => builder.append_null(),
TapeElement::True => builder.append_value(true),
TapeElement::False => builder.append_value(false),
d => return Err(tape_error(d, "boolean")),
}
}

Ok(builder.finish().into_data())
}
}
116 changes: 116 additions & 0 deletions arrow-json/src/raw/list_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::raw::tape::{Tape, TapeElement};
use crate::raw::{make_decoder, tape_error, ArrayDecoder};
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_array::OffsetSizeTrait;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType};
use std::marker::PhantomData;

pub struct ListArrayDecoder<O> {
data_type: DataType,
decoder: Box<dyn ArrayDecoder>,
phantom: PhantomData<O>,
is_nullable: bool,
}

impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
pub fn new(data_type: DataType, is_nullable: bool) -> Result<Self, ArrowError> {
let field = match &data_type {
DataType::List(f) if !O::IS_LARGE => f,
DataType::LargeList(f) if O::IS_LARGE => f,
_ => unreachable!(),
};
let decoder = make_decoder(field.data_type().clone(), field.is_nullable())?;

Ok(Self {
data_type,
decoder,
phantom: Default::default(),
is_nullable,
})
}
}

impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let mut child_pos = Vec::with_capacity(pos.len());
let mut offsets = BufferBuilder::<O>::new(pos.len() + 1);
offsets.append(O::from_usize(0).unwrap());

let mut null_count = 0;
let mut nulls = self
.is_nullable
.then(|| BooleanBufferBuilder::new(pos.len()));

for p in pos {
let end_idx = match (tape.get(*p), nulls.as_mut()) {
(TapeElement::StartList(end_idx), None) => end_idx,
(TapeElement::StartList(end_idx), Some(nulls)) => {
nulls.append(true);
end_idx
}
(TapeElement::Null, Some(nulls)) => {
nulls.append(false);
null_count += 1;
*p + 1
}
(d, _) => return Err(tape_error(d, "[")),
};

let mut cur_idx = *p + 1;
while cur_idx < end_idx {
child_pos.push(cur_idx);

// Advance to next field
cur_idx = match tape.get(cur_idx) {
TapeElement::String(_)
| TapeElement::Number(_)
| TapeElement::True
| TapeElement::False
| TapeElement::Null => cur_idx + 1,
TapeElement::StartList(end_idx) => end_idx + 1,
TapeElement::StartObject(end_idx) => end_idx + 1,
d => return Err(tape_error(d, "list value")),
}
}

let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
ArrowError::JsonError(format!(
"offset overflow decoding {}",
self.data_type
))
})?;
offsets.append(offset)
}

let child_data = self.decoder.decode(tape, &child_pos).unwrap();

let data = ArrayDataBuilder::new(self.data_type.clone())
.len(pos.len())
.null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
.null_count(null_count)
.add_buffer(offsets.finish())
.child_data(vec![child_data]);

// Safety
// Validated lengths above
Ok(unsafe { data.build_unchecked() })
}
}

0 comments on commit 0f1a92a

Please sign in to comment.