From ce540285ab241761034592a54325315557b39071 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Thu, 24 Nov 2022 19:33:12 +0800
Subject: [PATCH] add a pytest integration test against pyspark

---
 .github/workflows/parquet.yml                |  34 ++++++-
 .gitignore                                   |   3 +
 parquet/README.md                            |   1 -
 parquet/pytest/pyspark_integration_test.py   |  65 ++++++++++++
 parquet/pytest/requirements.in               |  20 ++++
 parquet/pytest/requirements.txt              | 102 +++++++++++++++++++
 parquet/src/bin/parquet-show-bloom-filter.rs |  19 +++-
 7 files changed, 238 insertions(+), 6 deletions(-)
 create mode 100755 parquet/pytest/pyspark_integration_test.py
 create mode 100644 parquet/pytest/requirements.in
 create mode 100644 parquet/pytest/requirements.txt

diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 5b0cc87440e..c5c7aac053f 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -19,7 +19,6 @@
 # tests for parquet crate
 name: "parquet"
 
-
 # trigger for all PRs that touch certain files and changes to master
 on:
   push:
@@ -58,7 +57,6 @@ jobs:
       - name: Test --all-features
         run: cargo test -p parquet --all-features
 
-
   # test compilation
   linux-features:
     name: Check Compilation
@@ -120,6 +118,38 @@
       - name: Build wasm32-wasi
         run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-wasi
 
+  pyspark-integration-test:
+    name: PySpark Integration Test
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        rust: [stable]
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+          cache: "pip"
+      - name: Install Python dependencies
+        run: |
+          cd parquet/pytest
+          pip install -r requirements.txt
+      - name: Check test files with black
+        run: |
+          cd parquet/pytest
+          black --check *.py --verbose
+      - name: Setup Rust toolchain
+        run: |
+          rustup toolchain install ${{ matrix.rust }}
+          rustup default ${{ matrix.rust }}
+      - name: Install binary for checking
+        run: cargo install --path parquet --bin parquet-show-bloom-filter --features=arrow,cli
+      - name: Run pytest
+        run: |
+          cd parquet/pytest
+          pytest -v
+
   clippy:
     name: Clippy
     runs-on: ubuntu-latest
diff --git a/.gitignore b/.gitignore
index b8506ea06cb..52ad19cb077 100644
--- a/.gitignore
+++ b/.gitignore
@@ -92,3 +92,6 @@
 # Windows shortcuts
 *.lnk
 
+# Python virtual env in parquet crate
+parquet/pytest/venv/
+__pycache__/
diff --git a/parquet/README.md b/parquet/README.md
index c9245b08211..d904fc64e74 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -41,7 +41,6 @@ However, for historical reasons, this crate uses versions with major numbers gre
 The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:
 
 - `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
-- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
 - `async` - support `async` APIs for reading parquet
 - `json` - support for reading / writing `json` data to / from parquet
 - `brotli` (default) - support for parquet using `brotli` compression
diff --git a/parquet/pytest/pyspark_integration_test.py b/parquet/pytest/pyspark_integration_test.py
new file mode 100755
index 00000000000..0a0b881e3e9
--- /dev/null
+++ b/parquet/pytest/pyspark_integration_test.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyspark.sql
+import tempfile
+import subprocess
+import pathlib
+
+
+def create_data_and_df():
+    spark = pyspark.sql.SparkSession.builder.getOrCreate()
+    spark.conf.set("parquet.bloom.filter.enabled", True)
+    spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
+    spark.conf.set("parquet.bloom.filter.max.bytes", 32)
+    data = [(f"id-{i % 10}", f"name-{i % 10}") for i in range(100)]
+    df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
+    return data, df
+
+
+def get_expected_output(data):
+    expected = ["Row group #0", "=" * 80]
+    for v in data:
+        expected.append(f"Value {v[0]} is present in bloom filter")
+    for v in data:
+        expected.append(f"Value {v[1]} is absent in bloom filter")
+    expected = "\n".join(expected) + "\n"
+    return expected.encode("utf-8")
+
+
+def get_cli_output(output_dir, data, col_name="id"):
+    # take the first (and only) parquet file
+    parquet_file = sorted(pathlib.Path(output_dir).glob("*.parquet"))[0]
+    args = [
+        "parquet-show-bloom-filter",
+        "--file-name",
+        parquet_file,
+        "--column",
+        col_name,
+    ]
+    for v in data:
+        args.extend(["--values", v[0]])
+    for v in data:
+        args.extend(["--values", v[1]])
+    return subprocess.check_output(args)
+
+
+def test_pyspark_bloom_filter():
+    data, df = create_data_and_df()
+    with tempfile.TemporaryDirectory() as output_dir:
+        df.write.parquet(output_dir, mode="overwrite")
+        cli_output = get_cli_output(output_dir, data)
+        assert cli_output == get_expected_output(data)
diff --git a/parquet/pytest/requirements.in b/parquet/pytest/requirements.in
new file mode 100644
index 00000000000..a0b30b86762
--- /dev/null
+++ b/parquet/pytest/requirements.in
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+pytest
+pyspark
+black
+
diff --git a/parquet/pytest/requirements.txt b/parquet/pytest/requirements.txt
new file mode 100644
index 00000000000..fb6f8fb6dd9
--- /dev/null
+++ b/parquet/pytest/requirements.txt
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This file is autogenerated by pip-compile with python 3.10
+# To update, run:
+#
+#    pip-compile --generate-hashes --resolver=backtracking
+#
+attrs==22.1.0 \
+    --hash=sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6 \
+    --hash=sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c
+    # via pytest
+black==22.10.0 \
+    --hash=sha256:14ff67aec0a47c424bc99b71005202045dc09270da44a27848d534600ac64fc7 \
+    --hash=sha256:197df8509263b0b8614e1df1756b1dd41be6738eed2ba9e9769f3880c2b9d7b6 \
+    --hash=sha256:1e464456d24e23d11fced2bc8c47ef66d471f845c7b7a42f3bd77bf3d1789650 \
+    --hash=sha256:2039230db3c6c639bd84efe3292ec7b06e9214a2992cd9beb293d639c6402edb \
+    --hash=sha256:21199526696b8f09c3997e2b4db8d0b108d801a348414264d2eb8eb2532e540d \
+    --hash=sha256:2644b5d63633702bc2c5f3754b1b475378fbbfb481f62319388235d0cd104c2d \
+    --hash=sha256:432247333090c8c5366e69627ccb363bc58514ae3e63f7fc75c54b1ea80fa7de \
+    --hash=sha256:444ebfb4e441254e87bad00c661fe32df9969b2bf224373a448d8aca2132b395 \
+    --hash=sha256:5b9b29da4f564ba8787c119f37d174f2b69cdfdf9015b7d8c5c16121ddc054ae \
+    --hash=sha256:5cc42ca67989e9c3cf859e84c2bf014f6633db63d1cbdf8fdb666dcd9e77e3fa \
+    --hash=sha256:5d8f74030e67087b219b032aa33a919fae8806d49c867846bfacde57f43972ef \
+    --hash=sha256:72ef3925f30e12a184889aac03d77d031056860ccae8a1e519f6cbb742736383 \
+    --hash=sha256:819dc789f4498ecc91438a7de64427c73b45035e2e3680c92e18795a839ebb66 \
+    --hash=sha256:915ace4ff03fdfff953962fa672d44be269deb2eaf88499a0f8805221bc68c87 \
+    --hash=sha256:9311e99228ae10023300ecac05be5a296f60d2fd10fff31cf5c1fa4ca4b1988d \
+    --hash=sha256:974308c58d057a651d182208a484ce80a26dac0caef2895836a92dd6ebd725e0 \
+    --hash=sha256:b8b49776299fece66bffaafe357d929ca9451450f5466e997a7285ab0fe28e3b \
+    --hash=sha256:c957b2b4ea88587b46cf49d1dc17681c1e672864fd7af32fc1e9664d572b3458 \
+    --hash=sha256:e41a86c6c650bcecc6633ee3180d80a025db041a8e2398dcc059b3afa8382cd4 \
+    --hash=sha256:f513588da599943e0cde4e32cc9879e825d58720d6557062d1098c5ad80080e1 \
+    --hash=sha256:fba8a281e570adafb79f7755ac8721b6cf1bbf691186a287e990c7929c7692ff
+    # via -r requirements.in
+click==8.1.3 \
+    --hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
+    --hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
+    # via black
+exceptiongroup==1.0.4 \
+    --hash=sha256:542adf9dea4055530d6e1279602fa5cb11dab2395fa650b8674eaec35fc4a828 \
+    --hash=sha256:bd14967b79cd9bdb54d97323216f8fdf533e278df937aa2a90089e7d6e06e5ec
+    # via pytest
+iniconfig==1.1.1 \
+    --hash=sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3 \
+    --hash=sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32
+    # via pytest
+mypy-extensions==0.4.3 \
+    --hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
+    --hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8
+    # via black
+packaging==21.3 \
+    --hash=sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb \
+    --hash=sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522
+    # via pytest
+pathspec==0.10.2 \
+    --hash=sha256:88c2606f2c1e818b978540f73ecc908e13999c6c3a383daf3705652ae79807a5 \
+    --hash=sha256:8f6bf73e5758fd365ef5d58ce09ac7c27d2833a8d7da51712eac6e27e35141b0
+    # via black
+platformdirs==2.5.4 \
+    --hash=sha256:1006647646d80f16130f052404c6b901e80ee4ed6bef6792e1f238a8969106f7 \
+    --hash=sha256:af0276409f9a02373d540bf8480021a048711d572745aef4b7842dad245eba10
+    # via black
+pluggy==1.0.0 \
+    --hash=sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159 \
+    --hash=sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3
+    # via pytest
+py4j==0.10.9.5 \
+    --hash=sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6 \
+    --hash=sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04
+    # via pyspark
+pyparsing==3.0.9 \
+    --hash=sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb \
+    --hash=sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc
+    # via packaging
+pyspark==3.3.1 \
+    --hash=sha256:e99fa7de92be406884bfd831c32b9306a3a99de44cfc39a2eefb6ed07445d5fa
+    # via -r requirements.in
+pytest==7.2.0 \
+    --hash=sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71 \
+    --hash=sha256:c4014eb40e10f11f355ad4e3c2fb2c6c6d1919c73f3b5a433de4708202cade59
+    # via -r requirements.in
+tomli==2.0.1 \
+    --hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
+    --hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
+    # via
+    #   black
+    #   pytest
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 55ecb2abf13..f9462327f83 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,7 +34,11 @@
 //! ```
 
 use clap::Parser;
-use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::file::{
+    properties::ReaderProperties,
+    reader::{FileReader, SerializedFileReader},
+    serialized_reader::ReadOptionsBuilder,
+};
 use std::{fs::File, path::Path};
 
 #[derive(Debug, Parser)]
@@ -63,8 +67,17 @@ fn main() {
     let path = Path::new(&file_name);
     let file = File::open(path).expect("Unable to open file");
 
-    let file_reader =
-        SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+    let file_reader = SerializedFileReader::new_with_options(
+        file,
+        ReadOptionsBuilder::new()
+            .with_reader_properties(
+                ReaderProperties::builder()
+                    .set_read_bloom_filter(true)
+                    .build(),
+            )
+            .build(),
+    )
+    .expect("Unable to open file as Parquet");
     let metadata = file_reader.metadata();
     for (ri, row_group) in metadata.row_groups().iter().enumerate() {
         println!("Row group #{}", ri);
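
Context for reviewers: the loop the pytest output is asserted against lives further down in parquet-show-bloom-filter.rs and is unchanged by this patch, so it does not appear in the diff. The sketch below shows roughly what that loop does with the `metadata` and `file_reader` set up above; it assumes the parquet crate's `RowGroupReader::get_column_bloom_filter` and `Sbbf::check` APIs and hypothetical clap fields `args.column` / `args.values`, and is illustrative rather than quoted from the repository.

    // Sketch only (assumed API): probe the requested column's bloom filter in
    // each row group and print one line per queried value, matching the lines
    // that get_expected_output() builds in the pytest above.
    for (ri, row_group) in metadata.row_groups().iter().enumerate() {
        println!("Row group #{}", ri);
        println!("{}", "=".repeat(80));
        // Locate the ordinal of the requested column within this row group.
        if let Some((ci, _)) = row_group
            .columns()
            .iter()
            .enumerate()
            .find(|(_, c)| c.column_path().string() == args.column)
        {
            let row_group_reader = file_reader
                .get_row_group(ri)
                .expect("Unable to read row group");
            // The filter is only available because the reader was built with
            // set_read_bloom_filter(true) in the hunk above.
            if let Some(sbbf) = row_group_reader.get_column_bloom_filter(ci) {
                for value in &args.values {
                    println!(
                        "Value {} is {} in bloom filter",
                        value,
                        if sbbf.check(&value.as_str()) {
                            "present"
                        } else {
                            "absent"
                        }
                    );
                }
            }
        }
    }

This is the output shape the test encodes: a "Row group #0" banner, an 80-character rule, then "is present" for every id value (all were written) and "is absent" for every name value (queried against the id column's filter). Note the absence assertions rely on these particular name strings not colliding as bloom filter false positives.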