From 37614ac384b9fcfb6d22e4c4d2bdabac2da43307 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 8 Nov 2022 10:00:08 +1300 Subject: [PATCH 1/3] Split out arrow-csv (#2594) --- .github/workflows/arrow.yml | 5 + .github/workflows/arrow_flight.yml | 1 + .github/workflows/dev_pr/labeler.yml | 1 + .github/workflows/integration.yml | 1 + .github/workflows/miri.yaml | 1 + .github/workflows/parquet.yml | 1 + Cargo.toml | 1 + arrow-csv/Cargo.toml | 53 ++ arrow/src/csv/mod.rs => arrow-csv/src/lib.rs | 8 +- {arrow/src/csv => arrow-csv/src}/reader.rs | 544 ++----------------- {arrow/src/csv => arrow-csv/src}/writer.rs | 172 ++---- arrow/Cargo.toml | 11 +- arrow/src/lib.rs | 2 +- arrow/tests/csv.rs | 486 +++++++++++++++++ arrow/tests/{ipc_integration.rs => ipc.rs} | 0 15 files changed, 655 insertions(+), 632 deletions(-) create mode 100644 arrow-csv/Cargo.toml rename arrow/src/csv/mod.rs => arrow-csv/src/lib.rs (85%) rename {arrow/src/csv => arrow-csv/src}/reader.rs (76%) rename {arrow/src/csv => arrow-csv/src}/writer.rs (81%) create mode 100644 arrow/tests/csv.rs rename arrow/tests/{ipc_integration.rs => ipc.rs} (100%) diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index d930086ef56..461e7e87ea5 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -34,6 +34,7 @@ on: - arrow-select/** - arrow-integration-test/** - arrow-ipc/** + - arrow-csv/** - .github/** jobs: @@ -64,6 +65,8 @@ jobs: run: cargo test -p arrow-cast --all-features - name: Test arrow-ipc with all features run: cargo test -p arrow-ipc --all-features + - name: Test arrow-csv with all features + run: cargo test -p arrow-csv --all-features - name: Test arrow-integration-test with all features run: cargo test -p arrow-integration-test --all-features - name: Test arrow with default features @@ -174,5 +177,7 @@ jobs: run: cargo clippy -p arrow-cast --all-targets --all-features -- -D warnings - name: Clippy arrow-ipc with all features run: cargo clippy -p arrow-ipc --all-targets --all-features -- -D warnings + - name: Clippy arrow-csv with all features + run: cargo clippy -p arrow-csv --all-targets --all-features -- -D warnings - name: Clippy arrow run: cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression,dyn_cmp_dict,dyn_arith_dict,chrono-tz --all-targets -- -D warnings diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml index ded4f5a6791..1f830ccf2b2 100644 --- a/.github/workflows/arrow_flight.yml +++ b/.github/workflows/arrow_flight.yml @@ -36,6 +36,7 @@ on: - arrow-select/** - arrow-flight/** - arrow-ipc/** + - arrow-csv/** - .github/** jobs: diff --git a/.github/workflows/dev_pr/labeler.yml b/.github/workflows/dev_pr/labeler.yml index 17ebf54de73..04c7c080e01 100644 --- a/.github/workflows/dev_pr/labeler.yml +++ b/.github/workflows/dev_pr/labeler.yml @@ -24,6 +24,7 @@ arrow: - arrow-schema/**/* - arrow-select/**/* - arrow-ipc/**/* + - arrow-csv/**/* arrow-flight: - arrow-flight/**/* diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 8566230ea0b..9418b904283 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -32,6 +32,7 @@ on: - arrow-schema/** - arrow-select/** - arrow-ipc/** + - arrow-csv/** - arrow-pyarrow-integration-testing/** - arrow-integration-test/** - arrow-integration-testing/** diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml index 2e85c9dd95a..e58ebdb3569 100644 --- a/.github/workflows/miri.yaml +++ 
b/.github/workflows/miri.yaml @@ -32,6 +32,7 @@ on: - arrow-schema/** - arrow-select/** - arrow-ipc/** + - arrow-csv/** - .github/** jobs: diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml index b369ef69bfd..4f3cf5f8005 100644 --- a/.github/workflows/parquet.yml +++ b/.github/workflows/parquet.yml @@ -35,6 +35,7 @@ on: - arrow-schema/** - arrow-select/** - arrow-ipc/** + - arrow-csv/** - parquet/** - .github/** diff --git a/Cargo.toml b/Cargo.toml index 0ab4853c6e1..18497d04379 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "arrow-array", "arrow-buffer", "arrow-cast", + "arrow-csv", "arrow-data", "arrow-flight", "arrow-integration-test", diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml new file mode 100644 index 00000000000..95ea1345a79 --- /dev/null +++ b/arrow-csv/Cargo.toml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "arrow-csv" +version = "26.0.0" +description = "Support for the Arrow CSV format" +homepage = "https://github.com/apache/arrow-rs" +repository = "https://github.com/apache/arrow-rs" +authors = ["Apache Arrow "] +license = "Apache-2.0" +keywords = ["arrow"] +include = [ + "benches/*.rs", + "src/**/*.rs", + "Cargo.toml", +] +edition = "2021" +rust-version = "1.62" + +[lib] +name = "arrow_csv" +path = "src/lib.rs" +bench = false + +[dependencies] +arrow-array = { version = "26.0.0", path = "../arrow-array" } +arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" } +arrow-cast = { version = "26.0.0", path = "../arrow-cast" } +arrow-data = { version = "26.0.0", path = "../arrow-data" } +arrow-schema = { version = "26.0.0", path = "../arrow-schema" } +chrono = { version = "0.4", default-features = false, features = ["clock"] } +csv = { version = "1.1", default-features = false } +lazy_static = { version = "1.4", default-features = false } +lexical-core = { version = "^0.8", default-features = false } +regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] } + +[dev-dependencies] +tempfile = "3.3" diff --git a/arrow/src/csv/mod.rs b/arrow-csv/src/lib.rs similarity index 85% rename from arrow/src/csv/mod.rs rename to arrow-csv/src/lib.rs index 46ba7d71e20..a45cf082d71 100644 --- a/arrow/src/csv/mod.rs +++ b/arrow-csv/src/lib.rs @@ -27,14 +27,14 @@ pub use self::writer::Writer; pub use self::writer::WriterBuilder; use arrow_schema::ArrowError; -fn map_csv_error(error: csv_crate::Error) -> ArrowError { +fn map_csv_error(error: csv::Error) -> ArrowError { match error.kind() { - csv_crate::ErrorKind::Io(error) => ArrowError::CsvError(error.to_string()), - csv_crate::ErrorKind::Utf8 { pos: _, err } => ArrowError::CsvError(format!( + csv::ErrorKind::Io(error) => ArrowError::CsvError(error.to_string()), + 
csv::ErrorKind::Utf8 { pos: _, err } => ArrowError::CsvError(format!( "Encountered UTF-8 error while reading CSV file: {}", err )), - csv_crate::ErrorKind::UnequalLengths { + csv::ErrorKind::UnequalLengths { expected_len, len, .. } => ArrowError::CsvError(format!( "Encountered unequal lengths between records on CSV file. Expected {} \ diff --git a/arrow/src/csv/reader.rs b/arrow-csv/src/reader.rs similarity index 76% rename from arrow/src/csv/reader.rs rename to arrow-csv/src/reader.rs index 404f37e9309..426d343b8ca 100644 --- a/arrow/src/csv/reader.rs +++ b/arrow-csv/src/reader.rs @@ -22,11 +22,11 @@ //! //! Example: //! -//! ``` -//! use arrow::csv; -//! use arrow::datatypes::{DataType, Field, Schema}; -//! use std::fs::File; -//! use std::sync::Arc; +//! ```norun +//! # use arrow_schema::*; +//! # use arrow_csv::Reader; +//! # use std::fs::File; +//! # use std::sync::Arc; //! //! let schema = Schema::new(vec![ //! Field::new("city", DataType::Utf8, false), @@ -36,7 +36,7 @@ //! //! let file = File::open("test/data/uk_cities.csv").unwrap(); //! -//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None); +//! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None); //! let batch = csv.next().unwrap().unwrap(); //! ``` @@ -49,17 +49,15 @@ use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; -use crate::array::{ - ArrayRef, BooleanArray, Decimal128Builder, DictionaryArray, PrimitiveArray, - StringArray, -}; -use crate::datatypes::*; -use crate::error::{ArrowError, Result}; -use crate::record_batch::{RecordBatch, RecordBatchOptions}; +use arrow_array::builder::Decimal128Builder; +use arrow_array::types::*; +use arrow_array::*; use arrow_cast::parse::Parser; +use arrow_schema::*; -use crate::csv::map_csv_error; -use csv_crate::{ByteRecord, StringRecord}; +use crate::map_csv_error; +use arrow_data::decimal::validate_decimal_precision; +use csv::{ByteRecord, StringRecord}; use std::ops::Neg; lazy_static! 
{
@@ -128,7 +126,7 @@ pub fn infer_file_schema<R: Read + Seek>(
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let roptions = ReaderOptions {
         delimiter: Some(delimiter),
         max_read_records,
@@ -142,7 +140,7 @@ fn infer_file_schema_with_csv_options<R: Read + Seek>(
     mut reader: R,
     roptions: ReaderOptions,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let saved_offset = reader.seek(SeekFrom::Current(0))?;
 
     let (schema, records_count) =
@@ -164,7 +162,7 @@ pub fn infer_reader_schema<R: Read>(
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let roptions = ReaderOptions {
         delimiter: Some(delimiter),
         max_read_records,
@@ -177,7 +175,7 @@ fn infer_reader_schema_with_csv_options<R: Read>(
     reader: R,
     roptions: ReaderOptions,
-) -> Result<(Schema, usize)> {
+) -> Result<(Schema, usize), ArrowError> {
     let mut csv_reader = Reader::build_csv_reader(
         reader,
         roptions.has_header,
@@ -268,7 +266,7 @@ pub fn infer_schema_from_files(
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> Result<Schema> {
+) -> Result<Schema, ArrowError> {
     let mut schemas = vec![];
     let mut records_to_read = max_read_records.unwrap_or(usize::MAX);
 
@@ -302,7 +300,7 @@ pub struct Reader<R: Read> {
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    reader: csv_crate::Reader<R>,
+    reader: csv::Reader<R>,
     /// Current line number
     line_number: usize,
     /// Maximum number of rows to read
@@ -410,8 +408,8 @@ impl<R: Read> Reader<R> {
         escape: Option<u8>,
         quote: Option<u8>,
         terminator: Option<u8>,
-    ) -> csv_crate::Reader<R> {
-        let mut reader_builder = csv_crate::ReaderBuilder::new();
+    ) -> csv::Reader<R> {
+        let mut reader_builder = csv::ReaderBuilder::new();
         reader_builder.has_headers(has_header);
 
         if let Some(c) = delimiter {
@@ -422,13 +420,13 @@ impl<R: Read> Reader<R> {
             reader_builder.quote(c);
         }
         if let Some(t) = terminator {
-            reader_builder.terminator(csv_crate::Terminator::Any(t));
+            reader_builder.terminator(csv::Terminator::Any(t));
         }
         reader_builder.from_reader(reader)
     }
 
     fn from_csv_reader(
-        mut csv_reader: csv_crate::Reader<R>,
+        mut csv_reader: csv::Reader<R>,
         schema: SchemaRef,
         has_header: bool,
         batch_size: usize,
@@ -474,7 +472,7 @@ impl<R: Read> Reader<R> {
 }
 
 impl<R: Read> Iterator for Reader<R> {
-    type Item = Result<RecordBatch>;
+    type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
         let remaining = self.end - self.line_number;
@@ -522,7 +520,7 @@ impl<R: Read> Iterator for Reader<R> {
     }
 }
 
-/// parses a slice of [csv_crate::StringRecord] into a
+/// parses a slice of [csv::StringRecord] into a
 /// [RecordBatch](crate::record_batch::RecordBatch).
 fn parse(
     rows: &[StringRecord],
@@ -531,13 +529,13 @@ fn parse(
     projection: Option<&Vec<usize>>,
     line_number: usize,
     datetime_format: Option<&str>,
-) -> Result<RecordBatch> {
+) -> Result<RecordBatch, ArrowError> {
     let projection: Vec<usize> = match projection {
         Some(v) => v.clone(),
         None => fields.iter().enumerate().map(|(i, _)| i).collect(),
     };
 
-    let arrays: Result<Vec<ArrayRef>> = projection
+    let arrays: Result<Vec<ArrayRef>, _> = projection
         .iter()
         .map(|i| {
             let i = *i;
@@ -706,7 +704,7 @@ fn build_decimal_array(
     col_idx: usize,
     precision: u8,
     scale: u8,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
     let mut decimal_builder = Decimal128Builder::with_capacity(rows.len());
     for row in rows {
         let col_s = row.get(col_idx);
@@ -720,7 +718,7 @@ fn build_decimal_array(
                 // append null
                 decimal_builder.append_null();
             } else {
-                let decimal_value: Result<i128> =
+                let decimal_value: Result<i128, ArrowError> =
                     parse_decimal_with_parameter(s, precision, scale);
                 match decimal_value {
                     Ok(v) => {
@@ -743,7 +741,11 @@
 
 // Parse the string format decimal value to i128 format and checking the precision and scale.
 // The result i128 value can't be out of bounds.
-fn parse_decimal_with_parameter(s: &str, precision: u8, scale: u8) -> Result<i128> {
+fn parse_decimal_with_parameter(
+    s: &str,
+    precision: u8,
+    scale: u8,
+) -> Result<i128, ArrowError> {
     if PARSE_DECIMAL_RE.is_match(s) {
         let mut offset = s.len();
         let len = s.len();
@@ -808,7 +810,7 @@ fn parse_decimal_with_parameter(s: &str, precision: u8, scale: u8) -> Result<i128> {
     }
 }
 
-fn parse_decimal(s: &str) -> Result<i128> {
+fn parse_decimal(s: &str) -> Result<i128, ArrowError> {
     if PARSE_DECIMAL_RE.is_match(s) {
         let mut offset = s.len();
         // each byte is digit、'-' or '.'
@@ -856,7 +858,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
     rows: &[StringRecord],
     col_idx: usize,
     format: Option<&str>,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
     rows.iter()
         .enumerate()
         .map(|(row_index, row)| {
@@ -884,7 +886,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
                 None => Ok(None),
             }
         })
-        .collect::<Result<PrimitiveArray<T>>>()
+        .collect::<Result<PrimitiveArray<T>, ArrowError>>()
         .map(|e| Arc::new(e) as ArrayRef)
 }
 
@@ -893,7 +895,7 @@ fn build_boolean_array(
     line_number: usize,
     rows: &[StringRecord],
     col_idx: usize,
-) -> Result<ArrayRef> {
+) -> Result<ArrayRef, ArrowError> {
     rows.iter()
         .enumerate()
         .map(|(row_index, row)| {
@@ -918,7 +920,7 @@ fn build_boolean_array(
                 None => Ok(None),
             }
         })
-        .collect::<Result<BooleanArray>>()
+        .collect::<Result<BooleanArray, ArrowError>>()
         .map(|e| Arc::new(e) as ArrayRef)
 }
 
@@ -988,16 +990,14 @@ impl ReaderBuilder {
     /// # Example
     ///
     /// ```
-    /// extern crate arrow;
-    ///
-    /// use arrow::csv;
+    /// use arrow_csv::{Reader, ReaderBuilder};
     /// use std::fs::File;
     ///
-    /// fn example() -> csv::Reader<File> {
+    /// fn example() -> Reader<File> {
    ///     let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
     ///
     ///     // create a builder, inferring the schema with the first 100 records
-    ///     let builder = csv::ReaderBuilder::new().infer_schema(Some(100));
+    ///     let builder = ReaderBuilder::new().infer_schema(Some(100));
     ///
     ///     let reader = builder.build(file).unwrap();
     ///
@@ -1086,7 +1086,7 @@ impl ReaderBuilder {
     }
 
     /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> {
+    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
         // check if schema should be inferred
         let delimiter = self.delimiter.unwrap_or(b',');
         let schema = match self.schema {
@@ -1131,436 +1131,11 @@ mod tests {
     use super::*;
 
-    use std::fs::File;
-    use std::io::{Cursor, Write};
+    use std::io::Write;
     use tempfile::NamedTempFile;
 
-    use crate::array::*;
-    use crate::compute::cast;
-    use crate::datatypes::Field;
     use chrono::prelude::*;
 
-    #[test]
-    fn test_csv() {
-        let _: Vec<()> = vec![None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())]
-            .into_iter()
-
.map(|format| { - let schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ]); - - let file = File::open("test/data/uk_cities.csv").unwrap(); - let mut csv = Reader::new( - file, - Arc::new(schema.clone()), - false, - None, - 1024, - None, - None, - format, - ); - assert_eq!(Arc::new(schema), csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(57.653484, lat.value(0)); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - }) - .collect(); - } - - #[test] - fn test_csv_schema_metadata() { - let mut metadata = std::collections::HashMap::new(); - metadata.insert("foo".to_owned(), "bar".to_owned()); - let schema = Schema::new_with_metadata( - vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ], - metadata.clone(), - ); - - let file = File::open("test/data/uk_cities.csv").unwrap(); - - let mut csv = Reader::new( - file, - Arc::new(schema.clone()), - false, - None, - 1024, - None, - None, - None, - ); - assert_eq!(Arc::new(schema), csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - assert_eq!(&metadata, batch.schema().metadata()); - } - - #[test] - fn test_csv_reader_with_decimal() { - let schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Decimal128(38, 6), false), - Field::new("lng", DataType::Decimal128(38, 6), false), - ]); - - let file = File::open("test/data/decimal_test.csv").unwrap(); - - let mut csv = - Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None); - let batch = csv.next().unwrap().unwrap(); - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("57.653484", lat.value_as_string(0)); - assert_eq!("53.002666", lat.value_as_string(1)); - assert_eq!("52.412811", lat.value_as_string(2)); - assert_eq!("51.481583", lat.value_as_string(3)); - assert_eq!("12.123456", lat.value_as_string(4)); - assert_eq!("50.760000", lat.value_as_string(5)); - assert_eq!("0.123000", lat.value_as_string(6)); - assert_eq!("123.000000", lat.value_as_string(7)); - assert_eq!("123.000000", lat.value_as_string(8)); - assert_eq!("-50.760000", lat.value_as_string(9)); - } - - #[test] - fn test_csv_from_buf_reader() { - let schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ]); - - let file_with_headers = - File::open("test/data/uk_cities_with_headers.csv").unwrap(); - let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); - let both_files = file_with_headers - .chain(Cursor::new("\n".to_string())) - .chain(file_without_headers); - let mut csv = Reader::from_reader( - both_files, - Arc::new(schema), - true, - None, - 1024, - None, - None, - None, - ); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(74, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - } - - #[test] - fn 
test_csv_with_schema_inference() { - let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); - - let builder = ReaderBuilder::new().has_header(true).infer_schema(None); - - let mut csv = builder.build(file).unwrap(); - let expected_schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, true), - Field::new("lat", DataType::Float64, true), - Field::new("lng", DataType::Float64, true), - ]); - assert_eq!(Arc::new(expected_schema), csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(57.653484, lat.value(0)); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - } - - #[test] - fn test_csv_with_schema_inference_no_headers() { - let file = File::open("test/data/uk_cities.csv").unwrap(); - - let builder = ReaderBuilder::new().infer_schema(None); - - let mut csv = builder.build(file).unwrap(); - - // csv field names should be 'column_{number}' - let schema = csv.schema(); - assert_eq!("column_1", schema.field(0).name()); - assert_eq!("column_2", schema.field(1).name()); - assert_eq!("column_3", schema.field(2).name()); - let batch = csv.next().unwrap().unwrap(); - let batch_schema = batch.schema(); - - assert_eq!(schema, batch_schema); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(57.653484, lat.value(0)); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - } - - #[test] - fn test_csv_builder_with_bounds() { - let file = File::open("test/data/uk_cities.csv").unwrap(); - - // Set the bounds to the lines 0, 1 and 2. - let mut csv = ReaderBuilder::new().with_bounds(0, 2).build(file).unwrap(); - let batch = csv.next().unwrap().unwrap(); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - // The value on line 0 is within the bounds - assert_eq!("Elgin, Scotland, the UK", city.value(0)); - - // The value on line 13 is outside of the bounds. Therefore - // the call to .value() will panic. 
- let result = std::panic::catch_unwind(|| city.value(13)); - assert!(result.is_err()); - } - - #[test] - fn test_csv_with_projection() { - let schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ]); - - let file = File::open("test/data/uk_cities.csv").unwrap(); - - let mut csv = Reader::new( - file, - Arc::new(schema), - false, - None, - 1024, - None, - Some(vec![0, 1]), - None, - ); - let projected_schema = Arc::new(Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - ])); - assert_eq!(projected_schema, csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(projected_schema, batch.schema()); - assert_eq!(37, batch.num_rows()); - assert_eq!(2, batch.num_columns()); - } - - #[test] - fn test_csv_with_dictionary() { - let schema = Schema::new(vec![ - Field::new( - "city", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - false, - ), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ]); - - let file = File::open("test/data/uk_cities.csv").unwrap(); - - let mut csv = Reader::new( - file, - Arc::new(schema), - false, - None, - 1024, - None, - Some(vec![0, 1]), - None, - ); - let projected_schema = Arc::new(Schema::new(vec![ - Field::new( - "city", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - false, - ), - Field::new("lat", DataType::Float64, false), - ])); - assert_eq!(projected_schema, csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(projected_schema, batch.schema()); - assert_eq!(37, batch.num_rows()); - assert_eq!(2, batch.num_columns()); - - let strings = cast(batch.column(0), &DataType::Utf8).unwrap(); - let strings = strings.as_any().downcast_ref::().unwrap(); - - assert_eq!(strings.value(0), "Elgin, Scotland, the UK"); - assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK"); - assert_eq!(strings.value(29), "Uckfield, East Sussex, UK"); - } - - #[test] - fn test_nulls() { - let schema = Schema::new(vec![ - Field::new("c_int", DataType::UInt64, false), - Field::new("c_float", DataType::Float32, true), - Field::new("c_string", DataType::Utf8, false), - ]); - - let file = File::open("test/data/null_test.csv").unwrap(); - - let mut csv = - Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None); - let batch = csv.next().unwrap().unwrap(); - - assert!(!batch.column(1).is_null(0)); - assert!(!batch.column(1).is_null(1)); - assert!(batch.column(1).is_null(2)); - assert!(!batch.column(1).is_null(3)); - assert!(!batch.column(1).is_null(4)); - } - - #[test] - fn test_nulls_with_inference() { - let file = File::open("test/data/various_types.csv").unwrap(); - - let builder = ReaderBuilder::new() - .infer_schema(None) - .has_header(true) - .with_delimiter(b'|') - .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3, 4, 5]); - - let mut csv = builder.build(file).unwrap(); - let batch = csv.next().unwrap().unwrap(); - - assert_eq!(7, batch.num_rows()); - assert_eq!(6, batch.num_columns()); - - let schema = batch.schema(); - - assert_eq!(&DataType::Int64, schema.field(0).data_type()); - assert_eq!(&DataType::Float64, schema.field(1).data_type()); - assert_eq!(&DataType::Float64, schema.field(2).data_type()); - assert_eq!(&DataType::Boolean, schema.field(3).data_type()); - assert_eq!(&DataType::Date32, schema.field(4).data_type()); - 
assert_eq!(&DataType::Date64, schema.field(5).data_type()); - - let names: Vec<&str> = - schema.fields().iter().map(|x| x.name().as_str()).collect(); - assert_eq!( - names, - vec![ - "c_int", - "c_float", - "c_string", - "c_bool", - "c_date", - "c_datetime" - ] - ); - - assert!(schema.field(0).is_nullable()); - assert!(schema.field(1).is_nullable()); - assert!(schema.field(2).is_nullable()); - assert!(schema.field(3).is_nullable()); - assert!(schema.field(4).is_nullable()); - assert!(schema.field(5).is_nullable()); - - assert!(!batch.column(1).is_null(0)); - assert!(!batch.column(1).is_null(1)); - assert!(batch.column(1).is_null(2)); - assert!(!batch.column(1).is_null(3)); - assert!(!batch.column(1).is_null(4)); - } - - #[test] - fn test_parse_invalid_csv() { - let file = File::open("test/data/various_types_invalid.csv").unwrap(); - - let schema = Schema::new(vec![ - Field::new("c_int", DataType::UInt64, false), - Field::new("c_float", DataType::Float32, false), - Field::new("c_string", DataType::Utf8, false), - Field::new("c_bool", DataType::Boolean, false), - ]); - - let builder = ReaderBuilder::new() - .with_schema(Arc::new(schema)) - .has_header(true) - .with_delimiter(b'|') - .with_batch_size(512) - .with_projection(vec![0, 1, 2, 3]); - - let mut csv = builder.build(file).unwrap(); - match csv.next() { - Some(e) => match e { - Err(e) => assert_eq!( - "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")", - format!("{:?}", e) - ), - Ok(_) => panic!("should have failed"), - }, - None => panic!("should have failed"), - } - } - #[test] fn test_infer_field_schema() { assert_eq!(infer_field_schema("A", None), DataType::Utf8); @@ -1771,21 +1346,21 @@ mod tests { } #[test] - fn test_infer_schema_from_multiple_files() -> Result<()> { - let mut csv1 = NamedTempFile::new()?; - let mut csv2 = NamedTempFile::new()?; - let csv3 = NamedTempFile::new()?; // empty csv file should be skipped - let mut csv4 = NamedTempFile::new()?; - writeln!(csv1, "c1,c2,c3")?; - writeln!(csv1, "1,\"foo\",0.5")?; - writeln!(csv1, "3,\"bar\",1")?; - writeln!(csv1, "3,\"bar\",2e-06")?; + fn test_infer_schema_from_multiple_files() { + let mut csv1 = NamedTempFile::new().unwrap(); + let mut csv2 = NamedTempFile::new().unwrap(); + let csv3 = NamedTempFile::new().unwrap(); // empty csv file should be skipped + let mut csv4 = NamedTempFile::new().unwrap(); + writeln!(csv1, "c1,c2,c3").unwrap(); + writeln!(csv1, "1,\"foo\",0.5").unwrap(); + writeln!(csv1, "3,\"bar\",1").unwrap(); + writeln!(csv1, "3,\"bar\",2e-06").unwrap(); // reading csv2 will set c2 to optional - writeln!(csv2, "c1,c2,c3,c4")?; - writeln!(csv2, "10,,3.14,true")?; + writeln!(csv2, "c1,c2,c3,c4").unwrap(); + writeln!(csv2, "10,,3.14,true").unwrap(); // reading csv4 will set c3 to optional - writeln!(csv4, "c1,c2,c3")?; - writeln!(csv4, "10,\"foo\",")?; + writeln!(csv4, "c1,c2,c3").unwrap(); + writeln!(csv4, "10,\"foo\",").unwrap(); let schema = infer_schema_from_files( &[ @@ -1797,7 +1372,8 @@ mod tests { b',', Some(4), // only csv1 and csv2 should be read true, - )?; + ) + .unwrap(); assert_eq!(schema.fields().len(), 4); assert!(schema.field(0).is_nullable()); @@ -1809,8 +1385,6 @@ mod tests { assert_eq!(&DataType::Utf8, schema.field(1).data_type()); assert_eq!(&DataType::Float64, schema.field(2).data_type()); assert_eq!(&DataType::Boolean, schema.field(3).data_type()); - - Ok(()) } #[test] diff --git a/arrow/src/csv/writer.rs b/arrow-csv/src/writer.rs similarity index 81% rename from arrow/src/csv/writer.rs rename to 
arrow-csv/src/writer.rs
index b2d02fe8494..674b333698b 100644
--- a/arrow/src/csv/writer.rs
+++ b/arrow-csv/src/writer.rs
@@ -23,11 +23,11 @@
 //! Example:
 //!
 //! ```
-//! use arrow::array::*;
-//! use arrow::csv;
-//! use arrow::datatypes::*;
-//! use arrow::record_batch::RecordBatch;
-//! use std::sync::Arc;
+//! # use arrow_array::*;
+//! # use arrow_array::types::*;
+//! # use arrow_csv::Writer;
+//! # use arrow_schema::*;
+//! # use std::sync::Arc;
 //!
 //! let schema = Schema::new(vec![
 //!     Field::new("c1", DataType::Utf8, false),
@@ -56,7 +56,7 @@
 //!
 //! let mut output = Vec::with_capacity(1024);
 //!
-//! let mut writer = csv::Writer::new(&mut output);
+//! let mut writer = Writer::new(&mut output);
 //! let batches = vec![&batch, &batch];
 //! for batch in batches {
 //!     writer.write(batch).unwrap();
@@ -64,15 +64,14 @@
 //! ```
 
 use arrow_array::timezone::Tz;
+use arrow_array::types::*;
+use arrow_array::*;
+use arrow_cast::display::{lexical_to_string, make_string_from_decimal};
+use arrow_schema::*;
 use chrono::{DateTime, Utc};
 use std::io::Write;
 
-use crate::array::*;
-use crate::csv::map_csv_error;
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::record_batch::RecordBatch;
-use crate::util::display::{lexical_to_string, make_string_from_decimal};
+use crate::map_csv_error;
 
 const DEFAULT_DATE_FORMAT: &str = "%F";
 const DEFAULT_TIME_FORMAT: &str = "%T";
@@ -81,7 +80,7 @@ const DEFAULT_TIMESTAMP_TZ_FORMAT: &str = "%FT%H:%M:%S.%9f%:z";
 
 fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
 where
-    T: ArrowNumericType,
+    T: ArrowPrimitiveType,
     T::Native: lexical_core::ToLexical,
 {
     let c = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
@@ -92,7 +91,7 @@ where
 #[derive(Debug)]
 pub struct Writer<W: Write> {
     /// The object to write to
-    writer: csv_crate::Writer<W>,
+    writer: csv::Writer<W>,
     /// Whether file should be written with headers. Defaults to `true`
     has_headers: bool,
     /// The date format for date arrays
@@ -115,7 +114,7 @@ impl<W: Write> Writer<W> {
     /// Create a new CsvWriter from a writable object, with default options
     pub fn new(writer: W) -> Self {
         let delimiter = b',';
-        let mut builder = csv_crate::WriterBuilder::new();
+        let mut builder = csv::WriterBuilder::new();
         let writer = builder.delimiter(delimiter).from_writer(writer);
         Writer {
             writer,
@@ -135,7 +134,7 @@ impl<W: Write> Writer<W> {
         batch: &[ArrayRef],
         row_index: usize,
         buffer: &mut [String],
-    ) -> Result<()> {
+    ) -> Result<(), ArrowError> {
         // TODO: it'd be more efficient if we could create `record: Vec<&[u8]>
         for (col_index, item) in buffer.iter_mut().enumerate() {
             let col = &batch[col_index];
@@ -242,7 +241,7 @@ impl<W: Write> Writer<W> {
         time_zone: Option<&String>,
         row_index: usize,
         col: &ArrayRef,
-    ) -> Result<String> {
+    ) -> Result<String, ArrowError> {
         use TimeUnit::*;
         let datetime = match time_unit {
             Second => col
@@ -283,7 +282,7 @@ impl<W: Write> Writer<W> {
     }
 
     /// Write a vector of record batches to a writable object
-    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         let num_columns = batch.num_columns();
         if self.beginning {
             if self.has_headers {
@@ -305,7 +304,7 @@ impl<W: Write> Writer<W> {
             .iter()
             .map(|array| match array.data_type() {
                 DataType::Dictionary(_, value_type) => {
-                    crate::compute::kernels::cast::cast(array, value_type)
+                    arrow_cast::cast(array, value_type)
                         .expect("cannot cast dictionary to underlying values")
                 }
                 _ => array.clone(),
@@ -365,16 +364,14 @@ impl WriterBuilder {
     /// # Example
     ///
     /// ```
-    /// extern crate arrow;
+    /// # use arrow_csv::{Writer, WriterBuilder};
+    /// # use std::fs::File;
     ///
-    /// use arrow::csv;
-    /// use std::fs::File;
-    ///
-    /// fn example() -> csv::Writer<File> {
+    /// fn example() -> Writer<File> {
     ///     let file = File::create("target/out.csv").unwrap();
     ///
     ///     // create a builder that doesn't write headers
-    ///     let builder = csv::WriterBuilder::new().has_headers(false);
+    ///     let builder = WriterBuilder::new().has_headers(false);
     ///     let writer = builder.build(file);
     ///
     ///     writer
@@ -423,7 +420,7 @@ impl WriterBuilder {
     /// Create a new `Writer`
     pub fn build<W: Write>(self, writer: W) -> Writer<W> {
         let delimiter = self.delimiter.unwrap_or(b',');
-        let mut builder = csv_crate::WriterBuilder::new();
+        let mut builder = csv::WriterBuilder::new();
         let writer = builder.delimiter(delimiter).from_writer(writer);
         Writer {
             writer,
@@ -452,13 +449,8 @@ impl WriterBuilder {
 mod tests {
     use super::*;
 
-    use crate::csv::Reader;
-    use crate::datatypes::{Field, Schema};
-    #[cfg(feature = "chrono-tz")]
-    use crate::util::string_writer::StringWriter;
-    use crate::util::test_util::get_temp_file;
-    use std::fs::File;
-    use std::io::{Cursor, Read};
+    use crate::Reader;
+    use std::io::{Cursor, Read, Seek};
     use std::sync::Arc;
 
     #[test]
@@ -512,15 +504,17 @@ mod tests {
         )
         .unwrap();
 
-        let file = get_temp_file("columns.csv", &[]);
+        let mut file = tempfile::tempfile().unwrap();
 
-        let mut writer = Writer::new(file);
+        let mut writer = Writer::new(&mut file);
         let batches = vec![&batch, &batch];
         for batch in batches {
            writer.write(batch).unwrap();
         }
+        drop(writer);
+
         // check that file was written successfully
-        let mut file = File::open("target/debug/testdata/columns.csv").unwrap();
+        file.rewind().unwrap();
         let mut buffer: Vec<u8> = vec![];
         file.read_to_end(&mut buffer).unwrap();
 
@@ -571,20 +565,21 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
         )
         .unwrap();
 
-        let file = get_temp_file("custom_options.csv", &[]);
+ let mut file = tempfile::tempfile().unwrap(); let builder = WriterBuilder::new() .has_headers(false) .with_delimiter(b'|') .with_time_format("%r".to_string()); - let mut writer = builder.build(file); + let mut writer = builder.build(&mut file); let batches = vec![&batch]; for batch in batches { writer.write(batch).unwrap(); } + drop(writer); // check that file was written successfully - let mut file = File::open("target/debug/testdata/custom_options.csv").unwrap(); + file.rewind().unwrap(); let mut buffer: Vec = vec![]; file.read_to_end(&mut buffer).unwrap(); @@ -595,105 +590,6 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo ); } - #[cfg(feature = "chrono-tz")] - #[test] - fn test_export_csv_timestamps() { - let schema = Schema::new(vec![ - Field::new( - "c1", - DataType::Timestamp( - TimeUnit::Millisecond, - Some("Australia/Sydney".to_string()), - ), - true, - ), - Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true), - ]); - - let c1 = TimestampMillisecondArray::from( - // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST). - // The offset (difference to UTC) is +10:00. - // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT) - // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30. - // - vec![Some(1555584887378), Some(1635577147000)], - ) - .with_timezone("Australia/Sydney".to_string()); - let c2 = TimestampMillisecondArray::from(vec![ - Some(1555584887378), - Some(1635577147000), - ]); - let batch = - RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]) - .unwrap(); - - let sw = StringWriter::new(); - let mut writer = Writer::new(sw); - let batches = vec![&batch]; - for batch in batches { - writer.write(batch).unwrap(); - } - - let left = "c1,c2 -2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000 -2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000\n"; - let right = writer.writer.into_inner().map(|s| s.to_string()); - assert_eq!(Some(left.to_string()), right.ok()); - } - - #[cfg(not(feature = "chrono-tz"))] - #[test] - fn test_conversion_consistency() { - // test if we can serialize and deserialize whilst retaining the same type information/ precision - - let schema = Schema::new(vec![ - Field::new("c1", DataType::Date32, false), - Field::new("c2", DataType::Date64, false), - ]); - - let c1 = Date32Array::from(vec![3, 2, 1]); - let c2 = Date64Array::from(vec![3, 2, 1]); - - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(c1), Arc::new(c2)], - ) - .unwrap(); - - let builder = WriterBuilder::new().has_headers(false); - - let mut buf: Cursor> = Default::default(); - // drop the writer early to release the borrow. - { - let mut writer = builder.build(&mut buf); - writer.write(&batch).unwrap(); - } - buf.set_position(0); - - let mut reader = Reader::new( - buf, - Arc::new(schema), - false, - None, - 3, - // starting at row 2 and up to row 6. 
- None, - None, - None, - ); - let rb = reader.next().unwrap().unwrap(); - let c1 = rb.column(0).as_any().downcast_ref::().unwrap(); - let c2 = rb.column(1).as_any().downcast_ref::().unwrap(); - - let actual = c1.into_iter().collect::>(); - let expected = vec![Some(3), Some(2), Some(1)]; - assert_eq!(actual, expected); - let actual = c2.into_iter().collect::>(); - let expected = vec![Some(3), Some(2), Some(1)]; - assert_eq!(actual, expected); - } - - #[cfg(feature = "chrono-tz")] #[test] fn test_conversion_consistency() { // test if we can serialize and deserialize whilst retaining the same type information/ precision diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 6c30df6bd27..cc9421de710 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -46,6 +46,7 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] [dependencies] arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" } arrow-cast = { version = "26.0.0", path = "../arrow-cast" } +arrow-csv = { version = "26.0.0", path = "../arrow-csv", optional = true } arrow-data = { version = "26.0.0", path = "../arrow-data" } arrow-schema = { version = "26.0.0", path = "../arrow-schema" } arrow-array = { version = "26.0.0", path = "../arrow-array" } @@ -57,10 +58,8 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng" num = { version = "0.4", default-features = false, features = ["std"] } half = { version = "2.1", default-features = false, features = ["num-traits"] } hashbrown = { version = "0.12", default-features = false } -csv_crate = { version = "1.1", default-features = false, optional = true, package = "csv" } regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] } regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] } -lazy_static = { version = "1.4", default-features = false } packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" } chrono = { version = "0.4", default-features = false, features = ["clock"] } comfy-table = { version = "6.0", optional = true, default-features = false } @@ -75,7 +74,7 @@ features = ["prettyprint", "ipc_compression", "dyn_cmp_dict", "ffi", "pyarrow"] [features] default = ["csv", "ipc", "json"] ipc_compression = ["ipc", "arrow-ipc/lz4", "arrow-ipc/zstd"] -csv = ["csv_crate"] +csv = ["arrow-csv"] ipc = ["arrow-ipc"] json = ["serde_json"] simd = ["packed_simd"] @@ -265,5 +264,9 @@ harness = false required-features = ["test_utils"] [[test]] -name = "ipc_integration" +name = "ipc" required-features = ["test_utils", "ipc"] + +[[test]] +name = "csv" +required-features = ["csv"] diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs index b2fa30d26d5..d1e0095840a 100644 --- a/arrow/src/lib.rs +++ b/arrow/src/lib.rs @@ -306,7 +306,7 @@ pub mod bitmap { pub mod array; pub mod compute; #[cfg(feature = "csv")] -pub mod csv; +pub use arrow_csv as csv; pub mod datatypes; pub mod error; #[cfg(feature = "ffi")] diff --git a/arrow/tests/csv.rs b/arrow/tests/csv.rs new file mode 100644 index 00000000000..11e1b30e148 --- /dev/null +++ b/arrow/tests/csv.rs @@ -0,0 +1,486 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fs::File; +use std::io::{Cursor, Read}; +use std::sync::Arc; + +use arrow_array::*; +use arrow_csv::{Reader, ReaderBuilder}; +use arrow_schema::*; + +#[test] +#[cfg(feature = "chrono-tz")] +fn test_export_csv_timestamps() { + let schema = Schema::new(vec![ + Field::new( + "c1", + DataType::Timestamp( + TimeUnit::Millisecond, + Some("Australia/Sydney".to_string()), + ), + true, + ), + Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true), + ]); + + let c1 = TimestampMillisecondArray::from( + // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST). + // The offset (difference to UTC) is +10:00. + // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT) + // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30. + // + vec![Some(1555584887378), Some(1635577147000)], + ) + .with_timezone("Australia/Sydney".to_string()); + let c2 = + TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]); + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + + let mut sw = Vec::new(); + let mut writer = arrow_csv::Writer::new(&mut sw); + let batches = vec![&batch]; + for batch in batches { + writer.write(batch).unwrap(); + } + drop(writer); + + let left = "c1,c2 +2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000 +2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000\n"; + let right = String::from_utf8(sw).unwrap(); + assert_eq!(left, right); +} + +#[test] +fn test_csv() { + let _: Vec<()> = vec![None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())] + .into_iter() + .map(|format| { + let schema = Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("lat", DataType::Float64, false), + Field::new("lng", DataType::Float64, false), + ]); + + let file = File::open("test/data/uk_cities.csv").unwrap(); + let mut csv = Reader::new( + file, + Arc::new(schema.clone()), + false, + None, + 1024, + None, + None, + format, + ); + assert_eq!(Arc::new(schema), csv.schema()); + let batch = csv.next().unwrap().unwrap(); + assert_eq!(37, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + // access data from a primitive array + let lat = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(57.653484, lat.value(0)); + + // access data from a string array (ListArray) + let city = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); + }) + .collect(); +} + +#[test] +fn test_csv_schema_metadata() { + let mut metadata = std::collections::HashMap::new(); + metadata.insert("foo".to_owned(), "bar".to_owned()); + let schema = Schema::new_with_metadata( + vec![ + Field::new("city", DataType::Utf8, false), + Field::new("lat", DataType::Float64, false), + Field::new("lng", DataType::Float64, false), + ], + 
metadata.clone(), + ); + + let file = File::open("test/data/uk_cities.csv").unwrap(); + + let mut csv = Reader::new( + file, + Arc::new(schema.clone()), + false, + None, + 1024, + None, + None, + None, + ); + assert_eq!(Arc::new(schema), csv.schema()); + let batch = csv.next().unwrap().unwrap(); + assert_eq!(37, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + assert_eq!(&metadata, batch.schema().metadata()); +} + +#[test] +fn test_csv_reader_with_decimal() { + let schema = Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("lat", DataType::Decimal128(38, 6), false), + Field::new("lng", DataType::Decimal128(38, 6), false), + ]); + + let file = File::open("test/data/decimal_test.csv").unwrap(); + + let mut csv = + Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None); + let batch = csv.next().unwrap().unwrap(); + // access data from a primitive array + let lat = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!("57.653484", lat.value_as_string(0)); + assert_eq!("53.002666", lat.value_as_string(1)); + assert_eq!("52.412811", lat.value_as_string(2)); + assert_eq!("51.481583", lat.value_as_string(3)); + assert_eq!("12.123456", lat.value_as_string(4)); + assert_eq!("50.760000", lat.value_as_string(5)); + assert_eq!("0.123000", lat.value_as_string(6)); + assert_eq!("123.000000", lat.value_as_string(7)); + assert_eq!("123.000000", lat.value_as_string(8)); + assert_eq!("-50.760000", lat.value_as_string(9)); +} + +#[test] +fn test_csv_from_buf_reader() { + let schema = Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("lat", DataType::Float64, false), + Field::new("lng", DataType::Float64, false), + ]); + + let file_with_headers = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); + let both_files = file_with_headers + .chain(Cursor::new("\n".to_string())) + .chain(file_without_headers); + let mut csv = Reader::from_reader( + both_files, + Arc::new(schema), + true, + None, + 1024, + None, + None, + None, + ); + let batch = csv.next().unwrap().unwrap(); + assert_eq!(74, batch.num_rows()); + assert_eq!(3, batch.num_columns()); +} + +#[test] +fn test_csv_with_schema_inference() { + let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); + + let builder = ReaderBuilder::new().has_header(true).infer_schema(None); + + let mut csv = builder.build(file).unwrap(); + let expected_schema = Schema::new(vec![ + Field::new("city", DataType::Utf8, true), + Field::new("lat", DataType::Float64, true), + Field::new("lng", DataType::Float64, true), + ]); + assert_eq!(Arc::new(expected_schema), csv.schema()); + let batch = csv.next().unwrap().unwrap(); + assert_eq!(37, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + // access data from a primitive array + let lat = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(57.653484, lat.value(0)); + + // access data from a string array (ListArray) + let city = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); +} + +#[test] +fn test_csv_with_schema_inference_no_headers() { + let file = File::open("test/data/uk_cities.csv").unwrap(); + + let builder = ReaderBuilder::new().infer_schema(None); + + let mut csv = builder.build(file).unwrap(); + + // csv field names should be 'column_{number}' + let schema = csv.schema(); + assert_eq!("column_1", 
schema.field(0).name()); + assert_eq!("column_2", schema.field(1).name()); + assert_eq!("column_3", schema.field(2).name()); + let batch = csv.next().unwrap().unwrap(); + let batch_schema = batch.schema(); + + assert_eq!(schema, batch_schema); + assert_eq!(37, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + // access data from a primitive array + let lat = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(57.653484, lat.value(0)); + + // access data from a string array (ListArray) + let city = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); +} + +#[test] +fn test_csv_builder_with_bounds() { + let file = File::open("test/data/uk_cities.csv").unwrap(); + + // Set the bounds to the lines 0, 1 and 2. + let mut csv = ReaderBuilder::new().with_bounds(0, 2).build(file).unwrap(); + let batch = csv.next().unwrap().unwrap(); + + // access data from a string array (ListArray) + let city = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // The value on line 0 is within the bounds + assert_eq!("Elgin, Scotland, the UK", city.value(0)); + + // The value on line 13 is outside of the bounds. Therefore + // the call to .value() will panic. + let result = std::panic::catch_unwind(|| city.value(13)); + assert!(result.is_err()); +} + +#[test] +fn test_csv_with_projection() { + let schema = Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("lat", DataType::Float64, false), + Field::new("lng", DataType::Float64, false), + ]); + + let file = File::open("test/data/uk_cities.csv").unwrap(); + + let mut csv = Reader::new( + file, + Arc::new(schema), + false, + None, + 1024, + None, + Some(vec![0, 1]), + None, + ); + let projected_schema = Arc::new(Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("lat", DataType::Float64, false), + ])); + assert_eq!(projected_schema, csv.schema()); + let batch = csv.next().unwrap().unwrap(); + assert_eq!(projected_schema, batch.schema()); + assert_eq!(37, batch.num_rows()); + assert_eq!(2, batch.num_columns()); +} + +#[test] +fn test_csv_with_dictionary() { + let schema = Schema::new(vec![ + Field::new( + "city", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ), + Field::new("lat", DataType::Float64, false), + Field::new("lng", DataType::Float64, false), + ]); + + let file = File::open("test/data/uk_cities.csv").unwrap(); + + let mut csv = Reader::new( + file, + Arc::new(schema), + false, + None, + 1024, + None, + Some(vec![0, 1]), + None, + ); + let projected_schema = Arc::new(Schema::new(vec![ + Field::new( + "city", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ), + Field::new("lat", DataType::Float64, false), + ])); + assert_eq!(projected_schema, csv.schema()); + let batch = csv.next().unwrap().unwrap(); + assert_eq!(projected_schema, batch.schema()); + assert_eq!(37, batch.num_rows()); + assert_eq!(2, batch.num_columns()); + + let strings = arrow_cast::cast(batch.column(0), &DataType::Utf8).unwrap(); + let strings = strings.as_any().downcast_ref::().unwrap(); + + assert_eq!(strings.value(0), "Elgin, Scotland, the UK"); + assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK"); + assert_eq!(strings.value(29), "Uckfield, East Sussex, UK"); +} + +#[test] +fn test_nulls() { + let schema = Schema::new(vec![ + Field::new("c_int", DataType::UInt64, false), + Field::new("c_float", DataType::Float32, true), + 
Field::new("c_string", DataType::Utf8, false), + ]); + + let file = File::open("test/data/null_test.csv").unwrap(); + + let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None); + let batch = csv.next().unwrap().unwrap(); + + assert!(!batch.column(1).is_null(0)); + assert!(!batch.column(1).is_null(1)); + assert!(batch.column(1).is_null(2)); + assert!(!batch.column(1).is_null(3)); + assert!(!batch.column(1).is_null(4)); +} + +#[test] +fn test_nulls_with_inference() { + let file = File::open("test/data/various_types.csv").unwrap(); + + let builder = ReaderBuilder::new() + .infer_schema(None) + .has_header(true) + .with_delimiter(b'|') + .with_batch_size(512) + .with_projection(vec![0, 1, 2, 3, 4, 5]); + + let mut csv = builder.build(file).unwrap(); + let batch = csv.next().unwrap().unwrap(); + + assert_eq!(7, batch.num_rows()); + assert_eq!(6, batch.num_columns()); + + let schema = batch.schema(); + + assert_eq!(&DataType::Int64, schema.field(0).data_type()); + assert_eq!(&DataType::Float64, schema.field(1).data_type()); + assert_eq!(&DataType::Float64, schema.field(2).data_type()); + assert_eq!(&DataType::Boolean, schema.field(3).data_type()); + assert_eq!(&DataType::Date32, schema.field(4).data_type()); + assert_eq!(&DataType::Date64, schema.field(5).data_type()); + + let names: Vec<&str> = schema.fields().iter().map(|x| x.name().as_str()).collect(); + assert_eq!( + names, + vec![ + "c_int", + "c_float", + "c_string", + "c_bool", + "c_date", + "c_datetime" + ] + ); + + assert!(schema.field(0).is_nullable()); + assert!(schema.field(1).is_nullable()); + assert!(schema.field(2).is_nullable()); + assert!(schema.field(3).is_nullable()); + assert!(schema.field(4).is_nullable()); + assert!(schema.field(5).is_nullable()); + + assert!(!batch.column(1).is_null(0)); + assert!(!batch.column(1).is_null(1)); + assert!(batch.column(1).is_null(2)); + assert!(!batch.column(1).is_null(3)); + assert!(!batch.column(1).is_null(4)); +} + +#[test] +fn test_parse_invalid_csv() { + let file = File::open("test/data/various_types_invalid.csv").unwrap(); + + let schema = Schema::new(vec![ + Field::new("c_int", DataType::UInt64, false), + Field::new("c_float", DataType::Float32, false), + Field::new("c_string", DataType::Utf8, false), + Field::new("c_bool", DataType::Boolean, false), + ]); + + let builder = ReaderBuilder::new() + .with_schema(Arc::new(schema)) + .has_header(true) + .with_delimiter(b'|') + .with_batch_size(512) + .with_projection(vec![0, 1, 2, 3]); + + let mut csv = builder.build(file).unwrap(); + match csv.next() { + Some(e) => match e { + Err(e) => assert_eq!( + "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")", + format!("{:?}", e) + ), + Ok(_) => panic!("should have failed"), + }, + None => panic!("should have failed"), + } +} diff --git a/arrow/tests/ipc_integration.rs b/arrow/tests/ipc.rs similarity index 100% rename from arrow/tests/ipc_integration.rs rename to arrow/tests/ipc.rs From 41af500cabd1501b656d1d80f1dc923d76b2bdce Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 8 Nov 2022 10:43:21 +1300 Subject: [PATCH 2/3] Fix doc --- arrow-csv/src/reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-csv/src/reader.rs b/arrow-csv/src/reader.rs index 426d343b8ca..459c23ad261 100644 --- a/arrow-csv/src/reader.rs +++ b/arrow-csv/src/reader.rs @@ -22,7 +22,7 @@ //! //! Example: //! -//! ```norun +//! ```no_run //! # use arrow_schema::*; //! # use arrow_csv::Reader; //! 
# use std::fs::File; @@ -521,7 +521,7 @@ impl Iterator for Reader { } /// parses a slice of [csv::StringRecord] into a -/// [RecordBatch](crate::record_batch::RecordBatch). +/// [RecordBatch] fn parse( rows: &[StringRecord], fields: &[Field], From f4460d314d27c16e0a716016b89178510a4ccfdf Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 9 Nov 2022 07:41:41 +1300 Subject: [PATCH 3/3] Update arrow-csv/Cargo.toml Co-authored-by: Andrew Lamb --- arrow-csv/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-csv/Cargo.toml b/arrow-csv/Cargo.toml index 95ea1345a79..d40cef0db11 100644 --- a/arrow-csv/Cargo.toml +++ b/arrow-csv/Cargo.toml @@ -18,7 +18,7 @@ [package] name = "arrow-csv" version = "26.0.0" -description = "Support for the Arrow CSV format" +description = "Support for parsing CSV format into the Arrow format" homepage = "https://github.com/apache/arrow-rs" repository = "https://github.com/apache/arrow-rs" authors = ["Apache Arrow "]
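
---

Editorial note, not part of the patch series: below is a minimal usage sketch of the split-out crate. It uses only APIs visible in the hunks above — `ReaderBuilder`, `Reader` as an iterator of `Result<RecordBatch, ArrowError>`, and `Writer` over any `impl Write` — together with the `test/data/uk_cities.csv` fixture the tests rely on. Existing `arrow::csv` users should be unaffected, since `arrow/src/lib.rs` now re-exports the new crate via `pub use arrow_csv as csv`.

```rust
// Sketch: round-trip a RecordBatch through the standalone arrow-csv crate.
use std::fs::File;
use std::sync::Arc;

use arrow_csv::{ReaderBuilder, Writer};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("lat", DataType::Float64, false),
        Field::new("lng", DataType::Float64, false),
    ]));

    // uk_cities.csv has no header row, matching ReaderBuilder's default
    let file = File::open("test/data/uk_cities.csv").unwrap();

    // `build` returns Result<Reader<R>, ArrowError>; the Reader is an
    // Iterator yielding Result<RecordBatch, ArrowError>
    let mut reader = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(1024)
        .build(file)
        .unwrap();
    let batch = reader.next().unwrap().unwrap();

    // Writer accepts any `impl Write`; writing into a Vec<u8> keeps the
    // output in memory, as in test_export_csv_timestamps above
    let mut out = Vec::new();
    let mut writer = Writer::new(&mut out);
    writer.write(&batch).unwrap();
    drop(writer); // release the borrow of `out`

    println!("{} rows -> {} bytes of CSV", batch.num_rows(), out.len());
}
```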