From ea14618c91120daee030608b7544075605beb325 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 27 Aug 2022 12:47:05 +0100 Subject: [PATCH] Split out integration test plumbing (#2594) (#2300) --- .github/workflows/arrow.yml | 2 + arrow/Cargo.toml | 7 +- arrow/src/ipc/reader.rs | 346 +-------------- arrow/src/ipc/writer.rs | 416 ++---------------- arrow/src/util/mod.rs | 2 - integration-testing/Cargo.toml | 9 +- .../data/integration.json | 0 .../src/bin/arrow-json-integration-test.rs | 3 +- integration-testing/src/lib.rs | 24 +- .../src/util.rs | 23 +- integration-testing/tests/ipc_reader.rs | 276 ++++++++++++ integration-testing/tests/ipc_writer.rs | 297 +++++++++++++ 12 files changed, 645 insertions(+), 760 deletions(-) rename {arrow/test => integration-testing}/data/integration.json (100%) rename arrow/src/util/integration_util.rs => integration-testing/src/util.rs (99%) create mode 100644 integration-testing/tests/ipc_reader.rs create mode 100644 integration-testing/tests/ipc_writer.rs diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index 48fdd9ba079..c3d2de82058 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -64,6 +64,8 @@ jobs: cargo run --example dynamic_types cargo run --example read_csv cargo run --example read_csv_infer_schema + - name: Run non-archery based integration-tests + run: cargo test -p arrow-integration-testing # test compilaton features linux-features: diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index ed339bc304a..c2872baa524 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -38,10 +38,10 @@ path = "src/lib.rs" bench = false [target.'cfg(target_arch = "wasm32")'.dependencies] -ahash = { version = "0.8", default-features = false, features=["compile-time-rng"] } +ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -ahash = { version = "0.8", default-features = false, features=["runtime-rng"] } +ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } [dependencies] serde = { version = "1.0", default-features = false } @@ -61,7 +61,6 @@ packed_simd = { version = "0.3", default-features = false, optional = true, pack chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.6", default-features = false, optional = true } flatbuffers = { version = "2.1.2", default-features = false, features = ["thiserror"], optional = true } -hex = { version = "0.4", default-features = false, features = ["std"] } comfy-table = { version = "6.0", optional = true, default-features = false } pyo3 = { version = "0.17", default-features = false, optional = true } lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] } @@ -102,7 +101,7 @@ tempfile = { version = "3", default-features = false } [[example]] name = "dynamic_types" required-features = ["prettyprint"] -path="./examples/dynamic_types.rs" +path = "./examples/dynamic_types.rs" [[bench]] name = "aggregate_kernels" diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 7ffa9aa5946..969c8c43f02 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -1173,336 +1173,8 @@ mod tests { use std::fs::File; - use flate2::read::GzDecoder; - + use crate::datatypes; use crate::datatypes::{ArrowNativeType, Float64Type, Int32Type, Int8Type}; - use crate::{datatypes, util::integration_util::*}; - - 
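// NOTE: the reader integration tests removed below are not deleted outright;
// they move verbatim to integration-testing/tests/ipc_reader.rs, because they
// depend on read_gzip_json/ArrowJson, which now live in the
// arrow-integration-testing crate instead of behind arrow's test utilities.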
#[test] - #[cfg(not(feature = "force_validate"))] - fn read_generated_files_014() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "0.14.1"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_dictionary", - "generated_map", - "generated_nested", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - "generated_decimal", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - #[should_panic(expected = "Big Endian is not supported for Decimal!")] - fn read_decimal_be_file_should_panic() { - let testdata = crate::util::test_util::arrow_test_data(); - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file", - testdata - )) - .unwrap(); - FileReader::try_new(file, None).unwrap(); - } - - #[test] - #[should_panic( - expected = "Last offset 687865856 of Utf8 is larger than values length 41" - )] - fn read_dictionary_be_not_implemented() { - // The offsets are not translated for big-endian files - // https://github.com/apache/arrow-rs/issues/859 - let testdata = crate::util::test_util::arrow_test_data(); - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_dictionary.arrow_file", - testdata - )) - .unwrap(); - FileReader::try_new(file, None).unwrap(); - } - - #[test] - fn read_generated_be_files_should_work() { - // complementary to the previous test - let testdata = crate::util::test_util::arrow_test_data(); - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_map", - "generated_nested", - "generated_null_trivial", - "generated_null", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file", - testdata, path - )) - .unwrap(); - - FileReader::try_new(file, None).unwrap(); - }); - } - - #[test] - fn projection_should_work() { - // complementary to the previous test - let testdata = crate::util::test_util::arrow_test_data(); - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_map", - "generated_nested", - "generated_null_trivial", - "generated_null", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - ]; - paths.iter().for_each(|path| { - // We must use littleendian files here. 
- // The offsets are not translated for big-endian files - // https://github.com/apache/arrow-rs/issues/859 - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/1.0.0-littleendian/{}.arrow_file", - testdata, path - )) - .unwrap(); - - let reader = FileReader::try_new(file, Some(vec![0])).unwrap(); - let datatype_0 = reader.schema().fields()[0].data_type().clone(); - reader.for_each(|batch| { - let batch = batch.unwrap(); - assert_eq!(batch.columns().len(), 1); - assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone()); - }); - }); - } - - #[test] - #[cfg(not(feature = "force_validate"))] - fn read_generated_streams_014() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "0.14.1"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_dictionary", - "generated_map", - "generated_nested", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - "generated_decimal", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - // the next batch must be empty - assert!(reader.next().is_none()); - // the stream must indicate that it's finished - assert!(reader.is_finished()); - }); - } - - #[test] - fn read_generated_files_100() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "1.0.0-littleendian"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_dictionary", - "generated_map", - // "generated_map_non_canonical", - "generated_nested", - "generated_null_trivial", - "generated_null", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - fn read_generated_streams_100() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "1.0.0-littleendian"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_dictionary", - "generated_map", - // "generated_map_non_canonical", - "generated_nested", - "generated_null_trivial", - "generated_null", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - // the next batch must be empty - assert!(reader.next().is_none()); - // the stream must indicate that it's finished 
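// (is_finished reports true once the reader has consumed the zero-length
// end-of-stream message that StreamWriter::finish emits)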
- assert!(reader.is_finished()); - }); - } - - #[test] - #[cfg(feature = "ipc_compression")] - fn read_generated_streams_200() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "2.0.0-compression"; - - // the test is repetitive, thus we can read all supported files at once - let paths = vec!["generated_lz4", "generated_zstd"]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - // the next batch must be empty - assert!(reader.next().is_none()); - // the stream must indicate that it's finished - assert!(reader.is_finished()); - }); - } - - #[test] - #[cfg(not(feature = "ipc_compression"))] - fn read_generated_streams_200_negative() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "2.0.0-compression"; - - // the test is repetitive, thus we can read all supported files at once - let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")]; - cases.iter().for_each(|(path, compression_name)| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let mut reader = StreamReader::try_new(file, None).unwrap(); - let err = reader.next().unwrap().unwrap_err(); - let expected_error = format!( - "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature", - compression_name - ); - assert_eq!(err.to_string(), expected_error); - }); - } - - #[test] - #[cfg(feature = "ipc_compression")] - fn read_generated_files_200() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "2.0.0-compression"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec!["generated_lz4", "generated_zstd"]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - #[cfg(not(feature = "ipc_compression"))] - fn read_generated_files_200_negative() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "2.0.0-compression"; - // the test is repetitive, thus we can read all supported files at once - let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")]; - cases.iter().for_each(|(path, compression_name)| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - let err = reader.next().unwrap().unwrap_err(); - let expected_error = format!( - "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature", - compression_name - ); - assert_eq!(err.to_string(), expected_error); - }); - } fn create_test_projection_schema() -> Schema { // define field types @@ -1816,22 +1488,6 @@ mod tests { check_union_with_builder(UnionBuilder::new_sparse()); } - /// Read gzipped JSON file - fn 
read_gzip_json(version: &str, path: &str) -> ArrowJson { - let testdata = crate::util::test_util::arrow_test_data(); - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.json.gz", - testdata, version, path - )) - .unwrap(); - let mut gz = GzDecoder::new(&file); - let mut s = String::new(); - gz.read_to_string(&mut s).unwrap(); - // convert to Arrow JSON - let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap(); - arrow_json - } - #[test] fn test_roundtrip_stream_nested_dict() { let xs = vec!["AA", "BB", "AA", "CC", "BB"]; diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index 826e49ca87d..63f1520a5e9 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -1270,31 +1270,28 @@ mod tests { use super::*; use std::fs::File; - use std::io::Read; + use std::io::Seek; use std::sync::Arc; - use flate2::read::GzDecoder; use ipc::MetadataVersion; use crate::array::*; use crate::datatypes::Field; use crate::ipc::reader::*; - use crate::util::integration_util::*; #[test] #[cfg(feature = "ipc_compression")] fn test_write_empty_record_batch_lz4_compression() { - let file_name = "arrow_lz4_empty"; let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]); let values: Vec> = vec![]; let array = Int32Array::from(values); let record_batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) .unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + { - let file = - File::create(format!("target/debug/testdata/{}.arrow_file", file_name)) - .unwrap(); let write_option = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) .unwrap() @@ -1302,15 +1299,14 @@ mod tests { .unwrap(); let mut writer = - FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + FileWriter::try_new_with_options(&mut file, &schema, write_option) + .unwrap(); writer.write(&record_batch).unwrap(); writer.finish().unwrap(); } + file.rewind().unwrap(); { // read file - let file = - File::open(format!("target/debug/testdata/{}.arrow_file", file_name)) - .unwrap(); let mut reader = FileReader::try_new(file, None).unwrap(); loop { match reader.next() { @@ -1345,9 +1341,9 @@ mod tests { let record_batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) .unwrap(); + + let mut file = tempfile::tempfile().unwrap(); { - let file = - File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap(); let write_option = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) .unwrap() @@ -1355,15 +1351,14 @@ mod tests { .unwrap(); let mut writer = - FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + FileWriter::try_new_with_options(&mut file, &schema, write_option) + .unwrap(); writer.write(&record_batch).unwrap(); writer.finish().unwrap(); } + file.rewind().unwrap(); { // read file - let file = - File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_lz4")) - .unwrap(); let mut reader = FileReader::try_new(file, None).unwrap(); loop { match reader.next() { @@ -1398,9 +1393,8 @@ mod tests { let record_batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) .unwrap(); + let mut file = tempfile::tempfile().unwrap(); { - let file = - File::create("target/debug/testdata/arrow_zstd.arrow_file").unwrap(); let write_option = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) .unwrap() @@ -1408,15 +1402,14 @@ mod tests { .unwrap(); let mut writer = - FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + 
FileWriter::try_new_with_options(&mut file, &schema, write_option) + .unwrap(); writer.write(&record_batch).unwrap(); writer.finish().unwrap(); } + file.rewind().unwrap(); { // read file - let file = - File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_zstd")) - .unwrap(); let mut reader = FileReader::try_new(file, None).unwrap(); loop { match reader.next() { @@ -1462,18 +1455,16 @@ mod tests { vec![Arc::new(array1) as ArrayRef], ) .unwrap(); + let mut file = tempfile::tempfile().unwrap(); { - let file = File::create("target/debug/testdata/arrow.arrow_file").unwrap(); - let mut writer = FileWriter::try_new(file, &schema).unwrap(); + let mut writer = FileWriter::try_new(&mut file, &schema).unwrap(); writer.write(&batch).unwrap(); writer.finish().unwrap(); } + file.rewind().unwrap(); { - let file = - File::open(format!("target/debug/testdata/{}.arrow_file", "arrow")) - .unwrap(); let mut reader = FileReader::try_new(file, None).unwrap(); while let Some(Ok(read_batch)) = reader.next() { read_batch @@ -1569,352 +1560,6 @@ mod tests { ); } - #[test] - #[cfg(not(feature = "force_validate"))] - fn read_and_rewrite_generated_files_014() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "0.14.1"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_dictionary", - "generated_map", - "generated_nested", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - "generated_decimal", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read and rewrite the file to a temp location - { - let file = File::create(format!( - "target/debug/testdata/{}-{}.arrow_file", - version, path - )) - .unwrap(); - let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap(); - while let Some(Ok(batch)) = reader.next() { - writer.write(&batch).unwrap(); - } - writer.finish().unwrap(); - } - - let file = File::open(format!( - "target/debug/testdata/{}-{}.arrow_file", - version, path - )) - .unwrap(); - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - #[cfg(not(feature = "force_validate"))] - fn read_and_rewrite_generated_streams_014() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "0.14.1"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_interval", - "generated_datetime", - "generated_dictionary", - "generated_map", - "generated_nested", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - "generated_decimal", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let reader = StreamReader::try_new(file, None).unwrap(); - - // read and rewrite the stream to a temp location - { - let file = File::create(format!( - "target/debug/testdata/{}-{}.stream", - version, path - )) - .unwrap(); - let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap(); - reader.for_each(|batch| { - writer.write(&batch.unwrap()).unwrap(); - }); 
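// finish() emits the end-of-stream marker; without it the rewritten
// stream would never report is_finished to its readers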
- writer.finish().unwrap(); - } - - let file = - File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) - .unwrap(); - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - fn read_and_rewrite_generated_files_100() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "1.0.0-littleendian"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_custom_metadata", - "generated_datetime", - "generated_dictionary_unsigned", - "generated_dictionary", - // "generated_duplicate_fieldnames", - "generated_interval", - "generated_map", - "generated_nested", - // "generated_nested_large_offsets", - "generated_null_trivial", - "generated_null", - "generated_primitive_large_offsets", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - // "generated_recursive_nested", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read and rewrite the file to a temp location - { - let file = File::create(format!( - "target/debug/testdata/{}-{}.arrow_file", - version, path - )) - .unwrap(); - // write IPC version 5 - let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); - let mut writer = - FileWriter::try_new_with_options(file, &reader.schema(), options) - .unwrap(); - while let Some(Ok(batch)) = reader.next() { - writer.write(&batch).unwrap(); - } - writer.finish().unwrap(); - } - - let file = File::open(format!( - "target/debug/testdata/{}-{}.arrow_file", - version, path - )) - .unwrap(); - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - fn read_and_rewrite_generated_streams_100() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "1.0.0-littleendian"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_custom_metadata", - "generated_datetime", - "generated_dictionary_unsigned", - "generated_dictionary", - // "generated_duplicate_fieldnames", - "generated_interval", - "generated_map", - "generated_nested", - // "generated_nested_large_offsets", - "generated_null_trivial", - "generated_null", - "generated_primitive_large_offsets", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - // "generated_recursive_nested", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let reader = StreamReader::try_new(file, None).unwrap(); - - // read and rewrite the stream to a temp location - { - let file = File::create(format!( - "target/debug/testdata/{}-{}.stream", - version, path - )) - .unwrap(); - let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); - let mut writer = - StreamWriter::try_new_with_options(file, &reader.schema(), options) - .unwrap(); - reader.for_each(|batch| { - writer.write(&batch.unwrap()).unwrap(); - }); - writer.finish().unwrap(); - 
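// the writer (and the File it owns) is dropped at the end of this
// scope, so the temp file is fully flushed before being reopened below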
} - - let file = - File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) - .unwrap(); - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - #[cfg(feature = "ipc_compression")] - fn read_and_rewrite_compression_files_200() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "2.0.0-compression"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec!["generated_lz4", "generated_zstd"]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read and rewrite the file to a temp location - { - let file = File::create(format!( - "target/debug/testdata/{}-{}.arrow_file", - version, path - )) - .unwrap(); - // write IPC version 5 - let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) - .unwrap() - .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME)) - .unwrap(); - - let mut writer = - FileWriter::try_new_with_options(file, &reader.schema(), options) - .unwrap(); - while let Some(Ok(batch)) = reader.next() { - writer.write(&batch).unwrap(); - } - writer.finish().unwrap(); - } - - let file = File::open(format!( - "target/debug/testdata/{}-{}.arrow_file", - version, path - )) - .unwrap(); - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - #[test] - #[cfg(feature = "ipc_compression")] - fn read_and_rewrite_compression_stream_200() { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "2.0.0-compression"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec!["generated_lz4", "generated_zstd"]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let reader = StreamReader::try_new(file, None).unwrap(); - - // read and rewrite the stream to a temp location - { - let file = File::create(format!( - "target/debug/testdata/{}-{}.stream", - version, path - )) - .unwrap(); - let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) - .unwrap() - .try_with_compression(Some(ipc::CompressionType::ZSTD)) - .unwrap(); - - let mut writer = - StreamWriter::try_new_with_options(file, &reader.schema(), options) - .unwrap(); - reader.for_each(|batch| { - writer.write(&batch.unwrap()).unwrap(); - }); - writer.finish().unwrap(); - } - - let file = - File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) - .unwrap(); - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); - } - - /// Read gzipped JSON file - fn read_gzip_json(version: &str, path: &str) -> ArrowJson { - let testdata = crate::util::test_util::arrow_test_data(); - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.json.gz", - testdata, version, path - )) - .unwrap(); - let mut gz = GzDecoder::new(&file); - let mut s = String::new(); - gz.read_to_string(&mut 
s).unwrap(); - // convert to Arrow JSON - let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap(); - arrow_json - } - #[test] fn track_union_nested_dict() { let inner: DictionaryArray = vec!["a", "b", "a"].into_iter().collect(); @@ -1982,7 +1627,6 @@ mod tests { #[test] fn read_union_017() { let testdata = crate::util::test_util::arrow_test_data(); - let version = "0.17.1"; let data_file = File::open(format!( "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream", testdata, @@ -1991,26 +1635,18 @@ mod tests { let reader = StreamReader::try_new(data_file, None).unwrap(); + let mut file = tempfile::tempfile().unwrap(); // read and rewrite the stream to a temp location { - let file = File::create(format!( - "target/debug/testdata/{}-generated_union.stream", - version - )) - .unwrap(); - let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap(); + let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap(); reader.for_each(|batch| { writer.write(&batch.unwrap()).unwrap(); }); writer.finish().unwrap(); } + file.rewind().unwrap(); // Compare original file and rewrote file - let file = File::open(format!( - "target/debug/testdata/{}-generated_union.stream", - version - )) - .unwrap(); let rewrite_reader = StreamReader::try_new(file, None).unwrap(); let data_file = File::open(format!( @@ -2053,18 +1689,18 @@ mod tests { vec![Arc::new(union) as ArrayRef], ) .unwrap(); - let file_name = "target/debug/testdata/union.arrow_file"; + + let mut file = tempfile::tempfile().unwrap(); { - let file = File::create(&file_name).unwrap(); let mut writer = - FileWriter::try_new_with_options(file, &schema, options).unwrap(); + FileWriter::try_new_with_options(&mut file, &schema, options).unwrap(); writer.write(&batch).unwrap(); writer.finish().unwrap(); } + file.rewind().unwrap(); { - let file = File::open(&file_name).unwrap(); let reader = FileReader::try_new(file, None).unwrap(); reader.for_each(|maybe_batch| { maybe_batch diff --git a/arrow/src/util/mod.rs b/arrow/src/util/mod.rs index 1ee05d8a02c..6f68398e770 100644 --- a/arrow/src/util/mod.rs +++ b/arrow/src/util/mod.rs @@ -24,8 +24,6 @@ pub mod bit_util; #[cfg(feature = "test_utils")] pub mod data_gen; pub mod display; -#[cfg(any(test, feature = "test_utils"))] -pub mod integration_util; #[cfg(feature = "prettyprint")] pub mod pretty; pub(crate) mod serialization; diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index 786f77bd2f2..74a1ee6aa70 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -36,12 +36,15 @@ arrow-flight = { path = "../arrow-flight", default-features = false } async-trait = { version = "0.1.41", default-features = false } clap = { version = "3", default-features = false, features = ["std", "derive"] } futures = { version = "0.3", default-features = false } -hex = { version = "0.4", default-features = false } +hex = { version = "0.4", default-features = false, features = ["std"] } prost = { version = "0.11", default-features = false } -serde = { version = "1.0", default-features = false, features = ["rc"] } -serde_derive = { version = "1.0", default-features = false } +serde = { version = "1.0", default-features = false, features = ["rc", "derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio = { version = "1.0", default-features = false } tonic = { version = "0.8", default-features = false } tracing-subscriber = { version = "0.3.1", default-features = false, features = ["fmt"], optional = true 
} num = { version = "0.4", default-features = false, features = ["std"] } +flate2 = { version = "1", default-features = false, features = ["rust_backend"] } + +[dev-dependencies] +tempfile = { version = "3", default-features = false } diff --git a/arrow/test/data/integration.json b/integration-testing/data/integration.json similarity index 100% rename from arrow/test/data/integration.json rename to integration-testing/data/integration.json diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs index b442e8b5ed3..a7d7cf6ee7c 100644 --- a/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/integration-testing/src/bin/arrow-json-integration-test.rs @@ -20,8 +20,7 @@ use arrow::datatypes::{DataType, Field}; use arrow::error::{ArrowError, Result}; use arrow::ipc::reader::FileReader; use arrow::ipc::writer::FileWriter; -use arrow::util::integration_util::*; -use arrow_integration_testing::read_json_file; +use arrow_integration_testing::{read_json_file, util::*}; use clap::Parser; use std::fs::File; diff --git a/integration-testing/src/lib.rs b/integration-testing/src/lib.rs index 5d3da15d3f5..ffe112af72c 100644 --- a/integration-testing/src/lib.rs +++ b/integration-testing/src/lib.rs @@ -19,12 +19,12 @@ use serde_json::Value; -use arrow::util::integration_util::ArrowJsonBatch; +use util::*; use arrow::datatypes::Schema; use arrow::error::Result; use arrow::record_batch::RecordBatch; -use arrow::util::integration_util::*; +use arrow::util::test_util::arrow_test_data; use std::collections::HashMap; use std::fs::File; use std::io::BufReader; @@ -36,6 +36,7 @@ pub const AUTH_PASSWORD: &str = "flight"; pub mod flight_client_scenarios; pub mod flight_server_scenarios; +pub mod util; pub struct ArrowFile { pub schema: Schema, @@ -76,3 +77,22 @@ pub fn read_json_file(json_name: &str) -> Result { batches, }) } + +/// Read gzipped JSON test file +pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson { + use flate2::read::GzDecoder; + use std::io::Read; + + let testdata = arrow_test_data(); + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.json.gz", + testdata, version, path + )) + .unwrap(); + let mut gz = GzDecoder::new(&file); + let mut s = String::new(); + gz.read_to_string(&mut s).unwrap(); + // convert to Arrow JSON + let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap(); + arrow_json +} diff --git a/arrow/src/util/integration_util.rs b/integration-testing/src/util.rs similarity index 99% rename from arrow/src/util/integration_util.rs rename to integration-testing/src/util.rs index 36751ed6aae..531bdc57175 100644 --- a/arrow/src/util/integration_util.rs +++ b/integration-testing/src/util.rs @@ -22,19 +22,19 @@ use hex::decode; use num::BigInt; use num::Signed; -use serde_derive::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use serde_json::{Map as SJMap, Value}; use std::collections::HashMap; use std::sync::Arc; -use crate::array::*; -use crate::buffer::{Buffer, MutableBuffer}; -use crate::compute; -use crate::datatypes::*; -use crate::error::{ArrowError, Result}; -use crate::record_batch::{RecordBatch, RecordBatchReader}; -use crate::util::bit_util; -use crate::util::decimal::Decimal256; +use arrow::array::*; +use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::compute; +use arrow::datatypes::*; +use arrow::error::{ArrowError, Result}; +use arrow::record_batch::{RecordBatch, RecordBatchReader}; +use arrow::util::bit_util; +use 
arrow::util::decimal::Decimal256; /// A struct that represents an Arrow file with a schema and record batches #[derive(Deserialize, Serialize, Debug)] @@ -1047,7 +1047,7 @@ mod tests { use std::io::Read; use std::sync::Arc; - use crate::buffer::Buffer; + use arrow::buffer::Buffer; #[test] fn test_schema_equality() { @@ -1112,7 +1112,6 @@ mod tests { } #[test] - #[cfg_attr(miri, ignore)] // running forever fn test_arrow_data_equality() { let secs_tz = Some("Europe/Budapest".to_string()); let millis_tz = Some("America/New_York".to_string()); @@ -1333,7 +1332,7 @@ mod tests { ], ) .unwrap(); - let mut file = File::open("test/data/integration.json").unwrap(); + let mut file = File::open("data/integration.json").unwrap(); let mut json = String::new(); file.read_to_string(&mut json).unwrap(); let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap(); diff --git a/integration-testing/tests/ipc_reader.rs b/integration-testing/tests/ipc_reader.rs new file mode 100644 index 00000000000..fc97a193c76 --- /dev/null +++ b/integration-testing/tests/ipc_reader.rs @@ -0,0 +1,276 @@ +use arrow::ipc::reader::{FileReader, StreamReader}; +use arrow::util::test_util::arrow_test_data; +use arrow_integration_testing::read_gzip_json; +use std::fs::File; + +#[test] +fn read_generated_files_014() { + let testdata = arrow_test_data(); + let version = "0.14.1"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_map", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + "generated_decimal", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} + +#[test] +#[should_panic(expected = "Big Endian is not supported for Decimal!")] +fn read_decimal_be_file_should_panic() { + let testdata = arrow_test_data(); + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file", + testdata + )) + .unwrap(); + FileReader::try_new(file, None).unwrap(); +} + +#[test] +#[should_panic( + expected = "Last offset 687865856 of Utf8 is larger than values length 41" +)] +fn read_dictionary_be_not_implemented() { + // The offsets are not translated for big-endian files + // https://github.com/apache/arrow-rs/issues/859 + let testdata = arrow_test_data(); + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_dictionary.arrow_file", + testdata + )) + .unwrap(); + FileReader::try_new(file, None).unwrap(); +} + +#[test] +fn read_generated_be_files_should_work() { + // complementary to the previous test + let testdata = arrow_test_data(); + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_map", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file", + testdata, path + )) + .unwrap(); + + FileReader::try_new(file, None).unwrap(); + }); +} + +#[test] +fn 
projection_should_work() { + // complementary to the previous test + let testdata = arrow_test_data(); + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_map", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + // We must use littleendian files here. + // The offsets are not translated for big-endian files + // https://github.com/apache/arrow-rs/issues/859 + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/1.0.0-littleendian/{}.arrow_file", + testdata, path + )) + .unwrap(); + + let reader = FileReader::try_new(file, Some(vec![0])).unwrap(); + let datatype_0 = reader.schema().fields()[0].data_type().clone(); + reader.for_each(|batch| { + let batch = batch.unwrap(); + assert_eq!(batch.columns().len(), 1); + assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone()); + }); + }); +} + +#[test] +fn read_generated_streams_014() { + let testdata = arrow_test_data(); + let version = "0.14.1"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_map", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + "generated_decimal", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); +} + +#[test] +fn read_generated_files_100() { + let testdata = arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_map", + // "generated_map_non_canonical", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} + +#[test] +fn read_generated_streams_100() { + let testdata = arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_map", + // "generated_map_non_canonical", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut 
reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); +} + +#[test] +fn read_generated_streams_200() { + let testdata = arrow_test_data(); + let version = "2.0.0-compression"; + + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); +} + +#[test] +fn read_generated_files_200() { + let testdata = arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} diff --git a/integration-testing/tests/ipc_writer.rs b/integration-testing/tests/ipc_writer.rs new file mode 100644 index 00000000000..e159694ecc1 --- /dev/null +++ b/integration-testing/tests/ipc_writer.rs @@ -0,0 +1,297 @@ +use arrow::ipc; +use arrow::ipc::reader::{FileReader, StreamReader}; +use arrow::ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter}; +use arrow::util::test_util::arrow_test_data; +use arrow_integration_testing::read_gzip_json; +use std::fs::File; +use std::io::Seek; + +#[test] +fn read_and_rewrite_generated_files_014() { + let testdata = arrow_test_data(); + let version = "0.14.1"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_map", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + "generated_decimal", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + // read and rewrite the file to a temp location + { + let mut writer = FileWriter::try_new(&mut file, &reader.schema()).unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + file.rewind().unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} + +#[test] +fn read_and_rewrite_generated_streams_014() { + let testdata = 
arrow_test_data(); + let version = "0.14.1"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_map", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + "generated_decimal", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file, None).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + // read and rewrite the stream to a temp location + { + let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + file.rewind().unwrap(); + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} + +#[test] +fn read_and_rewrite_generated_files_100() { + let testdata = arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // "generated_duplicate_fieldnames", + "generated_interval", + "generated_map", + "generated_nested", + // "generated_nested_large_offsets", + "generated_null_trivial", + "generated_null", + "generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // "generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + // read and rewrite the file to a temp location + { + // write IPC version 5 + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + let mut writer = + FileWriter::try_new_with_options(&mut file, &reader.schema(), options) + .unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + file.rewind().unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} + +#[test] +fn read_and_rewrite_generated_streams_100() { + let testdata = arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // "generated_duplicate_fieldnames", + "generated_interval", + "generated_map", + "generated_nested", + // "generated_nested_large_offsets", + "generated_null_trivial", + "generated_null", + "generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // "generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + 
"{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file, None).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + // read and rewrite the stream to a temp location + { + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + let mut writer = + StreamWriter::try_new_with_options(&mut file, &reader.schema(), options) + .unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + file.rewind().unwrap(); + + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} + +#[test] +fn read_and_rewrite_compression_files_200() { + let testdata = arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + // read and rewrite the file to a temp location + { + // write IPC version 5 + let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME)) + .unwrap(); + + let mut writer = + FileWriter::try_new_with_options(&mut file, &reader.schema(), options) + .unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + file.rewind().unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +} + +#[test] +fn read_and_rewrite_compression_stream_200() { + let testdata = arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file, None).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + // read and rewrite the stream to a temp location + { + let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::ZSTD)) + .unwrap(); + + let mut writer = + StreamWriter::try_new_with_options(&mut file, &reader.schema(), options) + .unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + file.rewind().unwrap(); + + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); +}