bloom filter part V: add an integration with pytest against pyspark #3176

Merged 1 commit on Nov 24, 2022
34 changes: 32 additions & 2 deletions .github/workflows/parquet.yml
@@ -19,7 +19,6 @@
# tests for parquet crate
name: "parquet"


# trigger for all PRs that touch certain files and changes to master
on:
  push:
@@ -58,7 +57,6 @@ jobs:
      - name: Test --all-features
        run: cargo test -p parquet --all-features


  # test compilation
  linux-features:
    name: Check Compilation
@@ -120,6 +118,38 @@ jobs:
      - name: Build wasm32-wasi
        run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-wasi

  pyspark-integration-test:
    name: PySpark Integration Test
    runs-on: ubuntu-latest
    strategy:
      matrix:
        rust: [stable]
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: Install Python dependencies
        run: |
          cd parquet/pytest
          pip install -r requirements.txt
      - name: Black check the test files
        run: |
          cd parquet/pytest
          black --check *.py --verbose
      - name: Setup Rust toolchain
        run: |
          rustup toolchain install ${{ matrix.rust }}
          rustup default ${{ matrix.rust }}
      - name: Install binary for checking
        run: cargo install --path parquet --bin parquet-show-bloom-filter --features=arrow,cli
      - name: Run pytest
        run: |
          cd parquet/pytest
          pytest -v

  clippy:
    name: Clippy
    runs-on: ubuntu-latest
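The new pyspark-integration-test job can also be mirrored locally while iterating on the suite. A minimal sketch, assuming pip, cargo, and a stable Rust toolchain are already installed and the working directory is the repository root; `black --check .` stands in for the shell glob `*.py`, which subprocess does not expand:

import subprocess


def run(cmd, cwd=None):
    # Fail fast on a non-zero exit code, like a CI step would.
    subprocess.run(cmd, cwd=cwd, check=True)


run(["pip", "install", "-r", "requirements.txt"], cwd="parquet/pytest")
run(["black", "--check", ".", "--verbose"], cwd="parquet/pytest")
run(
    [
        "cargo",
        "install",
        "--path",
        "parquet",
        "--bin",
        "parquet-show-bloom-filter",
        "--features=arrow,cli",
    ]
)
run(["pytest", "-v"], cwd="parquet/pytest")
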
3 changes: 3 additions & 0 deletions .gitignore
@@ -92,3 +92,6 @@ $RECYCLE.BIN/
# Windows shortcuts
*.lnk

# Python virtual env in parquet crate
parquet/pytest/venv/
__pycache__/
1 change: 0 additions & 1 deletion parquet/README.md
@@ -41,7 +41,6 @@ However, for historical reasons, this crate uses versions with major numbers gre
The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:

- `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
- `async` - support `async` APIs for reading parquet
- `json` - support for reading / writing `json` data to / from parquet
- `brotli` (default) - support for parquet using `brotli` compression
65 changes: 65 additions & 0 deletions parquet/pytest/pyspark_integration_test.py
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pyspark.sql
import tempfile
import subprocess
import pathlib


def create_data_and_df():
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.conf.set("parquet.bloom.filter.enabled", True)
    spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
    spark.conf.set("parquet.bloom.filter.max.bytes", 32)
    data = [(f"id-{i % 10}", f"name-{i % 10}") for i in range(100)]
    df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
    return data, df


def get_expected_output(data):
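    # Only the "id" column's bloom filter is queried by get_cli_output, so the
    # id values (v[0]) are expected hits and the name values (v[1]) misses.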
    expected = ["Row group #0", "=" * 80]
    for v in data:
        expected.append(f"Value {v[0]} is present in bloom filter")
    for v in data:
        expected.append(f"Value {v[1]} is absent in bloom filter")
    expected = "\n".join(expected) + "\n"
    return expected.encode("utf-8")


def get_cli_output(output_dir, data, col_name="id"):
    # take the first (and only) parquet file
    parquet_file = sorted(pathlib.Path(output_dir).glob("*.parquet"))[0]
    args = [
        "parquet-show-bloom-filter",
        "--file-name",
        parquet_file,
        "--column",
        col_name,
    ]
    for v in data:
        args.extend(["--values", v[0]])
    for v in data:
        args.extend(["--values", v[1]])
    return subprocess.check_output(args)


def test_pyspark_bloom_filter():
    data, df = create_data_and_df()
    with tempfile.TemporaryDirectory() as output_dir:
        df.write.parquet(output_dir, mode="overwrite")
        cli_output = get_cli_output(output_dir, data)
        assert cli_output == get_expected_output(data)
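The assertion above depends on a simple CLI contract: parquet-show-bloom-filter prints a banner per row group, then one line per --values flag in the order the flags were passed. A standalone sketch of that contract (the parquet file name here is hypothetical; the expected shape follows get_expected_output):

import subprocess

out = subprocess.check_output(
    [
        "parquet-show-bloom-filter",
        "--file-name",
        "part-00000.parquet",  # hypothetical file written by spark
        "--column",
        "id",
        "--values",
        "id-0",
        "--values",
        "no-such-id",
    ]
)
print(out.decode("utf-8"))
# Expected shape:
#   Row group #0
#   <a line of 80 "=" characters>
#   Value id-0 is present in bloom filter
#   Value no-such-id is absent in bloom filter
# Note: a bloom filter can report false positives, so "absent" is the
# overwhelmingly likely result for no-such-id, not a hard guarantee.
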
20 changes: 20 additions & 0 deletions parquet/pytest/requirements.in
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
pytest
pyspark
black

102 changes: 102 additions & 0 deletions parquet/pytest/requirements.txt
@@ -0,0 +1,102 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This file is autogenerated by pip-compile with python 3.10
# To update, run:
#
# pip-compile --generate-hashes --resolver=backtracking
#
attrs==22.1.0 \
--hash=sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6 \
--hash=sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c
# via pytest
black==22.10.0 \
--hash=sha256:14ff67aec0a47c424bc99b71005202045dc09270da44a27848d534600ac64fc7 \
--hash=sha256:197df8509263b0b8614e1df1756b1dd41be6738eed2ba9e9769f3880c2b9d7b6 \
--hash=sha256:1e464456d24e23d11fced2bc8c47ef66d471f845c7b7a42f3bd77bf3d1789650 \
--hash=sha256:2039230db3c6c639bd84efe3292ec7b06e9214a2992cd9beb293d639c6402edb \
--hash=sha256:21199526696b8f09c3997e2b4db8d0b108d801a348414264d2eb8eb2532e540d \
--hash=sha256:2644b5d63633702bc2c5f3754b1b475378fbbfb481f62319388235d0cd104c2d \
--hash=sha256:432247333090c8c5366e69627ccb363bc58514ae3e63f7fc75c54b1ea80fa7de \
--hash=sha256:444ebfb4e441254e87bad00c661fe32df9969b2bf224373a448d8aca2132b395 \
--hash=sha256:5b9b29da4f564ba8787c119f37d174f2b69cdfdf9015b7d8c5c16121ddc054ae \
--hash=sha256:5cc42ca67989e9c3cf859e84c2bf014f6633db63d1cbdf8fdb666dcd9e77e3fa \
--hash=sha256:5d8f74030e67087b219b032aa33a919fae8806d49c867846bfacde57f43972ef \
--hash=sha256:72ef3925f30e12a184889aac03d77d031056860ccae8a1e519f6cbb742736383 \
--hash=sha256:819dc789f4498ecc91438a7de64427c73b45035e2e3680c92e18795a839ebb66 \
--hash=sha256:915ace4ff03fdfff953962fa672d44be269deb2eaf88499a0f8805221bc68c87 \
--hash=sha256:9311e99228ae10023300ecac05be5a296f60d2fd10fff31cf5c1fa4ca4b1988d \
--hash=sha256:974308c58d057a651d182208a484ce80a26dac0caef2895836a92dd6ebd725e0 \
--hash=sha256:b8b49776299fece66bffaafe357d929ca9451450f5466e997a7285ab0fe28e3b \
--hash=sha256:c957b2b4ea88587b46cf49d1dc17681c1e672864fd7af32fc1e9664d572b3458 \
--hash=sha256:e41a86c6c650bcecc6633ee3180d80a025db041a8e2398dcc059b3afa8382cd4 \
--hash=sha256:f513588da599943e0cde4e32cc9879e825d58720d6557062d1098c5ad80080e1 \
--hash=sha256:fba8a281e570adafb79f7755ac8721b6cf1bbf691186a287e990c7929c7692ff
# via -r requirements.in
click==8.1.3 \
--hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
--hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
# via black
exceptiongroup==1.0.4 \
--hash=sha256:542adf9dea4055530d6e1279602fa5cb11dab2395fa650b8674eaec35fc4a828 \
--hash=sha256:bd14967b79cd9bdb54d97323216f8fdf533e278df937aa2a90089e7d6e06e5ec
# via pytest
iniconfig==1.1.1 \
--hash=sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3 \
--hash=sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32
# via pytest
mypy-extensions==0.4.3 \
--hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
--hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8
# via black
packaging==21.3 \
--hash=sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb \
--hash=sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522
# via pytest
pathspec==0.10.2 \
--hash=sha256:88c2606f2c1e818b978540f73ecc908e13999c6c3a383daf3705652ae79807a5 \
--hash=sha256:8f6bf73e5758fd365ef5d58ce09ac7c27d2833a8d7da51712eac6e27e35141b0
# via black
platformdirs==2.5.4 \
--hash=sha256:1006647646d80f16130f052404c6b901e80ee4ed6bef6792e1f238a8969106f7 \
--hash=sha256:af0276409f9a02373d540bf8480021a048711d572745aef4b7842dad245eba10
# via black
pluggy==1.0.0 \
--hash=sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159 \
--hash=sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3
# via pytest
py4j==0.10.9.5 \
--hash=sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6 \
--hash=sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04
# via pyspark
pyparsing==3.0.9 \
--hash=sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb \
--hash=sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc
# via packaging
pyspark==3.3.1 \
--hash=sha256:e99fa7de92be406884bfd831c32b9306a3a99de44cfc39a2eefb6ed07445d5fa
# via -r requirements.in
pytest==7.2.0 \
--hash=sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71 \
--hash=sha256:c4014eb40e10f11f355ad4e3c2fb2c6c6d1919c73f3b5a433de4708202cade59
# via -r requirements.in
tomli==2.0.1 \
--hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
--hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
# via
# black
# pytest
19 changes: 16 additions & 3 deletions parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,7 +34,11 @@
//! ```

use clap::Parser;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::{
    properties::ReaderProperties,
    reader::{FileReader, SerializedFileReader},
    serialized_reader::ReadOptionsBuilder,
};
use std::{fs::File, path::Path};

#[derive(Debug, Parser)]
@@ -63,8 +67,17 @@ fn main() {
    let path = Path::new(&file_name);
    let file = File::open(path).expect("Unable to open file");

    let file_reader =
        SerializedFileReader::new(file).expect("Unable to open file as Parquet");
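    // Reading bloom filters is opt-in: without set_read_bloom_filter(true) in
    // the ReaderProperties below, the reader skips them when opening the file.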
    let file_reader = SerializedFileReader::new_with_options(
        file,
        ReadOptionsBuilder::new()
            .with_reader_properties(
                ReaderProperties::builder()
                    .set_read_bloom_filter(true)
                    .build(),
            )
            .build(),
    )
    .expect("Unable to open file as Parquet");
    let metadata = file_reader.metadata();
    for (ri, row_group) in metadata.row_groups().iter().enumerate() {
        println!("Row group #{}", ri);